arrow_row/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! A comparable row-oriented representation of a collection of [`Array`].
19//!
20//! [`Row`]s are [normalized for sorting], and can therefore be very efficiently [compared],
21//! using [`memcmp`] under the hood, or used in [non-comparison sorts] such as [radix sort].
22//! This makes the row format ideal for implementing efficient multi-column sorting,
23//! grouping, aggregation, windowing and more, as described in more detail
24//! [in this blog post](https://arrow.apache.org/blog/2022/11/07/multi-column-sorts-in-arrow-rust-part-1/).
25//!
26//! For example, given three input [`Array`], [`RowConverter`] creates byte
27//! sequences that [compare] the same as when using [`lexsort`].
28//!
29//! ```text
30//!    ┌─────┐   ┌─────┐   ┌─────┐
31//!    │     │   │     │   │     │
32//!    ├─────┤ ┌ ┼─────┼ ─ ┼─────┼ ┐              ┏━━━━━━━━━━━━━┓
33//!    │     │   │     │   │     │  ─────────────▶┃             ┃
34//!    ├─────┤ └ ┼─────┼ ─ ┼─────┼ ┘              ┗━━━━━━━━━━━━━┛
35//!    │     │   │     │   │     │
36//!    └─────┘   └─────┘   └─────┘
37//!                ...
38//!    ┌─────┐ ┌ ┬─────┬ ─ ┬─────┬ ┐              ┏━━━━━━━━┓
39//!    │     │   │     │   │     │  ─────────────▶┃        ┃
40//!    └─────┘ └ ┴─────┴ ─ ┴─────┴ ┘              ┗━━━━━━━━┛
41//!     UInt64      Utf8     F64
42//!
43//!           Input Arrays                          Row Format
44//!     (Columns)
45//! ```
46//!
47//! _[`Rows`] must be generated by the same [`RowConverter`] for the comparison
48//! to be meaningful._
49//!
50//! # Basic Example
51//! ```
52//! # use std::sync::Arc;
53//! # use arrow_row::{RowConverter, SortField};
54//! # use arrow_array::{ArrayRef, Int32Array, StringArray};
55//! # use arrow_array::cast::{AsArray, as_string_array};
56//! # use arrow_array::types::Int32Type;
57//! # use arrow_schema::DataType;
58//!
59//! let a1 = Arc::new(Int32Array::from_iter_values([-1, -1, 0, 3, 3])) as ArrayRef;
60//! let a2 = Arc::new(StringArray::from_iter_values(["a", "b", "c", "d", "d"])) as ArrayRef;
61//! let arrays = vec![a1, a2];
62//!
63//! // Convert arrays to rows
64//! let converter = RowConverter::new(vec![
65//!     SortField::new(DataType::Int32),
66//!     SortField::new(DataType::Utf8),
67//! ]).unwrap();
68//! let rows = converter.convert_columns(&arrays).unwrap();
69//!
70//! // Compare rows
71//! for i in 0..4 {
72//!     assert!(rows.row(i) <= rows.row(i + 1));
73//! }
74//! assert_eq!(rows.row(3), rows.row(4));
75//!
76//! // Convert rows back to arrays
77//! let converted = converter.convert_rows(&rows).unwrap();
78//! assert_eq!(arrays, converted);
79//!
80//! // Compare rows from different arrays
81//! let a1 = Arc::new(Int32Array::from_iter_values([3, 4])) as ArrayRef;
82//! let a2 = Arc::new(StringArray::from_iter_values(["e", "f"])) as ArrayRef;
83//! let arrays = vec![a1, a2];
84//! let rows2 = converter.convert_columns(&arrays).unwrap();
85//!
86//! assert!(rows.row(4) < rows2.row(0));
87//! assert!(rows.row(4) < rows2.row(1));
88//!
89//! // Convert selection of rows back to arrays
90//! let selection = [rows.row(0), rows2.row(1), rows.row(2), rows2.row(0)];
91//! let converted = converter.convert_rows(selection).unwrap();
92//! let c1 = converted[0].as_primitive::<Int32Type>();
93//! assert_eq!(c1.values(), &[-1, 4, 0, 3]);
94//!
95//! let c2 = converted[1].as_string::<i32>();
96//! let c2_values: Vec<_> = c2.iter().flatten().collect();
97//! assert_eq!(&c2_values, &["a", "f", "c", "e"]);
98//! ```
99//!
100//! # Lexicographic Sorts (lexsort)
101//!
102//! The row format can also be used to implement a fast multi-column / lexicographic sort
103//!
104//! ```
105//! # use arrow_row::{RowConverter, SortField};
106//! # use arrow_array::{ArrayRef, UInt32Array};
107//! fn lexsort_to_indices(arrays: &[ArrayRef]) -> UInt32Array {
108//!     let fields = arrays
109//!         .iter()
110//!         .map(|a| SortField::new(a.data_type().clone()))
111//!         .collect();
112//!     let converter = RowConverter::new(fields).unwrap();
113//!     let rows = converter.convert_columns(arrays).unwrap();
114//!     let mut sort: Vec<_> = rows.iter().enumerate().collect();
115//!     sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
116//!     UInt32Array::from_iter_values(sort.iter().map(|(i, _)| *i as u32))
117//! }
118//! ```
119//!
120//! # Flattening Dictionaries
121//!
122//! For performance reasons, dictionary arrays are flattened ("hydrated") to their
123//! underlying values during row conversion. See [the issue] for more details.
124//!
125//! This means that the arrays that come out of [`RowConverter::convert_rows`]
126//! may not have the same data types as the input arrays. For example, encoding
127//! a `Dictionary<Int8, Utf8>` array and then converting back will come out as a `Utf8` array.
128//!
129//! ```
130//! # use arrow_array::{Array, ArrayRef, DictionaryArray};
131//! # use arrow_array::types::Int8Type;
132//! # use arrow_row::{RowConverter, SortField};
133//! # use arrow_schema::DataType;
134//! # use std::sync::Arc;
135//! // Input is a Dictionary array
136//! let dict: DictionaryArray::<Int8Type> = ["a", "b", "c", "a", "b"].into_iter().collect();
137//! let sort_fields = vec![SortField::new(dict.data_type().clone())];
138//! let arrays = vec![Arc::new(dict) as ArrayRef];
139//! let converter = RowConverter::new(sort_fields).unwrap();
140//! // Convert to rows
141//! let rows = converter.convert_columns(&arrays).unwrap();
142//! let converted = converter.convert_rows(&rows).unwrap();
143//! // result was a Utf8 array, not a Dictionary array
144//! assert_eq!(converted[0].data_type(), &DataType::Utf8);
145//! ```
146//!
147//! [non-comparison sorts]: https://en.wikipedia.org/wiki/Sorting_algorithm#Non-comparison_sorts
148//! [radix sort]: https://en.wikipedia.org/wiki/Radix_sort
149//! [normalized for sorting]: http://wwwlgis.informatik.uni-kl.de/archiv/wwwdvs.informatik.uni-kl.de/courses/DBSREAL/SS2005/Vorlesungsunterlagen/Implementing_Sorting.pdf
150//! [`memcmp`]: https://www.man7.org/linux/man-pages/man3/memcmp.3.html
151//! [`lexsort`]: https://docs.rs/arrow-ord/latest/arrow_ord/sort/fn.lexsort.html
152//! [compared]: PartialOrd
153//! [compare]: PartialOrd
154//! [the issue]: https://github.com/apache/arrow-rs/issues/4811
155
156#![doc(
157    html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
158    html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
159)]
160#![cfg_attr(docsrs, feature(doc_cfg))]
161#![warn(missing_docs)]
162use std::cmp::Ordering;
163use std::hash::{Hash, Hasher};
164use std::sync::Arc;
165
166use arrow_array::cast::*;
167use arrow_array::types::{ArrowDictionaryKeyType, ByteViewType};
168use arrow_array::*;
169use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer};
170use arrow_data::{ArrayData, ArrayDataBuilder};
171use arrow_schema::*;
172use variable::{decode_binary_view, decode_string_view};
173
174use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
175use crate::list::{compute_lengths_fixed_size_list, encode_fixed_size_list};
176use crate::variable::{decode_binary, decode_string};
177use arrow_array::types::{Int16Type, Int32Type, Int64Type};
178
179mod fixed;
180mod list;
181mod run;
182mod variable;
183
184/// Converts [`ArrayRef`] columns into a [row-oriented](self) format.
185///
186/// *Note: The encoding of the row format may change from release to release.*
187///
188/// ## Overview
189///
190/// The row format is a variable length byte sequence created by
191/// concatenating the encoded form of each column. The encoding for
192/// each column depends on its datatype (and sort options).
193///
194/// The encoding is carefully designed in such a way that escaping is
195/// unnecessary: it is never ambiguous as to whether a byte is part of
196/// a sentinel (e.g. null) or a value.
197///
198/// ## Unsigned Integer Encoding
199///
200/// A null integer is encoded as a `0_u8`, followed by a zero-ed number of bytes corresponding
201/// to the integer's length.
202///
203/// A valid integer is encoded as `1_u8`, followed by the big-endian representation of the
204/// integer.
205///
206/// ```text
207///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
208///    3          │03│00│00│00│      │01│00│00│00│03│
209///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
210///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
211///   258         │02│01│00│00│      │01│00│00│01│02│
212///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
213///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
214///  23423        │7F│5B│00│00│      │01│00│00│5B│7F│
215///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
216///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
217///  NULL         │??│??│??│??│      │00│00│00│00│00│
218///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
219///
220///              32-bit (4 bytes)        Row Format
221///  Value        Little Endian
222/// ```
223///
224/// ## Signed Integer Encoding
225///
226/// Signed integers have their most significant sign bit flipped, and are then encoded in the
227/// same manner as an unsigned integer.
228///
229/// ```text
230///        ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┬──┐
231///     5  │05│00│00│00│       │05│00│00│80│       │01│80│00│00│05│
232///        └──┴──┴──┴──┘       └──┴──┴──┴──┘       └──┴──┴──┴──┴──┘
233///        ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┬──┐
234///    -5  │FB│FF│FF│FF│       │FB│FF│FF│7F│       │01│7F│FF│FF│FB│
235///        └──┴──┴──┴──┘       └──┴──┴──┴──┘       └──┴──┴──┴──┴──┘
236///
237///  Value  32-bit (4 bytes)    High bit flipped      Row Format
238///          Little Endian
239/// ```
240///
241/// ## Float Encoding
242///
243/// Floats are converted from IEEE 754 representation to a signed integer representation
244/// by flipping all bar the sign bit if they are negative.
245///
246/// They are then encoded in the same manner as a signed integer.
247///
248/// ## Fixed Length Bytes Encoding
249///
250/// Fixed length bytes are encoded in the same fashion as primitive types above.
251///
252/// For a fixed length array of length `n`:
253///
254/// A null is encoded as `0_u8` null sentinel followed by `n` `0_u8` bytes
255///
256/// A valid value is encoded as `1_u8` followed by the value bytes
257///
258/// ## Variable Length Bytes (including Strings) Encoding
259///
260/// A null is encoded as a `0_u8`.
261///
262/// An empty byte array is encoded as `1_u8`.
263///
264/// A non-null, non-empty byte array is encoded as `2_u8` followed by the byte array
265/// encoded using a block based scheme described below.
266///
267/// The byte array is broken up into fixed-width blocks, each block is written in turn
268/// to the output, followed by `0xFF_u8`. The final block is padded to 32-bytes
269/// with `0_u8` and written to the output, followed by the un-padded length in bytes
270/// of this final block as a `u8`. The first 4 blocks have a length of 8, with subsequent
271/// blocks using a length of 32, this is to reduce space amplification for small strings.
272///
273/// Note the following example encodings use a block size of 4 bytes for brevity:
274///
275/// ```text
276///                       ┌───┬───┬───┬───┬───┬───┐
277///  "MEEP"               │02 │'M'│'E'│'E'│'P'│04 │
278///                       └───┴───┴───┴───┴───┴───┘
279///
280///                       ┌───┐
281///  ""                   │01 |
282///                       └───┘
283///
284///  NULL                 ┌───┐
285///                       │00 │
286///                       └───┘
287///
288/// "Defenestration"      ┌───┬───┬───┬───┬───┬───┐
289///                       │02 │'D'│'e'│'f'│'e'│FF │
290///                       └───┼───┼───┼───┼───┼───┤
291///                           │'n'│'e'│'s'│'t'│FF │
292///                           ├───┼───┼───┼───┼───┤
293///                           │'r'│'a'│'t'│'i'│FF │
294///                           ├───┼───┼───┼───┼───┤
295///                           │'o'│'n'│00 │00 │02 │
296///                           └───┴───┴───┴───┴───┘
299/// ```
300///
301/// This approach is loosely inspired by [COBS] encoding, and chosen over more traditional
302/// [byte stuffing] as it is more amenable to vectorisation, in particular AVX-256.
303///
304/// ## Dictionary Encoding
305///
306/// Dictionary encoded arrays are hydrated to their underlying values
307///
308/// ## REE Encoding
309///
310/// REE (Run End Encoding) arrays, a form of Run Length Encoding, are hydrated to their underlying values.
311///
312/// ## Struct Encoding
313///
314/// A null is encoded as a `0_u8`.
315///
316/// A valid value is encoded as `1_u8` followed by the row encoding of each child.
317///
318/// This encoding effectively flattens the schema in a depth-first fashion.
319///
320/// For example
321///
322/// ```text
323/// ┌───────┬────────────────────────┬───────┐
324/// │ Int32 │ Struct[Int32, Float32] │ Int32 │
325/// └───────┴────────────────────────┴───────┘
326/// ```
327///
328/// Is encoded as
329///
330/// ```text
331/// ┌───────┬───────────────┬───────┬─────────┬───────┐
332/// │ Int32 │ Null Sentinel │ Int32 │ Float32 │ Int32 │
333/// └───────┴───────────────┴───────┴─────────┴───────┘
334/// ```
335///
336/// ## List Encoding
337///
338/// Lists are encoded by first encoding all child elements to the row format.
339///
340/// A list value is then encoded as the concatenation of each of the child elements,
341/// separately encoded using the variable length encoding described above, followed
342/// by the variable length encoding of an empty byte array.
343///
344/// For example given:
345///
346/// ```text
347/// [1_u8, 2_u8, 3_u8]
348/// [1_u8, null]
349/// []
350/// null
351/// ```
352///
353/// The elements would be converted to:
354///
355/// ```text
356///     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐        ┌──┬──┐
357///  1  │01│01│  2  │01│02│  3  │01│03│  1  │01│01│  null  │00│00│
358///     └──┴──┘     └──┴──┘     └──┴──┘     └──┴──┘        └──┴──┘
359///```
360///
361/// Which would be encoded as
362///
363/// ```text
364///                         ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
365///  [1_u8, 2_u8, 3_u8]     │02│01│01│00│00│02│02│01│02│00│00│02│02│01│03│00│00│02│01│
366///                         └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
367///                          └──── 1_u8 ────┘   └──── 2_u8 ────┘  └──── 3_u8 ────┘
368///
369///                         ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
370///  [1_u8, null]           │02│01│01│00│00│02│02│00│00│00│00│02│01│
371///                         └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
372///                          └──── 1_u8 ────┘   └──── null ────┘
373///
374///```
375///
376/// With `[]` represented by an empty byte array, and `null` a null byte array.
377///
378/// ## Fixed Size List Encoding
379///
380/// Fixed Size Lists are encoded by first encoding all child elements to the row format.
381///
382/// A non-null list value is then encoded as 0x01 followed by the concatenation of each
383/// of the child elements. A null list value is encoded as a null marker.
384///
385/// For example given:
386///
387/// ```text
388/// [1_u8, 2_u8]
389/// [3_u8, null]
390/// null
391/// ```
392///
393/// The elements would be converted to:
394///
395/// ```text
396///     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐        ┌──┬──┐
397///  1  │01│01│  2  │01│02│  3  │01│03│  null  │00│00│
398///     └──┴──┘     └──┴──┘     └──┴──┘        └──┴──┘
399///```
400///
401/// Which would be encoded as
402///
403/// ```text
404///                 ┌──┬──┬──┬──┬──┐
405///  [1_u8, 2_u8]   │01│01│01│01│02│
406///                 └──┴──┴──┴──┴──┘
407///                     └ 1 ┘ └ 2 ┘
408///                 ┌──┬──┬──┬──┬──┐
409///  [3_u8, null]   │01│01│03│00│00│
410///                 └──┴──┴──┴──┴──┘
411///                     └ 1 ┘ └null┘
412///                 ┌──┐
413///  null           │00│
414///                 └──┘
415///
416///```
417///
418/// ## Union Encoding
419///
420/// A union value is encoded as a single type-id byte followed by the row encoding of the selected child value.
421/// The type-id byte is always present; union arrays have no top-level null marker, so nulls are represented by the child encoding.
422///
423/// For example, given a union of Int32 (type_id = 0) and Utf8 (type_id = 1):
424///
425/// ```text
426///                           ┌──┬──────────────┐
427///  3                        │00│01│80│00│00│03│
428///                           └──┴──────────────┘
429///                            │  └─ signed integer encoding (non-null)
430///                            └──── type_id
431///
432///                           ┌──┬────────────────────────────────┐
433/// "abc"                     │01│02│'a'│'b'│'c'│00│00│00│00│00│03│
434///                           └──┴────────────────────────────────┘
435///                            │  └─ string encoding (non-null)
436///                            └──── type_id
437///
438///                           ┌──┬──────────────┐
439/// null Int32                │00│00│00│00│00│00│
440///                           └──┴──────────────┘
441///                            │  └─ signed integer encoding (null)
442///                            └──── type_id
443///
444///                           ┌──┬──┐
445/// null Utf8                 │01│00│
446///                           └──┴──┘
447///                            │  └─ string encoding (null)
448///                            └──── type_id
449/// ```
450///
451/// See [`UnionArray`] for more details on union types.
452///
453/// # Ordering
454///
455/// ## Float Ordering
456///
457/// Floats are totally ordered in accordance to the `totalOrder` predicate as defined
458/// in the IEEE 754 (2008 revision) floating point standard.
459///
460/// The ordering established by this does not always agree with the
461/// [`PartialOrd`] and [`PartialEq`] implementations of `f32`. For example,
462/// they consider negative and positive zero equal, while this does not
463///
464/// ## Null Ordering
465///
466/// The encoding described above will order nulls first, this can be inverted by representing
467/// nulls as `0xFF_u8` instead of `0_u8`
468///
469/// ## Union Ordering
470///
471/// Values of the same type are ordered according to the ordering of that type.
472/// Values of different types are ordered by their type id.
473/// The type_id is negated when descending order is specified.
474///
475/// ## Reverse Column Ordering
476///
477/// The order of a given column can be reversed by negating the encoded bytes of non-null values
478///
479/// [COBS]: https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing
480/// [byte stuffing]: https://en.wikipedia.org/wiki/High-Level_Data_Link_Control#Asynchronous_framing
#[derive(Debug)]
pub struct RowConverter {
    /// The [`SortField`] (data type + sort options) for each column,
    /// in the order supplied to [`RowConverter::new`]
    fields: Arc<[SortField]>,
    /// State for codecs, one [`Codec`] per field in `fields`
    codecs: Vec<Codec>,
}
487
/// Per-column state used to encode/decode arrays of one [`SortField`]'s data type
#[derive(Debug)]
enum Codec {
    /// No additional codec state is necessary
    Stateless,
    /// A row converter for the dictionary values
    /// and the encoding of a row containing only nulls
    Dictionary(RowConverter, OwnedRow),
    /// A row converter for the child fields
    /// and the encoding of a row containing only nulls
    Struct(RowConverter, OwnedRow),
    /// A row converter for the child field
    /// (used for List, LargeList and FixedSizeList)
    List(RowConverter),
    /// A row converter for the values array of a run-end encoded array
    RunEndEncoded(RowConverter),
    /// Row converters for each union field (indexed by type_id)
    /// and the encoding of null rows for each field
    Union(Vec<RowConverter>, Vec<OwnedRow>),
}
506
impl Codec {
    /// Construct the codec state required to encode columns of `sort_field`'s data type
    ///
    /// For nested/encoded types this recursively builds child [`RowConverter`]s and,
    /// where needed, pre-computes the row encoding of a null value so nulls can be
    /// emitted without re-running the converter.
    fn new(sort_field: &SortField) -> Result<Self, ArrowError> {
        match &sort_field.data_type {
            DataType::Dictionary(_, values) => {
                // Dictionaries are hydrated, so encode the value type with the
                // same options as the parent
                let sort_field =
                    SortField::new_with_options(values.as_ref().clone(), sort_field.options);

                let converter = RowConverter::new(vec![sort_field])?;
                // Pre-compute the encoding of a null value for reuse
                let null_array = new_null_array(values.as_ref(), 1);
                let nulls = converter.convert_columns(&[null_array])?;

                let owned = OwnedRow {
                    data: nulls.buffer.into(),
                    config: nulls.config,
                };
                Ok(Self::Dictionary(converter, owned))
            }
            DataType::RunEndEncoded(_, values) => {
                // Similar to List implementation
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };

                let field = SortField::new_with_options(values.data_type().clone(), options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::RunEndEncoded(converter))
            }
            // All remaining non-nested types need no extra state
            d if !d.is_nested() => Ok(Self::Stateless),
            DataType::List(f) | DataType::LargeList(f) => {
                // The encoded contents will be inverted if descending is set to true
                // As such we set `descending` to false and negate nulls first if it
                // is set to true
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };

                let field = SortField::new_with_options(f.data_type().clone(), options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::List(converter))
            }
            DataType::FixedSizeList(f, _) => {
                // Fixed size lists have a per-list validity byte rather than the
                // variable-length block encoding, so the parent options are kept as-is
                let field = SortField::new_with_options(f.data_type().clone(), sort_field.options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::List(converter))
            }
            DataType::Struct(f) => {
                let sort_fields = f
                    .iter()
                    .map(|x| SortField::new_with_options(x.data_type().clone(), sort_field.options))
                    .collect();

                let converter = RowConverter::new(sort_fields)?;
                // Pre-compute the encoding of a row where every child is null,
                // used when the parent struct value itself is null
                let nulls: Vec<_> = f.iter().map(|x| new_null_array(x.data_type(), 1)).collect();

                let nulls = converter.convert_columns(&nulls)?;
                let owned = OwnedRow {
                    data: nulls.buffer.into(),
                    config: nulls.config,
                };

                Ok(Self::Struct(converter, owned))
            }
            DataType::Union(fields, _mode) => {
                // similar to dictionaries and lists, we set descending to false and negate nulls_first
                // since the encoded contents will be inverted if descending is set
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };

                let mut converters = Vec::with_capacity(fields.len());
                let mut null_rows = Vec::with_capacity(fields.len());

                // NOTE(review): converters are pushed in `fields.iter()` order, while
                // `encoder` looks children up via the positional index cast to a
                // type_id. This assumes union type_ids are dense `0..fields.len()` in
                // field order — confirm behaviour for unions with sparse type_ids
                for (_type_id, field) in fields.iter() {
                    let sort_field =
                        SortField::new_with_options(field.data_type().clone(), options);
                    let converter = RowConverter::new(vec![sort_field])?;

                    // Pre-compute the null-row encoding for this union field
                    let null_array = new_null_array(field.data_type(), 1);
                    let nulls = converter.convert_columns(&[null_array])?;
                    let owned = OwnedRow {
                        data: nulls.buffer.into(),
                        config: nulls.config,
                    };

                    converters.push(converter);
                    null_rows.push(owned);
                }

                Ok(Self::Union(converters, null_rows))
            }
            _ => Err(ArrowError::NotYetImplemented(format!(
                "not yet implemented: {:?}",
                sort_field.data_type
            ))),
        }
    }

    /// Prepare an [`Encoder`] for `array`, converting any child arrays to rows up-front
    ///
    /// `array` must match the data type this codec was constructed for.
    fn encoder(&self, array: &dyn Array) -> Result<Encoder<'_>, ArrowError> {
        match self {
            Codec::Stateless => Ok(Encoder::Stateless),
            Codec::Dictionary(converter, nulls) => {
                // Encode the dictionary values; keys then reference these rows
                let values = array.as_any_dictionary().values().clone();
                let rows = converter.convert_columns(&[values])?;
                Ok(Encoder::Dictionary(rows, nulls.row()))
            }
            Codec::Struct(converter, null) => {
                let v = as_struct_array(array);
                let rows = converter.convert_columns(v.columns())?;
                Ok(Encoder::Struct(rows, null.row()))
            }
            Codec::List(converter) => {
                let values = match array.data_type() {
                    DataType::List(_) => {
                        let list_array = as_list_array(array);
                        let first_offset = list_array.offsets()[0] as usize;
                        let last_offset =
                            list_array.offsets()[list_array.offsets().len() - 1] as usize;

                        // values can include more data than referenced in the ListArray, only encode
                        // the referenced values.
                        list_array
                            .values()
                            .slice(first_offset, last_offset - first_offset)
                    }
                    DataType::LargeList(_) => {
                        let list_array = as_large_list_array(array);

                        let first_offset = list_array.offsets()[0] as usize;
                        let last_offset =
                            list_array.offsets()[list_array.offsets().len() - 1] as usize;

                        // values can include more data than referenced in the LargeListArray, only encode
                        // the referenced values.
                        list_array
                            .values()
                            .slice(first_offset, last_offset - first_offset)
                    }
                    DataType::FixedSizeList(_, _) => {
                        as_fixed_size_list_array(array).values().clone()
                    }
                    // Codec::List is only constructed for the three list types above
                    _ => unreachable!(),
                };
                let rows = converter.convert_columns(&[values])?;
                Ok(Encoder::List(rows))
            }
            Codec::RunEndEncoded(converter) => {
                let values = match array.data_type() {
                    DataType::RunEndEncoded(r, _) => match r.data_type() {
                        DataType::Int16 => array.as_run::<Int16Type>().values(),
                        DataType::Int32 => array.as_run::<Int32Type>().values(),
                        DataType::Int64 => array.as_run::<Int64Type>().values(),
                        _ => unreachable!("Unsupported run end index type: {r:?}"),
                    },
                    _ => unreachable!(),
                };
                let rows = converter.convert_columns(std::slice::from_ref(values))?;
                Ok(Encoder::RunEndEncoded(rows))
            }
            Codec::Union(converters, _) => {
                let union_array = array
                    .as_any()
                    .downcast_ref::<UnionArray>()
                    .expect("expected Union array");

                let type_ids = union_array.type_ids().clone();
                let offsets = union_array.offsets().cloned();

                // Convert every child array up-front
                // NOTE(review): the child is fetched with the positional index cast
                // to a type_id, assuming dense `0..N` type_ids — see `Codec::new`
                let mut child_rows = Vec::with_capacity(converters.len());
                for (type_id, converter) in converters.iter().enumerate() {
                    let child_array = union_array.child(type_id as i8);
                    let rows = converter.convert_columns(std::slice::from_ref(child_array))?;
                    child_rows.push(rows);
                }

                Ok(Encoder::Union {
                    child_rows,
                    type_ids,
                    offsets,
                })
            }
        }
    }

    /// Return the size in bytes of the additional state held by this codec
    /// (child converters and pre-computed null rows)
    fn size(&self) -> usize {
        match self {
            Codec::Stateless => 0,
            Codec::Dictionary(converter, nulls) => converter.size() + nulls.data.len(),
            Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(),
            Codec::List(converter) => converter.size(),
            Codec::RunEndEncoded(converter) => converter.size(),
            Codec::Union(converters, null_rows) => {
                converters.iter().map(|c| c.size()).sum::<usize>()
                    + null_rows.iter().map(|n| n.data.len()).sum::<usize>()
            }
        }
    }
}
707
/// Per-array encoder state created by [`Codec::encoder`]
#[derive(Debug)]
enum Encoder<'a> {
    /// No additional encoder state is necessary
    Stateless,
    /// The encoding of the child array and the encoding of a null row
    Dictionary(Rows, Row<'a>),
    /// The row encoding of the child arrays and the encoding of a null row
    ///
    /// It is necessary to encode to a temporary [`Rows`] to avoid serializing
    /// values that are masked by a null in the parent StructArray, otherwise
    /// this would establish an ordering between semantically null values
    Struct(Rows, Row<'a>),
    /// The row encoding of the child array
    List(Rows),
    /// The row encoding of the values array
    RunEndEncoded(Rows),
    /// The row encoding of each union field's child array, along with the
    /// type_ids and (for dense unions) offsets buffers of the parent array
    Union {
        /// Rows for each union field's child array, in field order
        child_rows: Vec<Rows>,
        /// The type id of each union value
        type_ids: ScalarBuffer<i8>,
        /// Offsets into the child arrays, `None` unless the union is dense
        offsets: Option<ScalarBuffer<i32>>,
    },
}
731
/// Configure the data type and sort order for a given column
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
    /// Sort options (sort direction and null ordering)
    options: SortOptions,
    /// Data type of the column to be encoded
    data_type: DataType,
}
740
741impl SortField {
742    /// Create a new column with the given data type
743    pub fn new(data_type: DataType) -> Self {
744        Self::new_with_options(data_type, Default::default())
745    }
746
747    /// Create a new column with the given data type and [`SortOptions`]
748    pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
749        Self { options, data_type }
750    }
751
752    /// Return size of this instance in bytes.
753    ///
754    /// Includes the size of `Self`.
755    pub fn size(&self) -> usize {
756        self.data_type.size() + std::mem::size_of::<Self>() - std::mem::size_of::<DataType>()
757    }
758}
759
impl RowConverter {
    /// Create a new [`RowConverter`] with the provided schema
    ///
    /// # Errors
    ///
    /// Returns [`ArrowError::NotYetImplemented`] if any field uses a data type
    /// not supported by the row format (see [`Self::supports_fields`])
    pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
        if !Self::supports_fields(&fields) {
            return Err(ArrowError::NotYetImplemented(format!(
                "Row format support not yet implemented for: {fields:?}"
            )));
        }

        // One codec per field, holding any state needed to encode nested types
        let codecs = fields.iter().map(Codec::new).collect::<Result<_, _>>()?;
        Ok(Self {
            fields: fields.into(),
            codecs,
        })
    }

    /// Check if the given fields are supported by the row format.
    pub fn supports_fields(fields: &[SortField]) -> bool {
        fields.iter().all(|x| Self::supports_datatype(&x.data_type))
    }

    // Recursively checks whether a data type (and all of its children) can be
    // encoded by the row format
    fn supports_datatype(d: &DataType) -> bool {
        match d {
            // All non-nested types are supported
            _ if !d.is_nested() => true,
            DataType::List(f) | DataType::LargeList(f) | DataType::FixedSizeList(f, _) => {
                Self::supports_datatype(f.data_type())
            }
            DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())),
            DataType::RunEndEncoded(_, values) => Self::supports_datatype(values.data_type()),
            DataType::Union(fs, _mode) => fs
                .iter()
                .all(|(_, f)| Self::supports_datatype(f.data_type())),
            _ => false,
        }
    }

    /// Convert [`ArrayRef`] columns into [`Rows`]
    ///
    /// See [`Row`] for information on when [`Row`] can be compared
    ///
    /// See [`Self::convert_rows`] for converting [`Rows`] back into [`ArrayRef`]
    ///
    /// # Panics
    ///
    /// Panics if the schema of `columns` does not match that provided to [`RowConverter::new`]
    pub fn convert_columns(&self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
        let num_rows = columns.first().map(|x| x.len()).unwrap_or(0);
        let mut rows = self.empty_rows(num_rows, 0);
        self.append(&mut rows, columns)?;
        Ok(rows)
    }

    /// Convert [`ArrayRef`] columns appending to an existing [`Rows`]
    ///
    /// See [`Row`] for information on when [`Row`] can be compared
    ///
    /// # Panics
    ///
    /// Panics if
    /// * The schema of `columns` does not match that provided to [`RowConverter::new`]
    /// * The provided [`Rows`] were not created by this [`RowConverter`]
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use std::collections::HashSet;
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::StringArray;
    /// # use arrow_row::{Row, RowConverter, SortField};
    /// # use arrow_schema::DataType;
    /// #
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    /// let a1 = StringArray::from(vec!["hello", "world"]);
    /// let a2 = StringArray::from(vec!["a", "a", "hello"]);
    ///
    /// let mut rows = converter.empty_rows(5, 128);
    /// converter.append(&mut rows, &[Arc::new(a1)]).unwrap();
    /// converter.append(&mut rows, &[Arc::new(a2)]).unwrap();
    ///
    /// let back = converter.convert_rows(&rows).unwrap();
    /// let values: Vec<_> = back[0].as_string::<i32>().iter().map(Option::unwrap).collect();
    /// assert_eq!(&values, &["hello", "world", "a", "a", "hello"]);
    /// ```
    pub fn append(&self, rows: &mut Rows, columns: &[ArrayRef]) -> Result<(), ArrowError> {
        // Pointer equality on the shared fields proves `rows` came from this converter
        assert!(
            Arc::ptr_eq(&rows.config.fields, &self.fields),
            "rows were not produced by this RowConverter"
        );

        if columns.len() != self.fields.len() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Incorrect number of arrays provided to RowConverter, expected {} got {}",
                self.fields.len(),
                columns.len()
            )));
        }
        // All columns must share the length of the first column
        for colum in columns.iter().skip(1) {
            if colum.len() != columns[0].len() {
                return Err(ArrowError::InvalidArgumentError(format!(
                    "RowConverter columns must all have the same length, expected {} got {}",
                    columns[0].len(),
                    colum.len()
                )));
            }
        }

        // Build one encoder per column, verifying each column's type against the schema
        let encoders = columns
            .iter()
            .zip(&self.codecs)
            .zip(self.fields.iter())
            .map(|((column, codec), field)| {
                if !column.data_type().equals_datatype(&field.data_type) {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "RowConverter column schema mismatch, expected {} got {}",
                        field.data_type,
                        column.data_type()
                    )));
                }
                codec.encoder(column.as_ref())
            })
            .collect::<Result<Vec<_>, _>>()?;

        // Compute the length of every new row up front so the buffer can be
        // sized once, then encode into the pre-allocated space
        let write_offset = rows.num_rows();
        let lengths = row_lengths(columns, &encoders);
        let total = lengths.extend_offsets(rows.offsets[write_offset], &mut rows.offsets);
        rows.buffer.resize(total, 0);

        for ((column, field), encoder) in columns.iter().zip(self.fields.iter()).zip(encoders) {
            // We encode a column at a time to minimise dispatch overheads
            encode_column(
                &mut rows.buffer,
                &mut rows.offsets[write_offset..],
                column.as_ref(),
                field.options,
                &encoder,
            )
        }

        if cfg!(debug_assertions) {
            // After encoding, offsets must cover the whole buffer and be non-decreasing
            assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len());
            rows.offsets
                .windows(2)
                .for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic"));
        }

        Ok(())
    }

    /// Convert [`Rows`] columns into [`ArrayRef`]
    ///
    /// See [`Self::convert_columns`] for converting [`ArrayRef`] into [`Rows`]
    ///
    /// # Panics
    ///
    /// Panics if the rows were not produced by this [`RowConverter`]
    pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>, ArrowError>
    where
        I: IntoIterator<Item = Row<'a>>,
    {
        let mut validate_utf8 = false;
        // Collect the raw byte slices, checking provenance and whether any
        // row requires UTF-8 validation on decode
        let mut rows: Vec<_> = rows
            .into_iter()
            .map(|row| {
                assert!(
                    Arc::ptr_eq(&row.config.fields, &self.fields),
                    "rows were not produced by this RowConverter"
                );
                validate_utf8 |= row.config.validate_utf8;
                row.data
            })
            .collect();

        // SAFETY
        // We have validated that the rows came from this [`RowConverter`]
        // and therefore must be valid
        let result = unsafe { self.convert_raw(&mut rows, validate_utf8) }?;

        if cfg!(debug_assertions) {
            // Decoding consumes each row slice from the front; any leftover
            // bytes indicate a codec failed to consume its full encoding
            for (i, row) in rows.iter().enumerate() {
                if !row.is_empty() {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "Codecs {codecs:?} did not consume all bytes for row {i}, remaining bytes: {row:?}",
                        codecs = &self.codecs
                    )));
                }
            }
        }

        Ok(result)
    }

    /// Returns an empty [`Rows`] with capacity for `row_capacity` rows with
    /// a total length of `data_capacity`
    ///
    /// This can be used to buffer a selection of [`Row`]
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use std::collections::HashSet;
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::StringArray;
    /// # use arrow_row::{Row, RowConverter, SortField};
    /// # use arrow_schema::DataType;
    /// #
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
    ///
    /// // Convert to row format and deduplicate
    /// let converted = converter.convert_columns(&[Arc::new(array)]).unwrap();
    /// let mut distinct_rows = converter.empty_rows(3, 100);
    /// let mut dedup: HashSet<Row> = HashSet::with_capacity(3);
    /// converted.iter().filter(|row| dedup.insert(*row)).for_each(|row| distinct_rows.push(row));
    ///
    /// // Note: we could skip buffering and feed the filtered iterator directly
    /// // into convert_rows, this is done for demonstration purposes only
    /// let distinct = converter.convert_rows(&distinct_rows).unwrap();
    /// let values: Vec<_> = distinct[0].as_string::<i32>().iter().map(Option::unwrap).collect();
    /// assert_eq!(&values, &["hello", "world", "a"]);
    /// ```
    pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows {
        // `offsets` always contains one more entry than there are rows; the
        // leading 0 marks the start of the first row
        let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1));
        offsets.push(0);

        Rows {
            offsets,
            buffer: Vec::with_capacity(data_capacity),
            config: RowConfig {
                fields: self.fields.clone(),
                validate_utf8: false,
            },
        }
    }

    /// Create a new [Rows] instance from the given binary data.
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use std::collections::HashSet;
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::StringArray;
    /// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField};
    /// # use arrow_schema::DataType;
    /// #
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
    /// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
    ///
    /// // We can convert rows into binary format and back in batch.
    /// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
    /// let binary = rows.try_into_binary().expect("known-small array");
    /// let converted = converter.from_binary(binary.clone());
    /// assert!(converted.iter().eq(values.iter().map(|r| r.row())));
    /// ```
    ///
    /// # Panics
    ///
    /// This function expects the passed [BinaryArray] to contain valid row data as produced by this
    /// [RowConverter]. It will panic if any rows are null. Operations on the returned [Rows] may
    /// panic if the data is malformed.
    pub fn from_binary(&self, array: BinaryArray) -> Rows {
        assert_eq!(
            array.null_count(),
            0,
            "can't construct Rows instance from array with nulls"
        );
        let (offsets, values, _) = array.into_parts();
        let offsets = offsets.iter().map(|&i| i.as_usize()).collect();
        // Try zero-copy, if it does not succeed, fall back to copying the values.
        let buffer = values.into_vec().unwrap_or_else(|values| values.to_vec());
        Rows {
            buffer,
            offsets,
            config: RowConfig {
                fields: Arc::clone(&self.fields),
                // The binary data cannot be trusted, so force UTF-8 validation on decode
                validate_utf8: true,
            },
        }
    }

    /// Convert raw bytes into [`ArrayRef`]
    ///
    /// Each codec decodes its column from the front of every row slice,
    /// advancing the slices in place.
    ///
    /// # Safety
    ///
    /// `rows` must contain valid data for this [`RowConverter`]
    unsafe fn convert_raw(
        &self,
        rows: &mut [&[u8]],
        validate_utf8: bool,
    ) -> Result<Vec<ArrayRef>, ArrowError> {
        self.fields
            .iter()
            .zip(&self.codecs)
            .map(|(field, codec)| unsafe { decode_column(field, rows, codec, validate_utf8) })
            .collect()
    }

    /// Returns a [`RowParser`] that can be used to parse [`Row`] from bytes
    pub fn parser(&self) -> RowParser {
        RowParser::new(Arc::clone(&self.fields))
    }

    /// Returns the size of this instance in bytes
    ///
    /// Includes the size of `Self`.
    pub fn size(&self) -> usize {
        // Uses `capacity` for the codecs Vec to account for its full allocation
        std::mem::size_of::<Self>()
            + self.fields.iter().map(|x| x.size()).sum::<usize>()
            + self.codecs.capacity() * std::mem::size_of::<Codec>()
            + self.codecs.iter().map(Codec::size).sum::<usize>()
    }
}
1070
/// A [`RowParser`] can be created from a [`RowConverter`] and used to parse bytes to [`Row`]
#[derive(Debug)]
pub struct RowParser {
    /// Shared field configuration, used to tie parsed rows back to their converter
    config: RowConfig,
}
1076
1077impl RowParser {
1078    fn new(fields: Arc<[SortField]>) -> Self {
1079        Self {
1080            config: RowConfig {
1081                fields,
1082                validate_utf8: true,
1083            },
1084        }
1085    }
1086
1087    /// Creates a [`Row`] from the provided `bytes`.
1088    ///
1089    /// `bytes` must be a [`Row`] produced by the [`RowConverter`] associated with
1090    /// this [`RowParser`], otherwise subsequent operations with the produced [`Row`] may panic
1091    pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> {
1092        Row {
1093            data: bytes,
1094            config: &self.config,
1095        }
1096    }
1097}
1098
/// The config of a given set of [`Row`]
#[derive(Debug, Clone)]
struct RowConfig {
    /// The schema for these rows; shared (via `Arc`) with the originating
    /// [`RowConverter`] so provenance can be checked by pointer equality
    fields: Arc<[SortField]>,
    /// Whether to run UTF-8 validation when converting to arrow arrays
    validate_utf8: bool,
}
1107
/// A row-oriented representation of arrow data, that is normalized for comparison.
///
/// See the [module level documentation](self) and [`RowConverter`] for more details.
#[derive(Debug)]
pub struct Rows {
    /// Underlying row bytes
    buffer: Vec<u8>,
    /// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]`
    ///
    /// Invariant: always holds `num_rows + 1` monotonically non-decreasing entries,
    /// beginning with 0
    offsets: Vec<usize>,
    /// The config for these rows
    config: RowConfig,
}
1120
impl Rows {
    /// Append a [`Row`] to this [`Rows`]
    ///
    /// # Panics
    ///
    /// Panics if `row` does not share this [`Rows`]' field configuration
    pub fn push(&mut self, row: Row<'_>) {
        assert!(
            Arc::ptr_eq(&row.config.fields, &self.config.fields),
            "row was not produced by this RowConverter"
        );
        // Propagate the source row's need for UTF-8 validation on decode
        self.config.validate_utf8 |= row.config.validate_utf8;
        self.buffer.extend_from_slice(row.data);
        self.offsets.push(self.buffer.len())
    }

    /// Reserve capacity for `row_capacity` rows with a total length of `data_capacity`
    pub fn reserve(&mut self, row_capacity: usize, data_capacity: usize) {
        self.buffer.reserve(data_capacity);
        self.offsets.reserve(row_capacity);
    }

    /// Returns the row at index `row`
    ///
    /// # Panics
    ///
    /// Panics if `row` is out of bounds
    pub fn row(&self, row: usize) -> Row<'_> {
        // `offsets` has num_rows + 1 entries, so this bounds-checks `row`
        assert!(row + 1 < self.offsets.len());
        unsafe { self.row_unchecked(row) }
    }

    /// Returns the row at `index` without bounds checking
    ///
    /// # Safety
    /// Caller must ensure that `index` is less than the number of offsets (#rows + 1)
    pub unsafe fn row_unchecked(&self, index: usize) -> Row<'_> {
        let end = unsafe { self.offsets.get_unchecked(index + 1) };
        let start = unsafe { self.offsets.get_unchecked(index) };
        let data = unsafe { self.buffer.get_unchecked(*start..*end) };
        Row {
            data,
            config: &self.config,
        }
    }

    /// Sets the length of this [`Rows`] to 0
    pub fn clear(&mut self) {
        // Retain the leading 0 offset so the invariant (#offsets = #rows + 1) holds
        self.offsets.truncate(1);
        self.buffer.clear();
    }

    /// Returns the number of [`Row`] in this [`Rows`]
    pub fn num_rows(&self) -> usize {
        self.offsets.len() - 1
    }

    /// Returns an iterator over the [`Row`] in this [`Rows`]
    pub fn iter(&self) -> RowsIter<'_> {
        self.into_iter()
    }

    /// Returns the size of this instance in bytes
    ///
    /// Includes the size of `Self`.
    pub fn size(&self) -> usize {
        // Size of fields is accounted for as part of RowConverter
        std::mem::size_of::<Self>()
            + self.buffer.capacity()
            + self.offsets.capacity() * std::mem::size_of::<usize>()
    }

    /// Create a [BinaryArray] from the [Rows] data without reallocating the
    /// underlying bytes.
    ///
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use std::collections::HashSet;
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::StringArray;
    /// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField};
    /// # use arrow_schema::DataType;
    /// #
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
    /// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
    ///
    /// // We can convert rows into binary format and back.
    /// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
    /// let binary = rows.try_into_binary().expect("known-small array");
    /// let parser = converter.parser();
    /// let parsed: Vec<OwnedRow> =
    ///   binary.iter().flatten().map(|b| parser.parse(b).owned()).collect();
    /// assert_eq!(values, parsed);
    /// ```
    ///
    /// # Errors
    ///
    /// This function will return an error if there is more data than can be stored in
    /// a [BinaryArray] -- i.e. if the total data size is more than 2GiB.
    pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> {
        if self.buffer.len() > i32::MAX as usize {
            return Err(ArrowError::InvalidArgumentError(format!(
                "{}-byte rows buffer too long to convert into a i32-indexed BinaryArray",
                self.buffer.len()
            )));
        }
        // We've checked that the buffer length fits in an i32; so all offsets into that buffer should fit as well.
        let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as));
        // SAFETY: offsets buffer is nonempty, monotonically increasing, and all represent valid indexes into buffer.
        let array = unsafe {
            BinaryArray::new_unchecked(
                OffsetBuffer::new_unchecked(offsets_scalar),
                Buffer::from_vec(self.buffer),
                None,
            )
        };
        Ok(array)
    }
}
1234
1235impl<'a> IntoIterator for &'a Rows {
1236    type Item = Row<'a>;
1237    type IntoIter = RowsIter<'a>;
1238
1239    fn into_iter(self) -> Self::IntoIter {
1240        RowsIter {
1241            rows: self,
1242            start: 0,
1243            end: self.num_rows(),
1244        }
1245    }
1246}
1247
/// An iterator over [`Rows`]
#[derive(Debug)]
pub struct RowsIter<'a> {
    /// The rows being iterated
    rows: &'a Rows,
    /// Index of the next row to yield from the front (inclusive)
    start: usize,
    /// One past the last row to yield from the back (exclusive)
    end: usize,
}
1255
1256impl<'a> Iterator for RowsIter<'a> {
1257    type Item = Row<'a>;
1258
1259    fn next(&mut self) -> Option<Self::Item> {
1260        if self.end == self.start {
1261            return None;
1262        }
1263
1264        // SAFETY: We have checked that `start` is less than `end`
1265        let row = unsafe { self.rows.row_unchecked(self.start) };
1266        self.start += 1;
1267        Some(row)
1268    }
1269
1270    fn size_hint(&self) -> (usize, Option<usize>) {
1271        let len = self.len();
1272        (len, Some(len))
1273    }
1274}
1275
impl ExactSizeIterator for RowsIter<'_> {
    /// Number of rows remaining in the iterator
    fn len(&self) -> usize {
        self.end - self.start
    }
}
1281
1282impl DoubleEndedIterator for RowsIter<'_> {
1283    fn next_back(&mut self) -> Option<Self::Item> {
1284        if self.end == self.start {
1285            return None;
1286        }
1287        // Safety: We have checked that `start` is less than `end`
1288        let row = unsafe { self.rows.row_unchecked(self.end) };
1289        self.end -= 1;
1290        Some(row)
1291    }
1292}
1293
/// A comparable representation of a row.
///
/// See the [module level documentation](self) for more details.
///
/// Two [`Row`] can only be compared if they both belong to [`Rows`]
/// returned by calls to [`RowConverter::convert_columns`] on the same
/// [`RowConverter`]. If different [`RowConverter`]s are used, any
/// ordering established by comparing the [`Row`] is arbitrary.
#[derive(Debug, Copy, Clone)]
pub struct Row<'a> {
    /// The encoded bytes for this row
    data: &'a [u8],
    /// Shared config identifying the converter that produced this row
    config: &'a RowConfig,
}
1307
1308impl<'a> Row<'a> {
1309    /// Create owned version of the row to detach it from the shared [`Rows`].
1310    pub fn owned(&self) -> OwnedRow {
1311        OwnedRow {
1312            data: self.data.into(),
1313            config: self.config.clone(),
1314        }
1315    }
1316
1317    /// The row's bytes, with the lifetime of the underlying data.
1318    pub fn data(&self) -> &'a [u8] {
1319        self.data
1320    }
1321}
1322
1323// Manually derive these as don't wish to include `fields`
1324
1325impl PartialEq for Row<'_> {
1326    #[inline]
1327    fn eq(&self, other: &Self) -> bool {
1328        self.data.eq(other.data)
1329    }
1330}
1331
1332impl Eq for Row<'_> {}
1333
1334impl PartialOrd for Row<'_> {
1335    #[inline]
1336    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1337        Some(self.cmp(other))
1338    }
1339}
1340
1341impl Ord for Row<'_> {
1342    #[inline]
1343    fn cmp(&self, other: &Self) -> Ordering {
1344        self.data.cmp(other.data)
1345    }
1346}
1347
1348impl Hash for Row<'_> {
1349    #[inline]
1350    fn hash<H: Hasher>(&self, state: &mut H) {
1351        self.data.hash(state)
1352    }
1353}
1354
1355impl AsRef<[u8]> for Row<'_> {
1356    #[inline]
1357    fn as_ref(&self) -> &[u8] {
1358        self.data
1359    }
1360}
1361
/// Owned version of a [`Row`] that can be moved/cloned freely.
///
/// This contains the data for the one specific row (not the entire buffer of all rows).
#[derive(Debug, Clone)]
pub struct OwnedRow {
    /// The encoded bytes of this single row
    data: Box<[u8]>,
    /// Cloned config identifying the converter that produced this row
    config: RowConfig,
}
1370
1371impl OwnedRow {
1372    /// Get borrowed [`Row`] from owned version.
1373    ///
1374    /// This is helpful if you want to compare an [`OwnedRow`] with a [`Row`].
1375    pub fn row(&self) -> Row<'_> {
1376        Row {
1377            data: &self.data,
1378            config: &self.config,
1379        }
1380    }
1381}
1382
1383// Manually derive these as don't wish to include `fields`. Also we just want to use the same `Row` implementations here.
1384
1385impl PartialEq for OwnedRow {
1386    #[inline]
1387    fn eq(&self, other: &Self) -> bool {
1388        self.row().eq(&other.row())
1389    }
1390}
1391
1392impl Eq for OwnedRow {}
1393
1394impl PartialOrd for OwnedRow {
1395    #[inline]
1396    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1397        Some(self.cmp(other))
1398    }
1399}
1400
1401impl Ord for OwnedRow {
1402    #[inline]
1403    fn cmp(&self, other: &Self) -> Ordering {
1404        self.row().cmp(&other.row())
1405    }
1406}
1407
1408impl Hash for OwnedRow {
1409    #[inline]
1410    fn hash<H: Hasher>(&self, state: &mut H) {
1411        self.row().hash(state)
1412    }
1413}
1414
1415impl AsRef<[u8]> for OwnedRow {
1416    #[inline]
1417    fn as_ref(&self) -> &[u8] {
1418        &self.data
1419    }
1420}
1421
1422/// Returns the null sentinel, negated if `invert` is true
1423#[inline]
1424fn null_sentinel(options: SortOptions) -> u8 {
1425    match options.nulls_first {
1426        true => 0,
1427        false => 0xFF,
1428    }
1429}
1430
/// Stores the lengths of the rows. Lazily materializes lengths for columns with fixed-size types.
enum LengthTracker {
    /// Fixed state: All rows have length `length`
    Fixed { length: usize, num_rows: usize },
    /// Variable state: The length of row `i` is `lengths[i] + fixed_length`
    Variable {
        fixed_length: usize,
        lengths: Vec<usize>,
    },
}

impl LengthTracker {
    /// Creates a tracker for `num_rows` rows, all initially of zero length
    fn new(num_rows: usize) -> Self {
        Self::Fixed {
            length: 0,
            num_rows,
        }
    }

    /// Adds a column of fixed-length elements, each of size `new_length` to the LengthTracker
    fn push_fixed(&mut self, new_length: usize) {
        // In either state there is a single shared length to bump
        let shared = match self {
            LengthTracker::Fixed { length, .. } => length,
            LengthTracker::Variable { fixed_length, .. } => fixed_length,
        };
        *shared += new_length;
    }

    /// Adds a column of possibly variable-length elements, element `i` has length `new_lengths.nth(i)`
    fn push_variable(&mut self, new_lengths: impl ExactSizeIterator<Item = usize>) {
        match self {
            // First variable-length column: materialize per-row lengths now
            LengthTracker::Fixed { length, .. } => {
                *self = LengthTracker::Variable {
                    fixed_length: *length,
                    lengths: new_lengths.collect(),
                }
            }
            LengthTracker::Variable { lengths, .. } => {
                assert_eq!(lengths.len(), new_lengths.len());
                for (slot, extra) in lengths.iter_mut().zip(new_lengths) {
                    *slot += extra;
                }
            }
        }
    }

    /// Returns the tracked row lengths as a mutable slice, forcing
    /// materialization of the per-row lengths if still in the fixed state
    fn materialized(&mut self) -> &mut [usize] {
        if let LengthTracker::Fixed { length, num_rows } = *self {
            *self = LengthTracker::Variable {
                fixed_length: length,
                lengths: vec![0; num_rows],
            };
        }

        match self {
            LengthTracker::Variable { lengths, .. } => lengths,
            LengthTracker::Fixed { .. } => unreachable!(),
        }
    }

    /// Initializes the offsets using the tracked lengths. Returns the sum of the
    /// lengths of the rows added.
    ///
    /// We initialize the offsets shifted down by one row index.
    ///
    /// As the rows are appended to the offsets will be incremented to match
    ///
    /// For example, consider the case of 3 rows of length 3, 4, and 6 respectively.
    /// The offsets would be initialized to `0, 0, 3, 7`
    ///
    /// Writing the first row entirely would yield `0, 3, 3, 7`
    /// The second, `0, 3, 7, 7`
    /// The third, `0, 3, 7, 13`
    ///
    /// This would be the final offsets for reading
    ///
    /// In this way offsets tracks the write position while encoding, and ends up
    /// as the final offsets used for reading
    fn extend_offsets(&self, initial_offset: usize, offsets: &mut Vec<usize>) -> usize {
        match self {
            LengthTracker::Fixed { length, num_rows } => {
                // All rows equal-sized: offsets form an arithmetic progression
                offsets.extend((0..*num_rows).map(|row| initial_offset + row * length));

                initial_offset + num_rows * length
            }
            LengthTracker::Variable {
                fixed_length,
                lengths,
            } => {
                // Running total, shifted down by one row as documented above
                let mut next = initial_offset;

                offsets.extend(lengths.iter().map(|length| {
                    let current = next;
                    next += length + fixed_length;
                    current
                }));

                next
            }
        }
    }
}
1533
1534/// Computes the length of each encoded [`Rows`] and returns an empty [`Rows`]
1535fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> LengthTracker {
1536    use fixed::FixedLengthEncoding;
1537
1538    let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
1539    let mut tracker = LengthTracker::new(num_rows);
1540
1541    for (array, encoder) in cols.iter().zip(encoders) {
1542        match encoder {
1543            Encoder::Stateless => {
1544                downcast_primitive_array! {
1545                    array => tracker.push_fixed(fixed::encoded_len(array)),
1546                    DataType::Null => {},
1547                    DataType::Boolean => tracker.push_fixed(bool::ENCODED_LEN),
1548                    DataType::Binary => tracker.push_variable(
1549                        as_generic_binary_array::<i32>(array)
1550                            .iter()
1551                            .map(|slice| variable::encoded_len(slice))
1552                    ),
1553                    DataType::LargeBinary => tracker.push_variable(
1554                        as_generic_binary_array::<i64>(array)
1555                            .iter()
1556                            .map(|slice| variable::encoded_len(slice))
1557                    ),
1558                    DataType::BinaryView => push_byte_view_array_lengths(&mut tracker, array.as_binary_view()),
1559                    DataType::Utf8 => tracker.push_variable(
1560                        array.as_string::<i32>()
1561                            .iter()
1562                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
1563                    ),
1564                    DataType::LargeUtf8 => tracker.push_variable(
1565                        array.as_string::<i64>()
1566                            .iter()
1567                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
1568                    ),
1569                    DataType::Utf8View => push_byte_view_array_lengths(&mut tracker, array.as_string_view()),
1570                    DataType::FixedSizeBinary(len) => {
1571                        let len = len.to_usize().unwrap();
1572                        tracker.push_fixed(1 + len)
1573                    }
1574                    _ => unimplemented!("unsupported data type: {}", array.data_type()),
1575                }
1576            }
1577            Encoder::Dictionary(values, null) => {
1578                downcast_dictionary_array! {
1579                    array => {
1580                        tracker.push_variable(
1581                            array.keys().iter().map(|v| match v {
1582                                Some(k) => values.row(k.as_usize()).data.len(),
1583                                None => null.data.len(),
1584                            })
1585                        )
1586                    }
1587                    _ => unreachable!(),
1588                }
1589            }
1590            Encoder::Struct(rows, null) => {
1591                let array = as_struct_array(array);
1592                tracker.push_variable((0..array.len()).map(|idx| match array.is_valid(idx) {
1593                    true => 1 + rows.row(idx).as_ref().len(),
1594                    false => 1 + null.data.len(),
1595                }));
1596            }
1597            Encoder::List(rows) => match array.data_type() {
1598                DataType::List(_) => {
1599                    list::compute_lengths(tracker.materialized(), rows, as_list_array(array))
1600                }
1601                DataType::LargeList(_) => {
1602                    list::compute_lengths(tracker.materialized(), rows, as_large_list_array(array))
1603                }
1604                DataType::FixedSizeList(_, _) => compute_lengths_fixed_size_list(
1605                    &mut tracker,
1606                    rows,
1607                    as_fixed_size_list_array(array),
1608                ),
1609                _ => unreachable!(),
1610            },
1611            Encoder::RunEndEncoded(rows) => match array.data_type() {
1612                DataType::RunEndEncoded(r, _) => match r.data_type() {
1613                    DataType::Int16 => run::compute_lengths(
1614                        tracker.materialized(),
1615                        rows,
1616                        array.as_run::<Int16Type>(),
1617                    ),
1618                    DataType::Int32 => run::compute_lengths(
1619                        tracker.materialized(),
1620                        rows,
1621                        array.as_run::<Int32Type>(),
1622                    ),
1623                    DataType::Int64 => run::compute_lengths(
1624                        tracker.materialized(),
1625                        rows,
1626                        array.as_run::<Int64Type>(),
1627                    ),
1628                    _ => unreachable!("Unsupported run end index type: {r:?}"),
1629                },
1630                _ => unreachable!(),
1631            },
1632            Encoder::Union {
1633                child_rows,
1634                type_ids,
1635                offsets,
1636            } => {
1637                let union_array = array
1638                    .as_any()
1639                    .downcast_ref::<UnionArray>()
1640                    .expect("expected UnionArray");
1641
1642                let lengths = (0..union_array.len()).map(|i| {
1643                    let type_id = type_ids[i];
1644                    let child_row_i = offsets.as_ref().map(|o| o[i] as usize).unwrap_or(i);
1645                    let child_row = child_rows[type_id as usize].row(child_row_i);
1646
1647                    // length: 1 byte type_id + child row bytes
1648                    1 + child_row.as_ref().len()
1649                });
1650
1651                tracker.push_variable(lengths);
1652            }
1653        }
1654    }
1655
1656    tracker
1657}
1658
1659/// Add to [`LengthTracker`] the encoded length of each item in the [`GenericByteViewArray`]
1660fn push_byte_view_array_lengths<T: ByteViewType>(
1661    tracker: &mut LengthTracker,
1662    array: &GenericByteViewArray<T>,
1663) {
1664    if let Some(nulls) = array.nulls().filter(|n| n.null_count() > 0) {
1665        tracker.push_variable(
1666            array
1667                .lengths()
1668                .zip(nulls.iter())
1669                .map(|(length, is_valid)| {
1670                    if is_valid {
1671                        Some(length as usize)
1672                    } else {
1673                        None
1674                    }
1675                })
1676                .map(variable::padded_length),
1677        )
1678    } else {
1679        tracker.push_variable(
1680            array
1681                .lengths()
1682                .map(|len| variable::padded_length(Some(len as usize))),
1683        )
1684    }
1685}
1686
/// Encodes a column to the provided [`Rows`] incrementing the offsets as it progresses
///
/// `data` is the shared output buffer. `offsets[i + 1]` is used as the write
/// cursor for row `i`: each encoder writes the row's bytes at that position and
/// advances the cursor past them (hence the `.skip(1)` in the branches below).
fn encode_column(
    data: &mut [u8],
    offsets: &mut [usize],
    column: &dyn Array,
    opts: SortOptions,
    encoder: &Encoder<'_>,
) {
    match encoder {
        // Leaf types encode directly from the array's buffers
        Encoder::Stateless => {
            downcast_primitive_array! {
                column => {
                    // Only pay for null handling when nulls are actually present
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode(data, offsets, column.values(), nulls, opts)
                    } else {
                        fixed::encode_not_null(data, offsets, column.values(), opts)
                    }
                }
                DataType::Null => {}
                DataType::Boolean => {
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode_boolean(data, offsets, column.as_boolean().values(), nulls, opts)
                    } else {
                        fixed::encode_boolean_not_null(data, offsets, column.as_boolean().values(), opts)
                    }
                }
                DataType::Binary => {
                    variable::encode_generic_byte_array(data, offsets, as_generic_binary_array::<i32>(column), opts)
                }
                DataType::BinaryView => {
                    variable::encode(data, offsets, column.as_binary_view().iter(), opts)
                }
                DataType::LargeBinary => {
                    variable::encode_generic_byte_array(data, offsets, as_generic_binary_array::<i64>(column), opts)
                }
                DataType::Utf8 => variable::encode_generic_byte_array(
                    data, offsets,
                    column.as_string::<i32>(),
                    opts,
                ),
                DataType::LargeUtf8 => variable::encode_generic_byte_array(
                    data, offsets,
                    column.as_string::<i64>(),
                    opts,
                ),
                DataType::Utf8View => variable::encode(
                    data, offsets,
                    column.as_string_view().iter().map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::FixedSizeBinary(_) => {
                    let array = column.as_any().downcast_ref().unwrap();
                    fixed::encode_fixed_size_binary(data, offsets, array, opts)
                }
                _ => unimplemented!("unsupported data type: {}", column.data_type()),
            }
        }
        // Dictionary values were pre-converted to rows; copy the value row for each key
        Encoder::Dictionary(values, nulls) => {
            downcast_dictionary_array! {
                column => encode_dictionary_values(data, offsets, column, values, nulls),
                _ => unreachable!()
            }
        }
        // Structs encode as a 1-byte validity sentinel followed by the
        // pre-computed child row (or the null row for invalid slots)
        Encoder::Struct(rows, null) => {
            let array = as_struct_array(column);
            let null_sentinel = null_sentinel(opts);
            offsets
                .iter_mut()
                .skip(1)
                .enumerate()
                .for_each(|(idx, offset)| {
                    let (row, sentinel) = match array.is_valid(idx) {
                        true => (rows.row(idx), 0x01),
                        false => (*null, null_sentinel),
                    };
                    let end_offset = *offset + 1 + row.as_ref().len();
                    data[*offset] = sentinel;
                    data[*offset + 1..end_offset].copy_from_slice(row.as_ref());
                    *offset = end_offset;
                })
        }
        Encoder::List(rows) => match column.data_type() {
            DataType::List(_) => list::encode(data, offsets, rows, opts, as_list_array(column)),
            DataType::LargeList(_) => {
                list::encode(data, offsets, rows, opts, as_large_list_array(column))
            }
            DataType::FixedSizeList(_, _) => {
                encode_fixed_size_list(data, offsets, rows, opts, as_fixed_size_list_array(column))
            }
            _ => unreachable!(),
        },
        // Dispatch on the width of the run-end index type
        Encoder::RunEndEncoded(rows) => match column.data_type() {
            DataType::RunEndEncoded(r, _) => match r.data_type() {
                DataType::Int16 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int16Type>())
                }
                DataType::Int32 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int32Type>())
                }
                DataType::Int64 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int64Type>())
                }
                _ => unreachable!("Unsupported run end index type: {r:?}"),
            },
            _ => unreachable!(),
        },
        // Unions encode as a 1-byte type id followed by the child's row bytes
        Encoder::Union {
            child_rows,
            type_ids,
            offsets: offsets_buf,
        } => {
            offsets
                .iter_mut()
                .skip(1)
                .enumerate()
                .for_each(|(i, offset)| {
                    let type_id = type_ids[i];

                    // Dense unions carry an offsets buffer mapping each slot to
                    // a position in its child; sparse unions use the slot index
                    let child_row_idx = offsets_buf.as_ref().map(|o| o[i] as usize).unwrap_or(i);
                    let child_row = child_rows[type_id as usize].row(child_row_idx);
                    let child_bytes = child_row.as_ref();

                    // Invert the type id byte so descending order sorts correctly
                    let type_id_byte = if opts.descending {
                        !(type_id as u8)
                    } else {
                        type_id as u8
                    };
                    data[*offset] = type_id_byte;

                    let child_start = *offset + 1;
                    let child_end = child_start + child_bytes.len();
                    data[child_start..child_end].copy_from_slice(child_bytes);

                    *offset = child_end;
                });
        }
    }
}
1825
1826/// Encode dictionary values not preserving the dictionary encoding
1827pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
1828    data: &mut [u8],
1829    offsets: &mut [usize],
1830    column: &DictionaryArray<K>,
1831    values: &Rows,
1832    null: &Row<'_>,
1833) {
1834    for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
1835        let row = match k {
1836            Some(k) => values.row(k.as_usize()).data,
1837            None => null.data,
1838        };
1839        let end_offset = *offset + row.len();
1840        data[*offset..end_offset].copy_from_slice(row);
1841        *offset = end_offset;
1842    }
1843}
1844
/// Helper passed to [`downcast_primitive!`] in [`decode_column`]: decodes
/// `$rows` into a primitive array of type `$t` and wraps it in an [`Arc`]
macro_rules! decode_primitive_helper {
    ($t:ty, $rows:ident, $data_type:ident, $options:ident) => {
        Arc::new(decode_primitive::<$t>($rows, $data_type, $options))
    };
}
1850
/// Decodes the provided `field` from `rows`
///
/// Each slice in `rows` is advanced past the bytes consumed for this column,
/// so subsequent columns can be decoded from the remainder.
///
/// # Safety
///
/// Rows must contain valid data for the provided field
unsafe fn decode_column(
    field: &SortField,
    rows: &mut [&[u8]],
    codec: &Codec,
    validate_utf8: bool,
) -> Result<ArrayRef, ArrowError> {
    let options = field.options;

    let array: ArrayRef = match codec {
        // Leaf types decode directly from the row bytes
        Codec::Stateless => {
            let data_type = field.data_type.clone();
            downcast_primitive! {
                data_type => (decode_primitive_helper, rows, data_type, options),
                DataType::Null => Arc::new(NullArray::new(rows.len())),
                DataType::Boolean => Arc::new(decode_bool(rows, options)),
                DataType::Binary => Arc::new(decode_binary::<i32>(rows, options)),
                DataType::LargeBinary => Arc::new(decode_binary::<i64>(rows, options)),
                DataType::BinaryView => Arc::new(decode_binary_view(rows, options)),
                DataType::FixedSizeBinary(size) => Arc::new(decode_fixed_size_binary(rows, size, options)),
                DataType::Utf8 => Arc::new(unsafe{ decode_string::<i32>(rows, options, validate_utf8) }),
                DataType::LargeUtf8 => Arc::new(unsafe { decode_string::<i64>(rows, options, validate_utf8) }),
                DataType::Utf8View => Arc::new(unsafe { decode_string_view(rows, options, validate_utf8) }),
                _ => return Err(ArrowError::NotYetImplemented(format!("unsupported data type: {data_type}" )))
            }
        }
        // Dictionaries were encoded as their flattened values, so decoding
        // yields the value type rather than a dictionary array
        Codec::Dictionary(converter, _) => {
            let cols = unsafe { converter.convert_raw(rows, validate_utf8) }?;
            cols.into_iter().next().unwrap()
        }
        Codec::Struct(converter, _) => {
            // The first byte of each row is the struct's validity sentinel;
            // consume it before handing the remainder to the child converter
            let (null_count, nulls) = fixed::decode_nulls(rows);
            rows.iter_mut().for_each(|row| *row = &row[1..]);
            let children = unsafe { converter.convert_raw(rows, validate_utf8) }?;

            let child_data: Vec<ArrayData> = children.iter().map(|c| c.to_data()).collect();
            // Since RowConverter flattens certain data types (i.e. Dictionary),
            // we need to use updated data type instead of original field
            let corrected_fields: Vec<Field> = match &field.data_type {
                DataType::Struct(struct_fields) => struct_fields
                    .iter()
                    .zip(child_data.iter())
                    .map(|(orig_field, child_array)| {
                        orig_field
                            .as_ref()
                            .clone()
                            .with_data_type(child_array.data_type().clone())
                    })
                    .collect(),
                _ => unreachable!("Only Struct types should be corrected here"),
            };
            let corrected_struct_type = DataType::Struct(corrected_fields.into());
            let builder = ArrayDataBuilder::new(corrected_struct_type)
                .len(rows.len())
                .null_count(null_count)
                .null_bit_buffer(Some(nulls))
                .child_data(child_data);

            // SAFETY: upheld by the caller — rows contain valid data for `field`
            Arc::new(StructArray::from(unsafe { builder.build_unchecked() }))
        }
        Codec::List(converter) => match &field.data_type {
            DataType::List(_) => {
                Arc::new(unsafe { list::decode::<i32>(converter, rows, field, validate_utf8) }?)
            }
            DataType::LargeList(_) => {
                Arc::new(unsafe { list::decode::<i64>(converter, rows, field, validate_utf8) }?)
            }
            DataType::FixedSizeList(_, value_length) => Arc::new(unsafe {
                list::decode_fixed_size_list(
                    converter,
                    rows,
                    field,
                    validate_utf8,
                    value_length.as_usize(),
                )
            }?),
            _ => unreachable!(),
        },
        // Dispatch on the width of the run-end index type
        Codec::RunEndEncoded(converter) => match &field.data_type {
            DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() {
                DataType::Int16 => Arc::new(unsafe {
                    run::decode::<Int16Type>(converter, rows, field, validate_utf8)
                }?),
                DataType::Int32 => Arc::new(unsafe {
                    run::decode::<Int32Type>(converter, rows, field, validate_utf8)
                }?),
                DataType::Int64 => Arc::new(unsafe {
                    run::decode::<Int64Type>(converter, rows, field, validate_utf8)
                }?),
                _ => unreachable!(),
            },
            _ => unreachable!(),
        },
        Codec::Union(converters, null_rows) => {
            let len = rows.len();

            let DataType::Union(union_fields, mode) = &field.data_type else {
                unreachable!()
            };

            // Group each row's child bytes by the field it belongs to,
            // remembering the original row index so slices can be advanced later
            let mut type_ids = Vec::with_capacity(len);
            let mut rows_by_field: Vec<Vec<(usize, &[u8])>> = vec![Vec::new(); converters.len()];

            for (idx, row) in rows.iter_mut().enumerate() {
                // The encoder inverted the type id byte for descending order;
                // undo that here
                let type_id_byte = {
                    let id = row[0];
                    if options.descending { !id } else { id }
                };

                let type_id = type_id_byte as i8;
                type_ids.push(type_id);

                // NOTE(review): assumes each type id equals its child index
                // (0..converters.len()) — verify for unions with
                // non-contiguous type ids
                let field_idx = type_id as usize;

                let child_row = &row[1..];
                rows_by_field[field_idx].push((idx, child_row));
            }

            let mut child_arrays: Vec<ArrayRef> = Vec::with_capacity(converters.len());
            // Only dense unions carry an offsets buffer
            let mut offsets = (*mode == UnionMode::Dense).then(|| Vec::with_capacity(len));

            for (field_idx, converter) in converters.iter().enumerate() {
                let field_rows = &rows_by_field[field_idx];

                match &mode {
                    UnionMode::Dense => {
                        // Dense children only contain the rows that reference them
                        if field_rows.is_empty() {
                            let (_, field) = union_fields.iter().nth(field_idx).unwrap();
                            child_arrays.push(arrow_array::new_empty_array(field.data_type()));
                            continue;
                        }

                        let mut child_data = field_rows
                            .iter()
                            .map(|(_, bytes)| *bytes)
                            .collect::<Vec<_>>();

                        let child_array =
                            unsafe { converter.convert_raw(&mut child_data, validate_utf8) }?;

                        // advance row slices by the bytes consumed
                        // (+1 accounts for the type id byte stripped above)
                        for ((row_idx, original_bytes), remaining_bytes) in
                            field_rows.iter().zip(child_data)
                        {
                            let consumed_length = 1 + original_bytes.len() - remaining_bytes.len();
                            rows[*row_idx] = &rows[*row_idx][consumed_length..];
                        }

                        child_arrays.push(child_array.into_iter().next().unwrap());
                    }
                    UnionMode::Sparse => {
                        // Sparse children must have the full union length, so
                        // fill slots belonging to other fields with the
                        // pre-computed null row
                        let mut sparse_data: Vec<&[u8]> = Vec::with_capacity(len);
                        let mut field_row_iter = field_rows.iter().peekable();
                        let null_row_bytes: &[u8] = &null_rows[field_idx].data;

                        for idx in 0..len {
                            if let Some((next_idx, bytes)) = field_row_iter.peek() {
                                if *next_idx == idx {
                                    sparse_data.push(*bytes);

                                    field_row_iter.next();
                                    continue;
                                }
                            }
                            sparse_data.push(null_row_bytes);
                        }

                        let child_array =
                            unsafe { converter.convert_raw(&mut sparse_data, validate_utf8) }?;

                        // advance row slices by the bytes consumed for rows that belong to this field
                        for (row_idx, child_row) in field_rows.iter() {
                            let remaining_len = sparse_data[*row_idx].len();
                            let consumed_length = 1 + child_row.len() - remaining_len;
                            rows[*row_idx] = &rows[*row_idx][consumed_length..];
                        }

                        child_arrays.push(child_array.into_iter().next().unwrap());
                    }
                }
            }

            // build offsets for dense unions
            // (each slot's offset is the running count of prior rows with the same type id)
            if let Some(ref mut offsets_vec) = offsets {
                let mut count = vec![0i32; converters.len()];
                for type_id in &type_ids {
                    let field_idx = *type_id as usize;
                    offsets_vec.push(count[field_idx]);

                    count[field_idx] += 1;
                }
            }

            let type_ids_buffer = ScalarBuffer::from(type_ids);
            let offsets_buffer = offsets.map(ScalarBuffer::from);

            let union_array = UnionArray::try_new(
                union_fields.clone(),
                type_ids_buffer,
                offsets_buffer,
                child_arrays,
            )?;

            // note: union arrays don't support physical null buffers
            // nulls are represented logically though child arrays
            Arc::new(union_array)
        }
    };
    Ok(array)
}
2065
2066#[cfg(test)]
2067mod tests {
2068    use rand::distr::uniform::SampleUniform;
2069    use rand::distr::{Distribution, StandardUniform};
2070    use rand::{Rng, rng};
2071
2072    use arrow_array::builder::*;
2073    use arrow_array::types::*;
2074    use arrow_array::*;
2075    use arrow_buffer::{Buffer, OffsetBuffer};
2076    use arrow_buffer::{NullBuffer, i256};
2077    use arrow_cast::display::{ArrayFormatter, FormatOptions};
2078    use arrow_ord::sort::{LexicographicalComparator, SortColumn};
2079
2080    use super::*;
2081
    #[test]
    fn test_fixed_width() {
        let cols = [
            Arc::new(Int16Array::from_iter([
                Some(1),
                Some(2),
                None,
                Some(-5),
                Some(2),
                Some(2),
                Some(0),
            ])) as ArrayRef,
            Arc::new(Float32Array::from_iter([
                Some(1.3),
                Some(2.5),
                None,
                Some(4.),
                Some(0.1),
                Some(-4.),
                Some(-0.),
            ])) as ArrayRef,
        ];

        let converter = RowConverter::new(vec![
            SortField::new(DataType::Int16),
            SortField::new(DataType::Float32),
        ])
        .unwrap();
        let rows = converter.convert_columns(&cols).unwrap();

        // Each row is 8 bytes wide: 3 for the Int16 column and 5 for the
        // Float32 column (1 validity sentinel byte + the value's data bytes)
        assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
        assert_eq!(
            rows.buffer,
            &[
                1, 128, 1, //
                1, 191, 166, 102, 102, //
                1, 128, 2, //
                1, 192, 32, 0, 0, //
                0, 0, 0, //
                0, 0, 0, 0, 0, //
                1, 127, 251, //
                1, 192, 128, 0, 0, //
                1, 128, 2, //
                1, 189, 204, 204, 205, //
                1, 128, 2, //
                1, 63, 127, 255, 255, //
                1, 128, 0, //
                1, 127, 255, 255, 255 //
            ]
        );

        // Lexicographic order over (Int16, Float32):
        assert!(rows.row(3) < rows.row(6)); // (-5, 4.0)  < (0, -0.0)  — first column decides
        assert!(rows.row(0) < rows.row(1)); // (1, 1.3)   < (2, 2.5)
        assert!(rows.row(3) < rows.row(0)); // (-5, 4.0)  < (1, 1.3)
        assert!(rows.row(4) < rows.row(1)); // (2, 0.1)   < (2, 2.5)   — tie broken by second column
        assert!(rows.row(5) < rows.row(4)); // (2, -4.0)  < (2, 0.1)

        // Conversion must round-trip losslessly
        let back = converter.convert_rows(&rows).unwrap();
        for (expected, actual) in cols.iter().zip(&back) {
            assert_eq!(expected, actual);
        }
    }
2144
2145    #[test]
2146    fn test_decimal32() {
2147        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal32(
2148            DECIMAL32_MAX_PRECISION,
2149            7,
2150        ))])
2151        .unwrap();
2152        let col = Arc::new(
2153            Decimal32Array::from_iter([
2154                None,
2155                Some(i32::MIN),
2156                Some(-13),
2157                Some(46_i32),
2158                Some(5456_i32),
2159                Some(i32::MAX),
2160            ])
2161            .with_precision_and_scale(9, 7)
2162            .unwrap(),
2163        ) as ArrayRef;
2164
2165        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2166        for i in 0..rows.num_rows() - 1 {
2167            assert!(rows.row(i) < rows.row(i + 1));
2168        }
2169
2170        let back = converter.convert_rows(&rows).unwrap();
2171        assert_eq!(back.len(), 1);
2172        assert_eq!(col.as_ref(), back[0].as_ref())
2173    }
2174
2175    #[test]
2176    fn test_decimal64() {
2177        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal64(
2178            DECIMAL64_MAX_PRECISION,
2179            7,
2180        ))])
2181        .unwrap();
2182        let col = Arc::new(
2183            Decimal64Array::from_iter([
2184                None,
2185                Some(i64::MIN),
2186                Some(-13),
2187                Some(46_i64),
2188                Some(5456_i64),
2189                Some(i64::MAX),
2190            ])
2191            .with_precision_and_scale(18, 7)
2192            .unwrap(),
2193        ) as ArrayRef;
2194
2195        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2196        for i in 0..rows.num_rows() - 1 {
2197            assert!(rows.row(i) < rows.row(i + 1));
2198        }
2199
2200        let back = converter.convert_rows(&rows).unwrap();
2201        assert_eq!(back.len(), 1);
2202        assert_eq!(col.as_ref(), back[0].as_ref())
2203    }
2204
2205    #[test]
2206    fn test_decimal128() {
2207        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal128(
2208            DECIMAL128_MAX_PRECISION,
2209            7,
2210        ))])
2211        .unwrap();
2212        let col = Arc::new(
2213            Decimal128Array::from_iter([
2214                None,
2215                Some(i128::MIN),
2216                Some(-13),
2217                Some(46_i128),
2218                Some(5456_i128),
2219                Some(i128::MAX),
2220            ])
2221            .with_precision_and_scale(38, 7)
2222            .unwrap(),
2223        ) as ArrayRef;
2224
2225        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2226        for i in 0..rows.num_rows() - 1 {
2227            assert!(rows.row(i) < rows.row(i + 1));
2228        }
2229
2230        let back = converter.convert_rows(&rows).unwrap();
2231        assert_eq!(back.len(), 1);
2232        assert_eq!(col.as_ref(), back[0].as_ref())
2233    }
2234
2235    #[test]
2236    fn test_decimal256() {
2237        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal256(
2238            DECIMAL256_MAX_PRECISION,
2239            7,
2240        ))])
2241        .unwrap();
2242        let col = Arc::new(
2243            Decimal256Array::from_iter([
2244                None,
2245                Some(i256::MIN),
2246                Some(i256::from_parts(0, -1)),
2247                Some(i256::from_parts(u128::MAX, -1)),
2248                Some(i256::from_parts(u128::MAX, 0)),
2249                Some(i256::from_parts(0, 46_i128)),
2250                Some(i256::from_parts(5, 46_i128)),
2251                Some(i256::MAX),
2252            ])
2253            .with_precision_and_scale(DECIMAL256_MAX_PRECISION, 7)
2254            .unwrap(),
2255        ) as ArrayRef;
2256
2257        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2258        for i in 0..rows.num_rows() - 1 {
2259            assert!(rows.row(i) < rows.row(i + 1));
2260        }
2261
2262        let back = converter.convert_rows(&rows).unwrap();
2263        assert_eq!(back.len(), 1);
2264        assert_eq!(col.as_ref(), back[0].as_ref())
2265    }
2266
2267    #[test]
2268    fn test_bool() {
2269        let converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]).unwrap();
2270
2271        let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])) as ArrayRef;
2272
2273        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2274        assert!(rows.row(2) > rows.row(1));
2275        assert!(rows.row(2) > rows.row(0));
2276        assert!(rows.row(1) > rows.row(0));
2277
2278        let cols = converter.convert_rows(&rows).unwrap();
2279        assert_eq!(&cols[0], &col);
2280
2281        let converter = RowConverter::new(vec![SortField::new_with_options(
2282            DataType::Boolean,
2283            SortOptions::default().desc().with_nulls_first(false),
2284        )])
2285        .unwrap();
2286
2287        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2288        assert!(rows.row(2) < rows.row(1));
2289        assert!(rows.row(2) < rows.row(0));
2290        assert!(rows.row(1) < rows.row(0));
2291        let cols = converter.convert_rows(&rows).unwrap();
2292        assert_eq!(&cols[0], &col);
2293    }
2294
2295    #[test]
2296    fn test_timezone() {
2297        let a =
2298            TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone("+01:00".to_string());
2299        let d = a.data_type().clone();
2300
2301        let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
2302        let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
2303        let back = converter.convert_rows(&rows).unwrap();
2304        assert_eq!(back.len(), 1);
2305        assert_eq!(back[0].data_type(), &d);
2306
2307        // Test dictionary
2308        let mut a = PrimitiveDictionaryBuilder::<Int32Type, TimestampNanosecondType>::new();
2309        a.append(34).unwrap();
2310        a.append_null();
2311        a.append(345).unwrap();
2312
2313        // Construct dictionary with a timezone
2314        let dict = a.finish();
2315        let values = TimestampNanosecondArray::from(dict.values().to_data());
2316        let dict_with_tz = dict.with_values(Arc::new(values.with_timezone("+02:00")));
2317        let v = DataType::Timestamp(TimeUnit::Nanosecond, Some("+02:00".into()));
2318        let d = DataType::Dictionary(Box::new(DataType::Int32), Box::new(v.clone()));
2319
2320        assert_eq!(dict_with_tz.data_type(), &d);
2321        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
2322        let rows = converter
2323            .convert_columns(&[Arc::new(dict_with_tz) as _])
2324            .unwrap();
2325        let back = converter.convert_rows(&rows).unwrap();
2326        assert_eq!(back.len(), 1);
2327        assert_eq!(back[0].data_type(), &v);
2328    }
2329
2330    #[test]
2331    fn test_null_encoding() {
2332        let col = Arc::new(NullArray::new(10));
2333        let converter = RowConverter::new(vec![SortField::new(DataType::Null)]).unwrap();
2334        let rows = converter.convert_columns(&[col]).unwrap();
2335        assert_eq!(rows.num_rows(), 10);
2336        assert_eq!(rows.row(1).data.len(), 0);
2337    }
2338
    #[test]
    fn test_variable_width() {
        // Round-trips variable-width Utf8 and Binary columns through the row
        // format, checking that byte-wise row comparison matches logical order.
        let col = Arc::new(StringArray::from_iter([
            Some("hello"),
            Some("he"),
            None,
            Some("foo"),
            Some(""),
        ])) as ArrayRef;

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        assert!(rows.row(1) < rows.row(0)); // "he" < "hello"
        assert!(rows.row(2) < rows.row(4)); // null < ""
        assert!(rows.row(3) < rows.row(0)); // "foo" < "hello"
        assert!(rows.row(3) < rows.row(1)); // "foo" < "he"

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        // Binary values sized to straddle the MINI_BLOCK_SIZE / BLOCK_SIZE
        // boundaries of the variable-width encoding. The input is already in
        // ascending logical order, so every encoded row must compare strictly
        // less than all later rows.
        let col = Arc::new(BinaryArray::from_iter([
            None,
            Some(vec![0_u8; 0]),
            Some(vec![0_u8; 6]),
            Some(vec![0_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![0_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![0_u8; variable::BLOCK_SIZE]),
            Some(vec![0_u8; variable::BLOCK_SIZE + 1]),
            Some(vec![1_u8; 6]),
            Some(vec![1_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![1_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![1_u8; variable::BLOCK_SIZE]),
            Some(vec![1_u8; variable::BLOCK_SIZE + 1]),
            Some(vec![0xFF_u8; 6]),
            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![0xFF_u8; variable::BLOCK_SIZE]),
            Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]),
        ])) as ArrayRef;

        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        for i in 0..rows.num_rows() {
            for j in i + 1..rows.num_rows() {
                assert!(
                    rows.row(i) < rows.row(j),
                    "{} < {} - {:?} < {:?}",
                    i,
                    j,
                    rows.row(i),
                    rows.row(j)
                );
            }
        }

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        // With descending order and nulls last, every pairwise comparison flips
        let converter = RowConverter::new(vec![SortField::new_with_options(
            DataType::Binary,
            SortOptions::default().desc().with_nulls_first(false),
        )])
        .unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        for i in 0..rows.num_rows() {
            for j in i + 1..rows.num_rows() {
                assert!(
                    rows.row(i) > rows.row(j),
                    "{} > {} - {:?} > {:?}",
                    i,
                    j,
                    rows.row(i),
                    rows.row(j)
                );
            }
        }

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);
    }
2422
    /// Asserts `a` and `b` are logically equal, where `b` may additionally be
    /// dictionary-encoded: a dictionary `b` is first cast to its value type
    /// (which must match `a`'s type) before comparison.
    fn dictionary_eq(a: &dyn Array, b: &dyn Array) {
        match b.data_type() {
            DataType::Dictionary(_, v) => {
                assert_eq!(a.data_type(), v.as_ref());
                let b = arrow_cast::cast(b, v).unwrap();
                assert_eq!(a, b.as_ref())
            }
            _ => assert_eq!(a, b),
        }
    }
2434
    #[test]
    fn test_string_dictionary() {
        // Dictionary-encoded strings must encode by logical value, not by key
        let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
            Some("foo"),
            Some("hello"),
            Some("he"),
            None,
            Some("hello"),
            Some(""),
            Some("hello"),
            Some("hello"),
        ])) as ArrayRef;

        let field = SortField::new(a.data_type().clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();

        assert!(rows_a.row(3) < rows_a.row(5)); // null < ""
        assert!(rows_a.row(2) < rows_a.row(1)); // "he" < "hello"
        assert!(rows_a.row(0) < rows_a.row(1)); // "foo" < "hello"
        assert!(rows_a.row(3) < rows_a.row(0)); // null < "foo"

        // All occurrences of "hello" encode to identical rows
        assert_eq!(rows_a.row(1), rows_a.row(4));
        assert_eq!(rows_a.row(1), rows_a.row(6));
        assert_eq!(rows_a.row(1), rows_a.row(7));

        let cols = converter.convert_rows(&rows_a).unwrap();
        dictionary_eq(&cols[0], &a);

        // Rows must compare consistently across arrays with different dictionaries
        let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
            Some("hello"),
            None,
            Some("cupcakes"),
        ])) as ArrayRef;

        let rows_b = converter.convert_columns(&[Arc::clone(&b)]).unwrap();
        assert_eq!(rows_a.row(1), rows_b.row(0)); // "hello" == "hello"
        assert_eq!(rows_a.row(3), rows_b.row(1)); // null == null
        assert!(rows_b.row(2) < rows_a.row(0)); // "cupcakes" < "foo"

        let cols = converter.convert_rows(&rows_b).unwrap();
        dictionary_eq(&cols[0], &b);

        // Descending with nulls last inverts every comparison above
        let converter = RowConverter::new(vec![SortField::new_with_options(
            a.data_type().clone(),
            SortOptions::default().desc().with_nulls_first(false),
        )])
        .unwrap();

        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
        assert!(rows_c.row(3) > rows_c.row(5)); // null > ""
        assert!(rows_c.row(2) > rows_c.row(1)); // "he" > "hello"
        assert!(rows_c.row(0) > rows_c.row(1)); // "foo" > "hello"
        assert!(rows_c.row(3) > rows_c.row(0)); // null > "foo"

        let cols = converter.convert_rows(&rows_c).unwrap();
        dictionary_eq(&cols[0], &a);

        // Descending with nulls first: values invert but nulls stay smallest
        let converter = RowConverter::new(vec![SortField::new_with_options(
            a.data_type().clone(),
            SortOptions::default().desc().with_nulls_first(true),
        )])
        .unwrap();

        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
        assert!(rows_c.row(3) < rows_c.row(5)); // null < ""
        assert!(rows_c.row(2) > rows_c.row(1)); // "he" > "hello"
        assert!(rows_c.row(0) > rows_c.row(1)); // "foo" > "hello"
        assert!(rows_c.row(3) < rows_c.row(0)); // null < "foo"

        let cols = converter.convert_rows(&rows_c).unwrap();
        dictionary_eq(&cols[0], &a);
    }
2508
    #[test]
    fn test_struct() {
        // Round-trips a Struct(int, utf8) column and checks that struct-level
        // nullability is encoded independently of the child values.
        // Test basic
        let a = Arc::new(Int32Array::from(vec![1, 1, 2, 2])) as ArrayRef;
        let a_f = Arc::new(Field::new("int", DataType::Int32, false));
        let u = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as ArrayRef;
        let u_f = Arc::new(Field::new("s", DataType::Utf8, false));
        let s1 = Arc::new(StructArray::from(vec![(a_f, a), (u_f, u)])) as ArrayRef;

        let sort_fields = vec![SortField::new(s1.data_type().clone())];
        let converter = RowConverter::new(sort_fields).unwrap();
        let r1 = converter.convert_columns(&[Arc::clone(&s1)]).unwrap();

        // Input rows are in strictly ascending (int, string) order
        for (a, b) in r1.iter().zip(r1.iter().skip(1)) {
            assert!(a < b);
        }

        let back = converter.convert_rows(&r1).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(&back[0], &s1);

        // Test struct nullability
        // Validity bitmap 0b00001010 marks rows 1 and 3 valid, rows 0 and 2 null
        let data = s1
            .to_data()
            .into_builder()
            .null_bit_buffer(Some(Buffer::from_slice_ref([0b00001010])))
            .null_count(2)
            .build()
            .unwrap();

        let s2 = Arc::new(StructArray::from(data)) as ArrayRef;
        let r2 = converter.convert_columns(&[Arc::clone(&s2)]).unwrap();
        assert_eq!(r2.row(0), r2.row(2)); // Nulls equal
        assert!(r2.row(0) < r2.row(1)); // Nulls first
        assert_ne!(r1.row(0), r2.row(0)); // Value does not equal null
        assert_eq!(r1.row(1), r2.row(1)); // Values equal

        let back = converter.convert_rows(&r2).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(&back[0], &s2);

        back[0].to_data().validate_full().unwrap();
    }
2552
    #[test]
    fn test_dictionary_in_struct() {
        // Round-trips a Struct containing a dictionary-encoded field. The
        // RowConverter flattens dictionaries, so the decoded struct has a
        // plain Utf8 child; only logical equality of the data is checked.
        let builder = StringDictionaryBuilder::<Int32Type>::new();
        let mut struct_builder = StructBuilder::new(
            vec![Field::new_dictionary(
                "foo",
                DataType::Int32,
                DataType::Utf8,
                true,
            )],
            vec![Box::new(builder)],
        );

        let dict_builder = struct_builder
            .field_builder::<StringDictionaryBuilder<Int32Type>>(0)
            .unwrap();

        // Flattened: ["a", null, "a", "b"]
        dict_builder.append_value("a");
        dict_builder.append_null();
        dict_builder.append_value("a");
        dict_builder.append_value("b");

        for _ in 0..4 {
            struct_builder.append(true);
        }

        let s = Arc::new(struct_builder.finish()) as ArrayRef;
        let sort_fields = vec![SortField::new(s.data_type().clone())];
        let converter = RowConverter::new(sort_fields).unwrap();
        let r = converter.convert_columns(&[Arc::clone(&s)]).unwrap();

        let back = converter.convert_rows(&r).unwrap();
        let [s2] = back.try_into().unwrap();

        // RowConverter flattens Dictionary
        // s.ty = Struct("foo": Dictionary(Int32, Utf8)), s2.ty = Struct("foo": Utf8)
        assert_ne!(&s.data_type(), &s2.data_type());
        s2.to_data().validate_full().unwrap();

        // Check if the logical data remains the same
        // Keys: [0, null, 0, 1]
        // Values: ["a", "b"]
        let s1_struct = s.as_struct();
        let s1_0 = s1_struct.column(0);
        let s1_idx_0 = s1_0.as_dictionary::<Int32Type>();
        let keys = s1_idx_0.keys();
        let values = s1_idx_0.values().as_string::<i32>();
        // Flattened: ["a", null, "a", "b"]
        let s2_struct = s2.as_struct();
        let s2_0 = s2_struct.column(0);
        let s2_idx_0 = s2_0.as_string::<i32>();

        // Element-wise: null keys must decode to nulls, non-null keys to the
        // dictionary value they reference
        for i in 0..keys.len() {
            if keys.is_null(i) {
                assert!(s2_idx_0.is_null(i));
            } else {
                let dict_index = keys.value(i) as usize;
                assert_eq!(values.value(dict_index), s2_idx_0.value(i));
            }
        }
    }
2615
2616    #[test]
2617    fn test_dictionary_in_struct_empty() {
2618        let ty = DataType::Struct(
2619            vec![Field::new_dictionary(
2620                "foo",
2621                DataType::Int32,
2622                DataType::Int32,
2623                false,
2624            )]
2625            .into(),
2626        );
2627        let s = arrow_array::new_empty_array(&ty);
2628
2629        let sort_fields = vec![SortField::new(s.data_type().clone())];
2630        let converter = RowConverter::new(sort_fields).unwrap();
2631        let r = converter.convert_columns(&[Arc::clone(&s)]).unwrap();
2632
2633        let back = converter.convert_rows(&r).unwrap();
2634        let [s2] = back.try_into().unwrap();
2635
2636        // RowConverter flattens Dictionary
2637        // s.ty = Struct("foo": Dictionary(Int32, Int32)), s2.ty = Struct("foo": Int32)
2638        assert_ne!(&s.data_type(), &s2.data_type());
2639        s2.to_data().validate_full().unwrap();
2640        assert_eq!(s.len(), 0);
2641        assert_eq!(s2.len(), 0);
2642    }
2643
2644    #[test]
2645    fn test_list_of_string_dictionary() {
2646        let mut builder = ListBuilder::<StringDictionaryBuilder<Int32Type>>::default();
2647        // List[0] = ["a", "b", "zero", null, "c", "b", "d" (dict)]
2648        builder.values().append("a").unwrap();
2649        builder.values().append("b").unwrap();
2650        builder.values().append("zero").unwrap();
2651        builder.values().append_null();
2652        builder.values().append("c").unwrap();
2653        builder.values().append("b").unwrap();
2654        builder.values().append("d").unwrap();
2655        builder.append(true);
2656        // List[1] = null
2657        builder.append(false);
2658        // List[2] = ["e", "zero", "a" (dict)]
2659        builder.values().append("e").unwrap();
2660        builder.values().append("zero").unwrap();
2661        builder.values().append("a").unwrap();
2662        builder.append(true);
2663
2664        let a = Arc::new(builder.finish()) as ArrayRef;
2665        let data_type = a.data_type().clone();
2666
2667        let field = SortField::new(data_type.clone());
2668        let converter = RowConverter::new(vec![field]).unwrap();
2669        let rows = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
2670
2671        let back = converter.convert_rows(&rows).unwrap();
2672        assert_eq!(back.len(), 1);
2673        let [a2] = back.try_into().unwrap();
2674
2675        // RowConverter flattens Dictionary
2676        // a.ty: List(Dictionary(Int32, Utf8)), a2.ty: List(Utf8)
2677        assert_ne!(&a.data_type(), &a2.data_type());
2678
2679        a2.to_data().validate_full().unwrap();
2680
2681        let a2_list = a2.as_list::<i32>();
2682        let a1_list = a.as_list::<i32>();
2683
2684        // Check if the logical data remains the same
2685        // List[0] = ["a", "b", "zero", null, "c", "b", "d" (dict)]
2686        let a1_0 = a1_list.value(0);
2687        let a1_idx_0 = a1_0.as_dictionary::<Int32Type>();
2688        let keys = a1_idx_0.keys();
2689        let values = a1_idx_0.values().as_string::<i32>();
2690        let a2_0 = a2_list.value(0);
2691        let a2_idx_0 = a2_0.as_string::<i32>();
2692
2693        for i in 0..keys.len() {
2694            if keys.is_null(i) {
2695                assert!(a2_idx_0.is_null(i));
2696            } else {
2697                let dict_index = keys.value(i) as usize;
2698                assert_eq!(values.value(dict_index), a2_idx_0.value(i));
2699            }
2700        }
2701
2702        // List[1] = null
2703        assert!(a1_list.is_null(1));
2704        assert!(a2_list.is_null(1));
2705
2706        // List[2] = ["e", "zero", "a" (dict)]
2707        let a1_2 = a1_list.value(2);
2708        let a1_idx_2 = a1_2.as_dictionary::<Int32Type>();
2709        let keys = a1_idx_2.keys();
2710        let values = a1_idx_2.values().as_string::<i32>();
2711        let a2_2 = a2_list.value(2);
2712        let a2_idx_2 = a2_2.as_string::<i32>();
2713
2714        for i in 0..keys.len() {
2715            if keys.is_null(i) {
2716                assert!(a2_idx_2.is_null(i));
2717            } else {
2718                let dict_index = keys.value(i) as usize;
2719                assert_eq!(values.value(dict_index), a2_idx_2.value(i));
2720            }
2721        }
2722    }
2723
2724    #[test]
2725    fn test_primitive_dictionary() {
2726        let mut builder = PrimitiveDictionaryBuilder::<Int32Type, Int32Type>::new();
2727        builder.append(2).unwrap();
2728        builder.append(3).unwrap();
2729        builder.append(0).unwrap();
2730        builder.append_null();
2731        builder.append(5).unwrap();
2732        builder.append(3).unwrap();
2733        builder.append(-1).unwrap();
2734
2735        let a = builder.finish();
2736        let data_type = a.data_type().clone();
2737        let columns = [Arc::new(a) as ArrayRef];
2738
2739        let field = SortField::new(data_type.clone());
2740        let converter = RowConverter::new(vec![field]).unwrap();
2741        let rows = converter.convert_columns(&columns).unwrap();
2742        assert!(rows.row(0) < rows.row(1));
2743        assert!(rows.row(2) < rows.row(0));
2744        assert!(rows.row(3) < rows.row(2));
2745        assert!(rows.row(6) < rows.row(2));
2746        assert!(rows.row(3) < rows.row(6));
2747
2748        let back = converter.convert_rows(&rows).unwrap();
2749        assert_eq!(back.len(), 1);
2750        back[0].to_data().validate_full().unwrap();
2751    }
2752
    #[test]
    fn test_dictionary_nulls() {
        // Builds a dictionary with nulls both in the values array and in the
        // keys; both must encode identically to a null.
        // keys [0, 0, 1, 2, 4, null] over values [1, -1, null, 4, null]
        // => logical values [1, 1, -1, null, null, null]
        let values = Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data();
        let keys =
            Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]).into_data();

        let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
        let data = keys
            .into_builder()
            .data_type(data_type.clone())
            .child_data(vec![values])
            .build()
            .unwrap();

        let columns = [Arc::new(DictionaryArray::<Int32Type>::from(data)) as ArrayRef];
        let field = SortField::new(data_type.clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&columns).unwrap();

        assert_eq!(rows.row(0), rows.row(1)); // 1 == 1
        assert_eq!(rows.row(3), rows.row(4)); // null == null (both via null values)
        assert_eq!(rows.row(4), rows.row(5)); // null via value == null via key
        assert!(rows.row(3) < rows.row(0)); // nulls sort first by default
    }
2777
2778    #[test]
2779    fn test_from_binary_shared_buffer() {
2780        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2781        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2782        let rows = converter.convert_columns(&[array]).unwrap();
2783        let binary_rows = rows.try_into_binary().expect("known-small rows");
2784        let _binary_rows_shared_buffer = binary_rows.clone();
2785
2786        let parsed = converter.from_binary(binary_rows);
2787
2788        converter.convert_rows(parsed.iter()).unwrap();
2789    }
2790
2791    #[test]
2792    #[should_panic(expected = "Encountered non UTF-8 data")]
2793    fn test_invalid_utf8() {
2794        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2795        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2796        let rows = converter.convert_columns(&[array]).unwrap();
2797        let binary_row = rows.row(0);
2798
2799        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2800        let parser = converter.parser();
2801        let utf8_row = parser.parse(binary_row.as_ref());
2802
2803        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2804    }
2805
2806    #[test]
2807    #[should_panic(expected = "Encountered non UTF-8 data")]
2808    fn test_invalid_utf8_array() {
2809        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2810        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2811        let rows = converter.convert_columns(&[array]).unwrap();
2812        let binary_rows = rows.try_into_binary().expect("known-small rows");
2813
2814        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2815        let parsed = converter.from_binary(binary_rows);
2816
2817        converter.convert_rows(parsed.iter()).unwrap();
2818    }
2819
2820    #[test]
2821    #[should_panic(expected = "index out of bounds")]
2822    fn test_invalid_empty() {
2823        let binary_row: &[u8] = &[];
2824
2825        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2826        let parser = converter.parser();
2827        let utf8_row = parser.parse(binary_row.as_ref());
2828
2829        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2830    }
2831
2832    #[test]
2833    #[should_panic(expected = "index out of bounds")]
2834    fn test_invalid_empty_array() {
2835        let row: &[u8] = &[];
2836        let binary_rows = BinaryArray::from(vec![row]);
2837
2838        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2839        let parsed = converter.from_binary(binary_rows);
2840
2841        converter.convert_rows(parsed.iter()).unwrap();
2842    }
2843
2844    #[test]
2845    #[should_panic(expected = "index out of bounds")]
2846    fn test_invalid_truncated() {
2847        let binary_row: &[u8] = &[0x02];
2848
2849        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2850        let parser = converter.parser();
2851        let utf8_row = parser.parse(binary_row.as_ref());
2852
2853        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2854    }
2855
2856    #[test]
2857    #[should_panic(expected = "index out of bounds")]
2858    fn test_invalid_truncated_array() {
2859        let row: &[u8] = &[0x02];
2860        let binary_rows = BinaryArray::from(vec![row]);
2861
2862        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2863        let parsed = converter.from_binary(binary_rows);
2864
2865        converter.convert_rows(parsed.iter()).unwrap();
2866    }
2867
2868    #[test]
2869    #[should_panic(expected = "rows were not produced by this RowConverter")]
2870    fn test_different_converter() {
2871        let values = Arc::new(Int32Array::from_iter([Some(1), Some(-1)]));
2872        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2873        let rows = converter.convert_columns(&[values]).unwrap();
2874
2875        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2876        let _ = converter.convert_rows(&rows);
2877    }
2878
    /// Exercises the row encoding of a `GenericList<O>` of `Int32` across all
    /// four asc/desc × nulls-first/last combinations, plus a sliced input.
    ///
    /// Slots appended with `append(false)` are null lists; the element values
    /// appended beneath them (marked MASKED) must not affect the encoding.
    fn test_single_list<O: OffsetSizeTrait>() {
        let mut builder = GenericListBuilder::<O, _>::new(Int32Builder::new());
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(32);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(12);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.append(true);
        builder.values().append_value(32); // MASKED
        builder.values().append_value(52); // MASKED
        builder.append(false);
        builder.values().append_value(32);
        builder.values().append_null();
        builder.append(true);
        builder.append(true);
        builder.values().append_value(17); // MASKED
        builder.values().append_null(); // MASKED
        builder.append(false);

        // Lists: [32,52,32], [32,52,12], [32,52], null, [32,null], [], null
        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // Default options: ascending, nulls first
        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();

        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
        assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
        assert!(rows.row(3) < rows.row(5)); // null < []
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        let options = SortOptions::default().asc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
        assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
        assert!(rows.row(3) > rows.row(5)); // null > []
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
        assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
        assert!(rows.row(3) > rows.row(5)); // null > []
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
        assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
        assert!(rows.row(3) < rows.row(5)); // null < []
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Slicing must re-anchor offsets: rows 1..=5 are
        // [32,52,12], [32,52], null, [32,null], []
        let sliced_list = list.slice(1, 5);
        let rows_on_sliced_list = converter
            .convert_columns(&[Arc::clone(&sliced_list)])
            .unwrap();

        assert!(rows_on_sliced_list.row(1) > rows_on_sliced_list.row(0)); // [32, 52] > [32, 52, 12]
        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1)); // null < [32, 52]
        assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1)); // [32, null] < [32, 52]
        assert!(rows_on_sliced_list.row(4) > rows_on_sliced_list.row(1)); // [] > [32, 52]
        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4)); // null < []

        let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &sliced_list);
    }
2992
    /// Exercises the row encoding of nested lists (`List<List<Int32>>`),
    /// including null inner lists, a null outer list and a sliced input,
    /// under several sort-option combinations.
    fn test_nested_list<O: OffsetSizeTrait>() {
        let mut builder =
            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(Int32Builder::new()));

        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().append(false);
        builder.append(true);
        builder.append(false);

        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.append(true);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // [
        //   [[1, 2], [1, null]],
        //   [[1, null], [1, null]],
        //   [[1, null], null]
        //   null
        //   [[1, 2]]
        // ]
        let options = SortOptions::default().asc().with_nulls_first(true);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(1) > rows.row(2));
        assert!(rows.row(2) > rows.row(3));
        assert!(rows.row(4) < rows.row(0));
        assert!(rows.row(4) > rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(1) > rows.row(2));
        assert!(rows.row(2) > rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) > rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(1) < rows.row(2));
        assert!(rows.row(2) < rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) < rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // slice(1, 3) keeps [[1,null],[1,null]], [[1,null],null] and null
        let sliced_list = list.slice(1, 3);
        let rows = converter
            .convert_columns(&[Arc::clone(&sliced_list)])
            .unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(1) < rows.row(2));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &sliced_list);
    }
3096
    #[test]
    fn test_list() {
        // Run the shared list round-trip suites with 32-bit (List) offsets.
        test_single_list::<i32>();
        test_nested_list::<i32>();
    }
3102
    #[test]
    fn test_large_list() {
        // Run the shared list round-trip suites with 64-bit (LargeList) offsets.
        test_single_list::<i64>();
        test_nested_list::<i64>();
    }
3108
    #[test]
    fn test_fixed_size_list() {
        // Seven FixedSizeList<Int32, 3> entries:
        //   0: [32, 52, 32]    1: [32, 52, 12]      2: [32, 52, null]
        //   3: null            4: [32, null, null]  5: [null, null, null]
        //   6: null (masked child values differ from entry 3's)
        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 3);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(32);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(12);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_null();
        builder.append(true);
        builder.values().append_value(32); // MASKED
        builder.values().append_value(52); // MASKED
        builder.values().append_value(13); // MASKED
        builder.append(false);
        builder.values().append_value(32);
        builder.values().append_null();
        builder.values().append_null();
        builder.append(true);
        builder.values().append_null();
        builder.values().append_null();
        builder.values().append_null();
        builder.append(true);
        builder.values().append_value(17); // MASKED
        builder.values().append_null(); // MASKED
        builder.values().append_value(77); // MASKED
        builder.append(false);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // Default sorting (ascending, nulls first)
        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();

        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52, null] < [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52, null]
        assert!(rows.row(4) < rows.row(2)); // [32, null, null] < [32, 52, null]
        assert!(rows.row(5) < rows.row(2)); // [null, null, null] < [32, 52, null]
        assert!(rows.row(3) < rows.row(5)); // null < [null, null, null]
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Ascending, nulls last
        let options = SortOptions::default().asc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52, null] > [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52, null]
        assert!(rows.row(4) > rows.row(2)); // [32, null, null] > [32, 52, null]
        assert!(rows.row(5) > rows.row(2)); // [null, null, null] > [32, 52, null]
        assert!(rows.row(3) > rows.row(5)); // null > [null, null, null]
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls last
        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52, null] > [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52, null]
        assert!(rows.row(4) > rows.row(2)); // [32, null, null] > [32, 52, null]
        assert!(rows.row(5) > rows.row(2)); // [null, null, null] > [32, 52, null]
        assert!(rows.row(3) > rows.row(5)); // null > [null, null, null]
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls first
        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52, null] < [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52, null]
        assert!(rows.row(4) < rows.row(2)); // [32, null, null] < [32, 52, null]
        assert!(rows.row(5) < rows.row(2)); // [null, null, null] < [32, 52, null]
        assert!(rows.row(3) < rows.row(5)); // null < [null, null, null]
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Slicing shifts entries: sliced row i corresponds to original row i + 1.
        let sliced_list = list.slice(1, 5);
        let rows_on_sliced_list = converter
            .convert_columns(&[Arc::clone(&sliced_list)])
            .unwrap();

        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1)); // null < [32, 52, null]
        assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1)); // [32, null, null] < [32, 52, null]
        assert!(rows_on_sliced_list.row(4) < rows_on_sliced_list.row(1)); // [null, null, null] < [32, 52, null]
        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4)); // null < [null, null, null]

        let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &sliced_list);
    }
3231
3232    #[test]
3233    fn test_two_fixed_size_lists() {
3234        let mut first = FixedSizeListBuilder::new(UInt8Builder::new(), 1);
3235        // 0: [100]
3236        first.values().append_value(100);
3237        first.append(true);
3238        // 1: [101]
3239        first.values().append_value(101);
3240        first.append(true);
3241        // 2: [102]
3242        first.values().append_value(102);
3243        first.append(true);
3244        // 3: [null]
3245        first.values().append_null();
3246        first.append(true);
3247        // 4: null
3248        first.values().append_null(); // MASKED
3249        first.append(false);
3250        let first = Arc::new(first.finish()) as ArrayRef;
3251        let first_type = first.data_type().clone();
3252
3253        let mut second = FixedSizeListBuilder::new(UInt8Builder::new(), 1);
3254        // 0: [200]
3255        second.values().append_value(200);
3256        second.append(true);
3257        // 1: [201]
3258        second.values().append_value(201);
3259        second.append(true);
3260        // 2: [202]
3261        second.values().append_value(202);
3262        second.append(true);
3263        // 3: [null]
3264        second.values().append_null();
3265        second.append(true);
3266        // 4: null
3267        second.values().append_null(); // MASKED
3268        second.append(false);
3269        let second = Arc::new(second.finish()) as ArrayRef;
3270        let second_type = second.data_type().clone();
3271
3272        let converter = RowConverter::new(vec![
3273            SortField::new(first_type.clone()),
3274            SortField::new(second_type.clone()),
3275        ])
3276        .unwrap();
3277
3278        let rows = converter
3279            .convert_columns(&[Arc::clone(&first), Arc::clone(&second)])
3280            .unwrap();
3281
3282        let back = converter.convert_rows(&rows).unwrap();
3283        assert_eq!(back.len(), 2);
3284        back[0].to_data().validate_full().unwrap();
3285        assert_eq!(&back[0], &first);
3286        back[1].to_data().validate_full().unwrap();
3287        assert_eq!(&back[1], &second);
3288    }
3289
3290    #[test]
3291    fn test_fixed_size_list_with_variable_width_content() {
3292        let mut first = FixedSizeListBuilder::new(
3293            StructBuilder::from_fields(
3294                vec![
3295                    Field::new(
3296                        "timestamp",
3297                        DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("UTC"))),
3298                        false,
3299                    ),
3300                    Field::new("offset_minutes", DataType::Int16, false),
3301                    Field::new("time_zone", DataType::Utf8, false),
3302                ],
3303                1,
3304            ),
3305            1,
3306        );
3307        // 0: null
3308        first
3309            .values()
3310            .field_builder::<TimestampMicrosecondBuilder>(0)
3311            .unwrap()
3312            .append_null();
3313        first
3314            .values()
3315            .field_builder::<Int16Builder>(1)
3316            .unwrap()
3317            .append_null();
3318        first
3319            .values()
3320            .field_builder::<StringBuilder>(2)
3321            .unwrap()
3322            .append_null();
3323        first.values().append(false);
3324        first.append(false);
3325        // 1: [null]
3326        first
3327            .values()
3328            .field_builder::<TimestampMicrosecondBuilder>(0)
3329            .unwrap()
3330            .append_null();
3331        first
3332            .values()
3333            .field_builder::<Int16Builder>(1)
3334            .unwrap()
3335            .append_null();
3336        first
3337            .values()
3338            .field_builder::<StringBuilder>(2)
3339            .unwrap()
3340            .append_null();
3341        first.values().append(false);
3342        first.append(true);
3343        // 2: [1970-01-01 00:00:00.000000 UTC]
3344        first
3345            .values()
3346            .field_builder::<TimestampMicrosecondBuilder>(0)
3347            .unwrap()
3348            .append_value(0);
3349        first
3350            .values()
3351            .field_builder::<Int16Builder>(1)
3352            .unwrap()
3353            .append_value(0);
3354        first
3355            .values()
3356            .field_builder::<StringBuilder>(2)
3357            .unwrap()
3358            .append_value("UTC");
3359        first.values().append(true);
3360        first.append(true);
3361        // 3: [2005-09-10 13:30:00.123456 Europe/Warsaw]
3362        first
3363            .values()
3364            .field_builder::<TimestampMicrosecondBuilder>(0)
3365            .unwrap()
3366            .append_value(1126351800123456);
3367        first
3368            .values()
3369            .field_builder::<Int16Builder>(1)
3370            .unwrap()
3371            .append_value(120);
3372        first
3373            .values()
3374            .field_builder::<StringBuilder>(2)
3375            .unwrap()
3376            .append_value("Europe/Warsaw");
3377        first.values().append(true);
3378        first.append(true);
3379        let first = Arc::new(first.finish()) as ArrayRef;
3380        let first_type = first.data_type().clone();
3381
3382        let mut second = StringBuilder::new();
3383        second.append_value("somewhere near");
3384        second.append_null();
3385        second.append_value("Greenwich");
3386        second.append_value("Warsaw");
3387        let second = Arc::new(second.finish()) as ArrayRef;
3388        let second_type = second.data_type().clone();
3389
3390        let converter = RowConverter::new(vec![
3391            SortField::new(first_type.clone()),
3392            SortField::new(second_type.clone()),
3393        ])
3394        .unwrap();
3395
3396        let rows = converter
3397            .convert_columns(&[Arc::clone(&first), Arc::clone(&second)])
3398            .unwrap();
3399
3400        let back = converter.convert_rows(&rows).unwrap();
3401        assert_eq!(back.len(), 2);
3402        back[0].to_data().validate_full().unwrap();
3403        assert_eq!(&back[0], &first);
3404        back[1].to_data().validate_full().unwrap();
3405        assert_eq!(&back[1], &second);
3406    }
3407
3408    fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
3409    where
3410        K: ArrowPrimitiveType,
3411        StandardUniform: Distribution<K::Native>,
3412    {
3413        let mut rng = rng();
3414        (0..len)
3415            .map(|_| rng.random_bool(valid_percent).then(|| rng.random()))
3416            .collect()
3417    }
3418
3419    fn generate_strings<O: OffsetSizeTrait>(
3420        len: usize,
3421        valid_percent: f64,
3422    ) -> GenericStringArray<O> {
3423        let mut rng = rng();
3424        (0..len)
3425            .map(|_| {
3426                rng.random_bool(valid_percent).then(|| {
3427                    let len = rng.random_range(0..100);
3428                    let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
3429                    String::from_utf8(bytes).unwrap()
3430                })
3431            })
3432            .collect()
3433    }
3434
3435    fn generate_string_view(len: usize, valid_percent: f64) -> StringViewArray {
3436        let mut rng = rng();
3437        (0..len)
3438            .map(|_| {
3439                rng.random_bool(valid_percent).then(|| {
3440                    let len = rng.random_range(0..100);
3441                    let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
3442                    String::from_utf8(bytes).unwrap()
3443                })
3444            })
3445            .collect()
3446    }
3447
3448    fn generate_byte_view(len: usize, valid_percent: f64) -> BinaryViewArray {
3449        let mut rng = rng();
3450        (0..len)
3451            .map(|_| {
3452                rng.random_bool(valid_percent).then(|| {
3453                    let len = rng.random_range(0..100);
3454                    let bytes: Vec<_> = (0..len).map(|_| rng.random_range(0..128)).collect();
3455                    bytes
3456                })
3457            })
3458            .collect()
3459    }
3460
3461    fn generate_fixed_stringview_column(len: usize) -> StringViewArray {
3462        let edge_cases = vec![
3463            Some("bar".to_string()),
3464            Some("bar\0".to_string()),
3465            Some("LongerThan12Bytes".to_string()),
3466            Some("LongerThan12Bytez".to_string()),
3467            Some("LongerThan12Bytes\0".to_string()),
3468            Some("LongerThan12Byt".to_string()),
3469            Some("backend one".to_string()),
3470            Some("backend two".to_string()),
3471            Some("a".repeat(257)),
3472            Some("a".repeat(300)),
3473        ];
3474
3475        // Fill up to `len` by repeating edge cases and trimming
3476        let mut values = Vec::with_capacity(len);
3477        for i in 0..len {
3478            values.push(
3479                edge_cases
3480                    .get(i % edge_cases.len())
3481                    .cloned()
3482                    .unwrap_or(None),
3483            );
3484        }
3485
3486        StringViewArray::from(values)
3487    }
3488
3489    fn generate_dictionary<K>(
3490        values: ArrayRef,
3491        len: usize,
3492        valid_percent: f64,
3493    ) -> DictionaryArray<K>
3494    where
3495        K: ArrowDictionaryKeyType,
3496        K::Native: SampleUniform,
3497    {
3498        let mut rng = rng();
3499        let min_key = K::Native::from_usize(0).unwrap();
3500        let max_key = K::Native::from_usize(values.len()).unwrap();
3501        let keys: PrimitiveArray<K> = (0..len)
3502            .map(|_| {
3503                rng.random_bool(valid_percent)
3504                    .then(|| rng.random_range(min_key..max_key))
3505            })
3506            .collect();
3507
3508        let data_type =
3509            DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(values.data_type().clone()));
3510
3511        let data = keys
3512            .into_data()
3513            .into_builder()
3514            .data_type(data_type)
3515            .add_child_data(values.to_data())
3516            .build()
3517            .unwrap();
3518
3519        DictionaryArray::from(data)
3520    }
3521
3522    fn generate_fixed_size_binary(len: usize, valid_percent: f64) -> FixedSizeBinaryArray {
3523        let mut rng = rng();
3524        let width = rng.random_range(0..20);
3525        let mut builder = FixedSizeBinaryBuilder::new(width);
3526
3527        let mut b = vec![0; width as usize];
3528        for _ in 0..len {
3529            match rng.random_bool(valid_percent) {
3530                true => {
3531                    b.iter_mut().for_each(|x| *x = rng.random());
3532                    builder.append_value(&b).unwrap();
3533                }
3534                false => builder.append_null(),
3535            }
3536        }
3537
3538        builder.finish()
3539    }
3540
3541    fn generate_struct(len: usize, valid_percent: f64) -> StructArray {
3542        let mut rng = rng();
3543        let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
3544        let a = generate_primitive_array::<Int32Type>(len, valid_percent);
3545        let b = generate_strings::<i32>(len, valid_percent);
3546        let fields = Fields::from(vec![
3547            Field::new("a", DataType::Int32, true),
3548            Field::new("b", DataType::Utf8, true),
3549        ]);
3550        let values = vec![Arc::new(a) as _, Arc::new(b) as _];
3551        StructArray::new(fields, values, Some(nulls))
3552    }
3553
3554    fn generate_list<F>(len: usize, valid_percent: f64, values: F) -> ListArray
3555    where
3556        F: FnOnce(usize) -> ArrayRef,
3557    {
3558        let mut rng = rng();
3559        let offsets = OffsetBuffer::<i32>::from_lengths((0..len).map(|_| rng.random_range(0..10)));
3560        let values_len = offsets.last().unwrap().to_usize().unwrap();
3561        let values = values(values_len);
3562        let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
3563        let field = Arc::new(Field::new_list_field(values.data_type().clone(), true));
3564        ListArray::new(field, offsets, values, Some(nulls))
3565    }
3566
    /// Generates a random column of `len` rows, with the data type (and hence
    /// the row encoding exercised) chosen uniformly from the 18 cases below.
    fn generate_column(len: usize) -> ArrayRef {
        let mut rng = rng();
        match rng.random_range(0..18) {
            0 => Arc::new(generate_primitive_array::<Int32Type>(len, 0.8)),
            1 => Arc::new(generate_primitive_array::<UInt32Type>(len, 0.8)),
            2 => Arc::new(generate_primitive_array::<Int64Type>(len, 0.8)),
            3 => Arc::new(generate_primitive_array::<UInt64Type>(len, 0.8)),
            4 => Arc::new(generate_primitive_array::<Float32Type>(len, 0.8)),
            5 => Arc::new(generate_primitive_array::<Float64Type>(len, 0.8)),
            6 => Arc::new(generate_strings::<i32>(len, 0.8)),
            7 => Arc::new(generate_dictionary::<Int64Type>(
                // Cannot test dictionaries containing null values because of #2687
                Arc::new(generate_strings::<i32>(rng.random_range(1..len), 1.0)),
                len,
                0.8,
            )),
            8 => Arc::new(generate_dictionary::<Int64Type>(
                // Cannot test dictionaries containing null values because of #2687
                Arc::new(generate_primitive_array::<Int64Type>(
                    rng.random_range(1..len),
                    1.0,
                )),
                len,
                0.8,
            )),
            9 => Arc::new(generate_fixed_size_binary(len, 0.8)),
            10 => Arc::new(generate_struct(len, 0.8)),
            11 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
            })),
            12 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_strings::<i32>(values_len, 0.8))
            })),
            13 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_struct(values_len, 0.8))
            })),
            14 => Arc::new(generate_string_view(len, 0.8)),
            15 => Arc::new(generate_byte_view(len, 0.8)),
            16 => Arc::new(generate_fixed_stringview_column(len)),
            // Slice into the middle of a larger list so that arrays with
            // non-zero offsets are exercised too.
            17 => Arc::new(
                generate_list(len + 1000, 0.8, |values_len| {
                    Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
                })
                .slice(500, len),
            ),
            _ => unreachable!(),
        }
    }
3615
3616    fn print_row(cols: &[SortColumn], row: usize) -> String {
3617        let t: Vec<_> = cols
3618            .iter()
3619            .map(|x| match x.values.is_valid(row) {
3620                true => {
3621                    let opts = FormatOptions::default().with_null("NULL");
3622                    let formatter = ArrayFormatter::try_new(x.values.as_ref(), &opts).unwrap();
3623                    formatter.value(row).to_string()
3624                }
3625                false => "NULL".to_string(),
3626            })
3627            .collect();
3628        t.join(",")
3629    }
3630
3631    fn print_col_types(cols: &[SortColumn]) -> String {
3632        let t: Vec<_> = cols
3633            .iter()
3634            .map(|x| x.values.data_type().to_string())
3635            .collect();
3636        t.join(",")
3637    }
3638
    #[test]
    #[cfg_attr(miri, ignore)]
    fn fuzz_test() {
        // Randomized round-trip check: the row encoding's byte-wise ordering
        // must agree with `LexicographicalComparator` for every pair of rows,
        // and decoding the rows (via three different paths) must reproduce the
        // input arrays.
        for _ in 0..100 {
            let mut rng = rng();
            let num_columns = rng.random_range(1..5);
            let len = rng.random_range(5..100);
            let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect();

            // Random sort direction and null ordering per column.
            let options: Vec<_> = (0..num_columns)
                .map(|_| SortOptions {
                    descending: rng.random_bool(0.5),
                    nulls_first: rng.random_bool(0.5),
                })
                .collect();

            let sort_columns: Vec<_> = options
                .iter()
                .zip(&arrays)
                .map(|(o, c)| SortColumn {
                    values: Arc::clone(c),
                    options: Some(*o),
                })
                .collect();

            let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();

            let columns: Vec<SortField> = options
                .into_iter()
                .zip(&arrays)
                .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
                .collect();

            let converter = RowConverter::new(columns).unwrap();
            let rows = converter.convert_columns(&arrays).unwrap();

            // Every ordered pair of rows must compare exactly as the
            // lexicographical comparator says (including i == j equality).
            for i in 0..len {
                for j in 0..len {
                    let row_i = rows.row(i);
                    let row_j = rows.row(j);
                    let row_cmp = row_i.cmp(&row_j);
                    let lex_cmp = comparator.compare(i, j);
                    assert_eq!(
                        row_cmp,
                        lex_cmp,
                        "({:?} vs {:?}) vs ({:?} vs {:?}) for types {}",
                        print_row(&sort_columns, i),
                        print_row(&sort_columns, j),
                        row_i,
                        row_j,
                        print_col_types(&sort_columns)
                    );
                }
            }

            // Convert rows produced from convert_columns().
            // Note: validate_utf8 is set to false since Row is initialized through empty_rows()
            let back = converter.convert_rows(&rows).unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }

            // Check that we can convert rows into ByteArray and then parse, convert it back to array
            // Note: validate_utf8 is set to true since Row is initialized through RowParser
            let rows = rows.try_into_binary().expect("reasonable size");
            let parser = converter.parser();
            let back = converter
                .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
                .unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }

            // Round trip the binary form back into `Rows` and decode again.
            let rows = converter.from_binary(rows);
            let back = converter.convert_rows(&rows).unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }
        }
    }
3722
3723    #[test]
3724    fn test_clear() {
3725        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
3726        let mut rows = converter.empty_rows(3, 128);
3727
3728        let first = Int32Array::from(vec![None, Some(2), Some(4)]);
3729        let second = Int32Array::from(vec![Some(2), None, Some(4)]);
3730        let arrays = [Arc::new(first) as ArrayRef, Arc::new(second) as ArrayRef];
3731
3732        for array in arrays.iter() {
3733            rows.clear();
3734            converter
3735                .append(&mut rows, std::slice::from_ref(array))
3736                .unwrap();
3737            let back = converter.convert_rows(&rows).unwrap();
3738            assert_eq!(&back[0], array);
3739        }
3740
3741        let mut rows_expected = converter.empty_rows(3, 128);
3742        converter.append(&mut rows_expected, &arrays[1..]).unwrap();
3743
3744        for (i, (actual, expected)) in rows.iter().zip(rows_expected.iter()).enumerate() {
3745            assert_eq!(
3746                actual, expected,
3747                "For row {i}: expected {expected:?}, actual: {actual:?}",
3748            );
3749        }
3750    }
3751
3752    #[test]
3753    fn test_append_codec_dictionary_binary() {
3754        use DataType::*;
3755        // Dictionary RowConverter
3756        let converter = RowConverter::new(vec![SortField::new(Dictionary(
3757            Box::new(Int32),
3758            Box::new(Binary),
3759        ))])
3760        .unwrap();
3761        let mut rows = converter.empty_rows(4, 128);
3762
3763        let keys = Int32Array::from_iter_values([0, 1, 2, 3]);
3764        let values = BinaryArray::from(vec![
3765            Some("a".as_bytes()),
3766            Some(b"b"),
3767            Some(b"c"),
3768            Some(b"d"),
3769        ]);
3770        let dict_array = DictionaryArray::new(keys, Arc::new(values));
3771
3772        rows.clear();
3773        let array = Arc::new(dict_array) as ArrayRef;
3774        converter
3775            .append(&mut rows, std::slice::from_ref(&array))
3776            .unwrap();
3777        let back = converter.convert_rows(&rows).unwrap();
3778
3779        dictionary_eq(&back[0], &array);
3780    }
3781
3782    #[test]
3783    fn test_list_prefix() {
3784        let mut a = ListBuilder::new(Int8Builder::new());
3785        a.append_value([None]);
3786        a.append_value([None, None]);
3787        let a = a.finish();
3788
3789        let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
3790        let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
3791        assert_eq!(rows.row(0).cmp(&rows.row(1)), Ordering::Less);
3792    }
3793
3794    #[test]
3795    fn map_should_be_marked_as_unsupported() {
3796        let map_data_type = Field::new_map(
3797            "map",
3798            "entries",
3799            Field::new("key", DataType::Utf8, false),
3800            Field::new("value", DataType::Utf8, true),
3801            false,
3802            true,
3803        )
3804        .data_type()
3805        .clone();
3806
3807        let is_supported = RowConverter::supports_fields(&[SortField::new(map_data_type)]);
3808
3809        assert!(!is_supported, "Map should not be supported");
3810    }
3811
3812    #[test]
3813    fn should_fail_to_create_row_converter_for_unsupported_map_type() {
3814        let map_data_type = Field::new_map(
3815            "map",
3816            "entries",
3817            Field::new("key", DataType::Utf8, false),
3818            Field::new("value", DataType::Utf8, true),
3819            false,
3820            true,
3821        )
3822        .data_type()
3823        .clone();
3824
3825        let converter = RowConverter::new(vec![SortField::new(map_data_type)]);
3826
3827        match converter {
3828            Err(ArrowError::NotYetImplemented(message)) => {
3829                assert!(
3830                    message.contains("Row format support not yet implemented for"),
3831                    "Expected NotYetImplemented error for map data type, got: {message}",
3832                );
3833            }
3834            Err(e) => panic!("Expected NotYetImplemented error, got: {e}"),
3835            Ok(_) => panic!("Expected NotYetImplemented error for map data type"),
3836        }
3837    }
3838
    #[test]
    fn test_values_buffer_smaller_when_utf8_validation_disabled() {
        // Round-trips `col` through the row format twice and returns the
        // lengths of the resulting StringViewArray data buffers as
        // (unchecked_values_len, checked_values_len):
        // - unchecked: rows produced by convert_columns are trusted, so
        //   convert_rows skips UTF-8 validation,
        // - checked: rows re-parsed through RowParser are untrusted, so
        //   convert_rows validates UTF-8 (which, per this test, also copies
        //   inline strings into the values buffer).
        fn get_values_buffer_len(col: ArrayRef) -> (usize, usize) {
            // 1. Convert cols into rows
            let converter = RowConverter::new(vec![SortField::new(DataType::Utf8View)]).unwrap();

            // 2a. Convert rows into cols (validate_utf8 = false)
            let rows = converter.convert_columns(&[col]).unwrap();
            let converted = converter.convert_rows(&rows).unwrap();
            let unchecked_values_len = converted[0].as_string_view().data_buffers()[0].len();

            // 2b. Convert rows into cols (validate_utf8 = true since Row is initialized through RowParser)
            let rows = rows.try_into_binary().expect("reasonable size");
            let parser = converter.parser();
            let converted = converter
                .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
                .unwrap();
            let checked_values_len = converted[0].as_string_view().data_buffers()[0].len();
            (unchecked_values_len, checked_values_len)
        }

        // Case1. StringViewArray with inline strings
        let col = Arc::new(StringViewArray::from_iter([
            Some("hello"), // short(5)
            None,          // null
            Some("short"), // short(5)
            Some("tiny"),  // short(4)
        ])) as ArrayRef;

        let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
        // Since there are no long (>12) strings, len of values buffer is 0
        assert_eq!(unchecked_values_len, 0);
        // When utf8 validation enabled, values buffer includes inline strings (5+5+4)
        assert_eq!(checked_values_len, 14);

        // Case2. StringViewArray with long(>12) strings
        let col = Arc::new(StringViewArray::from_iter([
            Some("this is a very long string over 12 bytes"),
            Some("another long string to test the buffer"),
        ])) as ArrayRef;

        let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
        // Since there are no inline strings, expected length of values buffer is the same
        assert!(unchecked_values_len > 0);
        assert_eq!(unchecked_values_len, checked_values_len);

        // Case3. StringViewArray with both short and long strings
        let col = Arc::new(StringViewArray::from_iter([
            Some("tiny"),          // 4 (short)
            Some("thisisexact13"), // 13 (long)
            None,
            Some("short"), // 5 (short)
        ])) as ArrayRef;

        let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
        // Since there is single long string, len of values buffer is 13
        assert_eq!(unchecked_values_len, 13);
        // The validated path additionally stores the short strings, so it is larger
        assert!(checked_values_len > unchecked_values_len);
    }
3898
3899    #[test]
3900    fn test_sparse_union() {
3901        // create a sparse union with Int32 (type_id = 0) and Utf8 (type_id = 1)
3902        let int_array = Int32Array::from(vec![Some(1), None, Some(3), None, Some(5)]);
3903        let str_array = StringArray::from(vec![None, Some("b"), None, Some("d"), None]);
3904
3905        // [1, "b", 3, "d", 5]
3906        let type_ids = vec![0, 1, 0, 1, 0].into();
3907
3908        let union_fields = [
3909            (0, Arc::new(Field::new("int", DataType::Int32, false))),
3910            (1, Arc::new(Field::new("str", DataType::Utf8, false))),
3911        ]
3912        .into_iter()
3913        .collect();
3914
3915        let union_array = UnionArray::try_new(
3916            union_fields,
3917            type_ids,
3918            None,
3919            vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
3920        )
3921        .unwrap();
3922
3923        let union_type = union_array.data_type().clone();
3924        let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
3925
3926        let rows = converter
3927            .convert_columns(&[Arc::new(union_array.clone())])
3928            .unwrap();
3929
3930        // round trip
3931        let back = converter.convert_rows(&rows).unwrap();
3932        let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();
3933
3934        assert_eq!(union_array.len(), back_union.len());
3935        for i in 0..union_array.len() {
3936            assert_eq!(union_array.type_id(i), back_union.type_id(i));
3937        }
3938    }
3939
3940    #[test]
3941    fn test_sparse_union_with_nulls() {
3942        // create a sparse union with Int32 (type_id = 0) and Utf8 (type_id = 1)
3943        let int_array = Int32Array::from(vec![Some(1), None, Some(3), None, Some(5)]);
3944        let str_array = StringArray::from(vec![None::<&str>; 5]);
3945
3946        // [1, null (both children null), 3, null (both children null), 5]
3947        let type_ids = vec![0, 1, 0, 1, 0].into();
3948
3949        let union_fields = [
3950            (0, Arc::new(Field::new("int", DataType::Int32, true))),
3951            (1, Arc::new(Field::new("str", DataType::Utf8, true))),
3952        ]
3953        .into_iter()
3954        .collect();
3955
3956        let union_array = UnionArray::try_new(
3957            union_fields,
3958            type_ids,
3959            None,
3960            vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
3961        )
3962        .unwrap();
3963
3964        let union_type = union_array.data_type().clone();
3965        let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
3966
3967        let rows = converter
3968            .convert_columns(&[Arc::new(union_array.clone())])
3969            .unwrap();
3970
3971        // round trip
3972        let back = converter.convert_rows(&rows).unwrap();
3973        let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();
3974
3975        assert_eq!(union_array.len(), back_union.len());
3976        for i in 0..union_array.len() {
3977            let expected_null = union_array.is_null(i);
3978            let actual_null = back_union.is_null(i);
3979            assert_eq!(expected_null, actual_null, "Null mismatch at index {i}");
3980            if !expected_null {
3981                assert_eq!(union_array.type_id(i), back_union.type_id(i));
3982            }
3983        }
3984    }
3985
3986    #[test]
3987    fn test_dense_union() {
3988        // create a dense union with Int32 (type_id = 0) and use Utf8 (type_id = 1)
3989        let int_array = Int32Array::from(vec![1, 3, 5]);
3990        let str_array = StringArray::from(vec!["a", "b"]);
3991
3992        let type_ids = vec![0, 1, 0, 1, 0].into();
3993
3994        // [1, "a", 3, "b", 5]
3995        let offsets = vec![0, 0, 1, 1, 2].into();
3996
3997        let union_fields = [
3998            (0, Arc::new(Field::new("int", DataType::Int32, false))),
3999            (1, Arc::new(Field::new("str", DataType::Utf8, false))),
4000        ]
4001        .into_iter()
4002        .collect();
4003
4004        let union_array = UnionArray::try_new(
4005            union_fields,
4006            type_ids,
4007            Some(offsets), // Dense mode
4008            vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
4009        )
4010        .unwrap();
4011
4012        let union_type = union_array.data_type().clone();
4013        let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
4014
4015        let rows = converter
4016            .convert_columns(&[Arc::new(union_array.clone())])
4017            .unwrap();
4018
4019        // round trip
4020        let back = converter.convert_rows(&rows).unwrap();
4021        let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();
4022
4023        assert_eq!(union_array.len(), back_union.len());
4024        for i in 0..union_array.len() {
4025            assert_eq!(union_array.type_id(i), back_union.type_id(i));
4026        }
4027    }
4028
4029    #[test]
4030    fn test_dense_union_with_nulls() {
4031        // create a dense union with Int32 (type_id = 0) and Utf8 (type_id = 1)
4032        let int_array = Int32Array::from(vec![Some(1), None, Some(5)]);
4033        let str_array = StringArray::from(vec![Some("a"), None]);
4034
4035        // [1, "a", 5, null (str null), null (int null)]
4036        let type_ids = vec![0, 1, 0, 1, 0].into();
4037        let offsets = vec![0, 0, 1, 1, 2].into();
4038
4039        let union_fields = [
4040            (0, Arc::new(Field::new("int", DataType::Int32, true))),
4041            (1, Arc::new(Field::new("str", DataType::Utf8, true))),
4042        ]
4043        .into_iter()
4044        .collect();
4045
4046        let union_array = UnionArray::try_new(
4047            union_fields,
4048            type_ids,
4049            Some(offsets),
4050            vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
4051        )
4052        .unwrap();
4053
4054        let union_type = union_array.data_type().clone();
4055        let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
4056
4057        let rows = converter
4058            .convert_columns(&[Arc::new(union_array.clone())])
4059            .unwrap();
4060
4061        // round trip
4062        let back = converter.convert_rows(&rows).unwrap();
4063        let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();
4064
4065        assert_eq!(union_array.len(), back_union.len());
4066        for i in 0..union_array.len() {
4067            let expected_null = union_array.is_null(i);
4068            let actual_null = back_union.is_null(i);
4069            assert_eq!(expected_null, actual_null, "Null mismatch at index {i}");
4070            if !expected_null {
4071                assert_eq!(union_array.type_id(i), back_union.type_id(i));
4072            }
4073        }
4074    }
4075
    #[test]
    fn test_union_ordering() {
        // Verifies union row ordering: rows are grouped by type_id first
        // (all Int32 rows sort before all Utf8 rows), then ordered by value
        // within each type_id.
        let int_array = Int32Array::from(vec![100, 5, 20]);
        let str_array = StringArray::from(vec!["z", "a"]);

        // [100, "z", 5, "a", 20]
        let type_ids = vec![0, 1, 0, 1, 0].into();
        let offsets = vec![0, 0, 1, 1, 2].into();

        let union_fields = [
            (0, Arc::new(Field::new("int", DataType::Int32, false))),
            (1, Arc::new(Field::new("str", DataType::Utf8, false))),
        ]
        .into_iter()
        .collect();

        let union_array = UnionArray::try_new(
            union_fields,
            type_ids,
            Some(offsets),
            vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
        )
        .unwrap();

        let union_type = union_array.data_type().clone();
        let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();

        let rows = converter.convert_columns(&[Arc::new(union_array)]).unwrap();

        /*
        expected ordering

        row 2: 5    - type_id 0
        row 4: 20   - type_id 0
        row 0: 100  - type_id 0
        row 3: "a"  - type_id 1
        row 1: "z"  - type_id 1
        */

        // across type ids: every int sorts below every string
        // 5 < "z"
        assert!(rows.row(2) < rows.row(1));

        // 100 < "a"
        assert!(rows.row(0) < rows.row(3));

        // among ints
        // 5 < 20
        assert!(rows.row(2) < rows.row(4));
        // 20 < 100
        assert!(rows.row(4) < rows.row(0));

        // among strings
        // "a" < "z"
        assert!(rows.row(3) < rows.row(1));
    }
4131
4132    #[test]
4133    fn test_row_converter_roundtrip_with_many_union_columns() {
4134        // col 1: Union(Int32, Utf8) [67, "hello"]
4135        let fields1 = UnionFields::try_new(
4136            vec![0, 1],
4137            vec![
4138                Field::new("int", DataType::Int32, true),
4139                Field::new("string", DataType::Utf8, true),
4140            ],
4141        )
4142        .unwrap();
4143
4144        let int_array1 = Int32Array::from(vec![Some(67), None]);
4145        let string_array1 = StringArray::from(vec![None::<&str>, Some("hello")]);
4146        let type_ids1 = vec![0i8, 1].into();
4147
4148        let union_array1 = UnionArray::try_new(
4149            fields1.clone(),
4150            type_ids1,
4151            None,
4152            vec![
4153                Arc::new(int_array1) as ArrayRef,
4154                Arc::new(string_array1) as ArrayRef,
4155            ],
4156        )
4157        .unwrap();
4158
4159        // col 2: Union(Int32, Utf8) [100, "world"]
4160        let fields2 = UnionFields::try_new(
4161            vec![0, 1],
4162            vec![
4163                Field::new("int", DataType::Int32, true),
4164                Field::new("string", DataType::Utf8, true),
4165            ],
4166        )
4167        .unwrap();
4168
4169        let int_array2 = Int32Array::from(vec![Some(100), None]);
4170        let string_array2 = StringArray::from(vec![None::<&str>, Some("world")]);
4171        let type_ids2 = vec![0i8, 1].into();
4172
4173        let union_array2 = UnionArray::try_new(
4174            fields2.clone(),
4175            type_ids2,
4176            None,
4177            vec![
4178                Arc::new(int_array2) as ArrayRef,
4179                Arc::new(string_array2) as ArrayRef,
4180            ],
4181        )
4182        .unwrap();
4183
4184        // create a row converter with 2 union columns
4185        let field1 = Field::new("col1", DataType::Union(fields1, UnionMode::Sparse), true);
4186        let field2 = Field::new("col2", DataType::Union(fields2, UnionMode::Sparse), true);
4187
4188        let sort_field1 = SortField::new(field1.data_type().clone());
4189        let sort_field2 = SortField::new(field2.data_type().clone());
4190
4191        let converter = RowConverter::new(vec![sort_field1, sort_field2]).unwrap();
4192
4193        let rows = converter
4194            .convert_columns(&[
4195                Arc::new(union_array1.clone()) as ArrayRef,
4196                Arc::new(union_array2.clone()) as ArrayRef,
4197            ])
4198            .unwrap();
4199
4200        // roundtrip
4201        let out = converter.convert_rows(&rows).unwrap();
4202
4203        let [col1, col2] = out.as_slice() else {
4204            panic!("expected 2 columns")
4205        };
4206
4207        let col1 = col1.as_any().downcast_ref::<UnionArray>().unwrap();
4208        let col2 = col2.as_any().downcast_ref::<UnionArray>().unwrap();
4209
4210        for (expected, got) in [union_array1, union_array2].iter().zip([col1, col2]) {
4211            assert_eq!(expected.len(), got.len());
4212            assert_eq!(expected.type_ids(), got.type_ids());
4213
4214            for i in 0..expected.len() {
4215                assert_eq!(expected.value(i).as_ref(), got.value(i).as_ref());
4216            }
4217        }
4218    }
4219
4220    #[test]
4221    fn test_row_converter_roundtrip_with_one_union_column() {
4222        let fields = UnionFields::try_new(
4223            vec![0, 1],
4224            vec![
4225                Field::new("int", DataType::Int32, true),
4226                Field::new("string", DataType::Utf8, true),
4227            ],
4228        )
4229        .unwrap();
4230
4231        let int_array = Int32Array::from(vec![Some(67), None]);
4232        let string_array = StringArray::from(vec![None::<&str>, Some("hello")]);
4233        let type_ids = vec![0i8, 1].into();
4234
4235        let union_array = UnionArray::try_new(
4236            fields.clone(),
4237            type_ids,
4238            None,
4239            vec![
4240                Arc::new(int_array) as ArrayRef,
4241                Arc::new(string_array) as ArrayRef,
4242            ],
4243        )
4244        .unwrap();
4245
4246        let field = Field::new("col", DataType::Union(fields, UnionMode::Sparse), true);
4247        let sort_field = SortField::new(field.data_type().clone());
4248        let converter = RowConverter::new(vec![sort_field]).unwrap();
4249
4250        let rows = converter
4251            .convert_columns(&[Arc::new(union_array.clone()) as ArrayRef])
4252            .unwrap();
4253
4254        // roundtrip
4255        let out = converter.convert_rows(&rows).unwrap();
4256
4257        let [col1] = out.as_slice() else {
4258            panic!("expected 1 column")
4259        };
4260
4261        let col = col1.as_any().downcast_ref::<UnionArray>().unwrap();
4262        assert_eq!(col.len(), union_array.len());
4263        assert_eq!(col.type_ids(), union_array.type_ids());
4264
4265        for i in 0..col.len() {
4266            assert_eq!(col.value(i).as_ref(), union_array.value(i).as_ref());
4267        }
4268    }
4269
4270    #[test]
4271    fn rows_size_should_count_for_capacity() {
4272        let row_converter = RowConverter::new(vec![SortField::new(DataType::UInt8)]).unwrap();
4273
4274        let empty_rows_size_with_preallocate_rows_and_data = {
4275            let rows = row_converter.empty_rows(1000, 1000);
4276
4277            rows.size()
4278        };
4279        let empty_rows_size_with_preallocate_rows = {
4280            let rows = row_converter.empty_rows(1000, 0);
4281
4282            rows.size()
4283        };
4284        let empty_rows_size_with_preallocate_data = {
4285            let rows = row_converter.empty_rows(0, 1000);
4286
4287            rows.size()
4288        };
4289        let empty_rows_size_without_preallocate = {
4290            let rows = row_converter.empty_rows(0, 0);
4291
4292            rows.size()
4293        };
4294
4295        assert!(
4296            empty_rows_size_with_preallocate_rows_and_data > empty_rows_size_with_preallocate_rows,
4297            "{empty_rows_size_with_preallocate_rows_and_data} should be larger than {empty_rows_size_with_preallocate_rows}"
4298        );
4299        assert!(
4300            empty_rows_size_with_preallocate_rows_and_data > empty_rows_size_with_preallocate_data,
4301            "{empty_rows_size_with_preallocate_rows_and_data} should be larger than {empty_rows_size_with_preallocate_data}"
4302        );
4303        assert!(
4304            empty_rows_size_with_preallocate_rows > empty_rows_size_without_preallocate,
4305            "{empty_rows_size_with_preallocate_rows} should be larger than {empty_rows_size_without_preallocate}"
4306        );
4307        assert!(
4308            empty_rows_size_with_preallocate_data > empty_rows_size_without_preallocate,
4309            "{empty_rows_size_with_preallocate_data} should be larger than {empty_rows_size_without_preallocate}"
4310        );
4311    }
4312
    #[test]
    fn reserve_should_increase_capacity_to_the_requested_size() {
        // Verifies that Rows::reserve grows capacity only when the request
        // exceeds what is already reserved, as observed through size().
        let row_converter = RowConverter::new(vec![SortField::new(DataType::UInt8)]).unwrap();
        let mut empty_rows = row_converter.empty_rows(0, 0);
        empty_rows.reserve(50, 50);
        let before_size = empty_rows.size();
        // Reserving the same amounts again must be a no-op.
        empty_rows.reserve(50, 50);
        assert_eq!(
            empty_rows.size(),
            before_size,
            "Size should not change when reserving already reserved space"
        );
        // Reserving less than what is already reserved must also be a no-op.
        empty_rows.reserve(10, 20);
        assert_eq!(
            empty_rows.size(),
            before_size,
            "Size should not change when already have space for the expected reserved data"
        );

        // Increasing the row count past the current reservation must grow it.
        empty_rows.reserve(100, 20);
        assert!(
            empty_rows.size() > before_size,
            "Size should increase when reserving more space than previously reserved"
        );

        let before_size = empty_rows.size();

        // Likewise, increasing the data size past the current reservation must grow it.
        empty_rows.reserve(20, 100);
        assert!(
            empty_rows.size() > before_size,
            "Size should increase when reserving more space than previously reserved"
        );
    }
4346}