// arrow_row/lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! A comparable row-oriented representation of a collection of [`Array`].
19//!
20//! [`Row`]s are [normalized for sorting], and can therefore be very efficiently [compared],
21//! using [`memcmp`] under the hood, or used in [non-comparison sorts] such as [radix sort].
22//! This makes the row format ideal for implementing efficient multi-column sorting,
23//! grouping, aggregation, windowing and more, as described in more detail
24//! [in this blog post](https://arrow.apache.org/blog/2022/11/07/multi-column-sorts-in-arrow-rust-part-1/).
25//!
26//! For example, given three input [`Array`], [`RowConverter`] creates byte
27//! sequences that [compare] the same as when using [`lexsort`].
28//!
29//! ```text
30//!    ┌─────┐   ┌─────┐   ┌─────┐
31//!    │     │   │     │   │     │
32//!    ├─────┤ ┌ ┼─────┼ ─ ┼─────┼ ┐              ┏━━━━━━━━━━━━━┓
33//!    │     │   │     │   │     │  ─────────────▶┃             ┃
34//!    ├─────┤ └ ┼─────┼ ─ ┼─────┼ ┘              ┗━━━━━━━━━━━━━┛
35//!    │     │   │     │   │     │
36//!    └─────┘   └─────┘   └─────┘
37//!                ...
38//!    ┌─────┐ ┌ ┬─────┬ ─ ┬─────┬ ┐              ┏━━━━━━━━┓
39//!    │     │   │     │   │     │  ─────────────▶┃        ┃
40//!    └─────┘ └ ┴─────┴ ─ ┴─────┴ ┘              ┗━━━━━━━━┛
41//!     UInt64      Utf8     F64
42//!
43//!           Input Arrays                          Row Format
44//!     (Columns)
45//! ```
46//!
47//! _[`Rows`] must be generated by the same [`RowConverter`] for the comparison
48//! to be meaningful._
49//!
50//! # Basic Example
51//! ```
52//! # use std::sync::Arc;
53//! # use arrow_row::{RowConverter, SortField};
54//! # use arrow_array::{ArrayRef, Int32Array, StringArray};
55//! # use arrow_array::cast::{AsArray, as_string_array};
56//! # use arrow_array::types::Int32Type;
57//! # use arrow_schema::DataType;
58//!
59//! let a1 = Arc::new(Int32Array::from_iter_values([-1, -1, 0, 3, 3])) as ArrayRef;
60//! let a2 = Arc::new(StringArray::from_iter_values(["a", "b", "c", "d", "d"])) as ArrayRef;
61//! let arrays = vec![a1, a2];
62//!
63//! // Convert arrays to rows
64//! let converter = RowConverter::new(vec![
65//!     SortField::new(DataType::Int32),
66//!     SortField::new(DataType::Utf8),
67//! ]).unwrap();
68//! let rows = converter.convert_columns(&arrays).unwrap();
69//!
70//! // Compare rows
71//! for i in 0..4 {
72//!     assert!(rows.row(i) <= rows.row(i + 1));
73//! }
74//! assert_eq!(rows.row(3), rows.row(4));
75//!
76//! // Convert rows back to arrays
77//! let converted = converter.convert_rows(&rows).unwrap();
78//! assert_eq!(arrays, converted);
79//!
80//! // Compare rows from different arrays
81//! let a1 = Arc::new(Int32Array::from_iter_values([3, 4])) as ArrayRef;
82//! let a2 = Arc::new(StringArray::from_iter_values(["e", "f"])) as ArrayRef;
83//! let arrays = vec![a1, a2];
84//! let rows2 = converter.convert_columns(&arrays).unwrap();
85//!
86//! assert!(rows.row(4) < rows2.row(0));
87//! assert!(rows.row(4) < rows2.row(1));
88//!
89//! // Convert selection of rows back to arrays
90//! let selection = [rows.row(0), rows2.row(1), rows.row(2), rows2.row(0)];
91//! let converted = converter.convert_rows(selection).unwrap();
92//! let c1 = converted[0].as_primitive::<Int32Type>();
93//! assert_eq!(c1.values(), &[-1, 4, 0, 3]);
94//!
95//! let c2 = converted[1].as_string::<i32>();
96//! let c2_values: Vec<_> = c2.iter().flatten().collect();
97//! assert_eq!(&c2_values, &["a", "f", "c", "e"]);
98//! ```
99//!
100//! # Lexsort
101//!
102//! The row format can also be used to implement a fast multi-column / lexicographic sort
103//!
104//! ```
105//! # use arrow_row::{RowConverter, SortField};
106//! # use arrow_array::{ArrayRef, UInt32Array};
107//! fn lexsort_to_indices(arrays: &[ArrayRef]) -> UInt32Array {
108//!     let fields = arrays
109//!         .iter()
110//!         .map(|a| SortField::new(a.data_type().clone()))
111//!         .collect();
112//!     let converter = RowConverter::new(fields).unwrap();
113//!     let rows = converter.convert_columns(arrays).unwrap();
114//!     let mut sort: Vec<_> = rows.iter().enumerate().collect();
115//!     sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
116//!     UInt32Array::from_iter_values(sort.iter().map(|(i, _)| *i as u32))
117//! }
118//! ```
119//!
120//! [non-comparison sorts]: https://en.wikipedia.org/wiki/Sorting_algorithm#Non-comparison_sorts
121//! [radix sort]: https://en.wikipedia.org/wiki/Radix_sort
122//! [normalized for sorting]: http://wwwlgis.informatik.uni-kl.de/archiv/wwwdvs.informatik.uni-kl.de/courses/DBSREAL/SS2005/Vorlesungsunterlagen/Implementing_Sorting.pdf
123//! [`memcmp`]: https://www.man7.org/linux/man-pages/man3/memcmp.3.html
124//! [`lexsort`]: https://docs.rs/arrow-ord/latest/arrow_ord/sort/fn.lexsort.html
125//! [compared]: PartialOrd
126//! [compare]: PartialOrd
127
128#![doc(
129    html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
130    html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
131)]
132#![cfg_attr(docsrs, feature(doc_auto_cfg))]
133#![warn(missing_docs)]
134use std::cmp::Ordering;
135use std::hash::{Hash, Hasher};
136use std::sync::Arc;
137
138use arrow_array::cast::*;
139use arrow_array::types::ArrowDictionaryKeyType;
140use arrow_array::*;
141use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer};
142use arrow_data::ArrayDataBuilder;
143use arrow_schema::*;
144use variable::{decode_binary_view, decode_string_view};
145
146use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
147use crate::list::{compute_lengths_fixed_size_list, encode_fixed_size_list};
148use crate::variable::{decode_binary, decode_string};
149use arrow_array::types::{Int16Type, Int32Type, Int64Type};
150
151mod fixed;
152mod list;
153mod run;
154mod variable;
155
156/// Converts [`ArrayRef`] columns into a [row-oriented](self) format.
157///
158/// *Note: The encoding of the row format may change from release to release.*
159///
160/// ## Overview
161///
162/// The row format is a variable length byte sequence created by
163/// concatenating the encoded form of each column. The encoding for
164/// each column depends on its datatype (and sort options).
165///
166/// The encoding is carefully designed in such a way that escaping is
167/// unnecessary: it is never ambiguous as to whether a byte is part of
168/// a sentinel (e.g. null) or a value.
169///
170/// ## Unsigned Integer Encoding
171///
172/// A null integer is encoded as a `0_u8`, followed by a zero-ed number of bytes corresponding
173/// to the integer's length.
174///
175/// A valid integer is encoded as `1_u8`, followed by the big-endian representation of the
176/// integer.
177///
178/// ```text
179///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
180///    3          │03│00│00│00│      │01│00│00│00│03│
181///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
182///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
183///   258         │02│01│00│00│      │01│00│00│01│02│
184///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
185///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
186///  23423        │7F│5B│00│00│      │01│00│00│5B│7F│
187///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
188///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
189///  NULL         │??│??│??│??│      │00│00│00│00│00│
190///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
191///
192///              32-bit (4 bytes)        Row Format
193///  Value        Little Endian
194/// ```
195///
196/// ## Signed Integer Encoding
197///
198/// Signed integers have their most significant sign bit flipped, and are then encoded in the
199/// same manner as an unsigned integer.
200///
201/// ```text
202///        ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┬──┐
203///     5  │05│00│00│00│       │05│00│00│80│       │01│80│00│00│05│
204///        └──┴──┴──┴──┘       └──┴──┴──┴──┘       └──┴──┴──┴──┴──┘
205///        ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┬──┐
206///    -5  │FB│FF│FF│FF│       │FB│FF│FF│7F│       │01│7F│FF│FF│FB│
207///        └──┴──┴──┴──┘       └──┴──┴──┴──┘       └──┴──┴──┴──┴──┘
208///
209///  Value  32-bit (4 bytes)    High bit flipped      Row Format
210///          Little Endian
211/// ```
212///
213/// ## Float Encoding
214///
215/// Floats are converted from IEEE 754 representation to a signed integer representation
216/// by flipping all bar the sign bit if they are negative.
217///
218/// They are then encoded in the same manner as a signed integer.
219///
220/// ## Fixed Length Bytes Encoding
221///
222/// Fixed length bytes are encoded in the same fashion as primitive types above.
223///
224/// For a fixed length array of length `n`:
225///
226/// A null is encoded as `0_u8` null sentinel followed by `n` `0_u8` bytes
227///
228/// A valid value is encoded as `1_u8` followed by the value bytes
229///
230/// ## Variable Length Bytes (including Strings) Encoding
231///
232/// A null is encoded as a `0_u8`.
233///
234/// An empty byte array is encoded as `1_u8`.
235///
236/// A non-null, non-empty byte array is encoded as `2_u8` followed by the byte array
237/// encoded using a block based scheme described below.
238///
239/// The byte array is broken up into fixed-width blocks, each block is written in turn
240/// to the output, followed by `0xFF_u8`. The final block is padded to 32-bytes
241/// with `0_u8` and written to the output, followed by the un-padded length in bytes
242/// of this final block as a `u8`. The first 4 blocks have a length of 8, with subsequent
243/// blocks using a length of 32, this is to reduce space amplification for small strings.
244///
245/// Note the following example encodings use a block size of 4 bytes for brevity:
246///
247/// ```text
248///                       ┌───┬───┬───┬───┬───┬───┐
249///  "MEEP"               │02 │'M'│'E'│'E'│'P'│04 │
250///                       └───┴───┴───┴───┴───┴───┘
251///
252///                       ┌───┐
253///  ""                   │01 |
254///                       └───┘
255///
256///  NULL                 ┌───┐
257///                       │00 │
258///                       └───┘
259///
/// "Defenestration"      ┌───┬───┬───┬───┬───┬───┐
///                       │02 │'D'│'e'│'f'│'e'│FF │
///                       └───┼───┼───┼───┼───┼───┤
///                           │'n'│'e'│'s'│'t'│FF │
///                           ├───┼───┼───┼───┼───┤
///                           │'r'│'a'│'t'│'i'│FF │
///                           ├───┼───┼───┼───┼───┤
///                           │'o'│'n'│00 │00 │02 │
///                           └───┴───┴───┴───┴───┘
271/// ```
272///
273/// This approach is loosely inspired by [COBS] encoding, and chosen over more traditional
274/// [byte stuffing] as it is more amenable to vectorisation, in particular AVX-256.
275///
276/// ## Dictionary Encoding
277///
278/// Dictionary encoded arrays are hydrated to their underlying values
279///
280/// ## REE Encoding
281///
/// REE (Run End Encoded) arrays, a form of run-length encoding, are hydrated to their underlying values.
283///
284/// ## Struct Encoding
285///
286/// A null is encoded as a `0_u8`.
287///
288/// A valid value is encoded as `1_u8` followed by the row encoding of each child.
289///
290/// This encoding effectively flattens the schema in a depth-first fashion.
291///
292/// For example
293///
294/// ```text
295/// ┌───────┬────────────────────────┬───────┐
296/// │ Int32 │ Struct[Int32, Float32] │ Int32 │
297/// └───────┴────────────────────────┴───────┘
298/// ```
299///
300/// Is encoded as
301///
302/// ```text
303/// ┌───────┬───────────────┬───────┬─────────┬───────┐
304/// │ Int32 │ Null Sentinel │ Int32 │ Float32 │ Int32 │
305/// └───────┴───────────────┴───────┴─────────┴───────┘
306/// ```
307///
308/// ## List Encoding
309///
310/// Lists are encoded by first encoding all child elements to the row format.
311///
312/// A list value is then encoded as the concatenation of each of the child elements,
313/// separately encoded using the variable length encoding described above, followed
314/// by the variable length encoding of an empty byte array.
315///
316/// For example given:
317///
318/// ```text
319/// [1_u8, 2_u8, 3_u8]
320/// [1_u8, null]
321/// []
322/// null
323/// ```
324///
325/// The elements would be converted to:
326///
327/// ```text
328///     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐        ┌──┬──┐
329///  1  │01│01│  2  │01│02│  3  │01│03│  1  │01│01│  null  │00│00│
330///     └──┴──┘     └──┴──┘     └──┴──┘     └──┴──┘        └──┴──┘
331///```
332///
333/// Which would be encoded as
334///
335/// ```text
336///                         ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
337///  [1_u8, 2_u8, 3_u8]     │02│01│01│00│00│02│02│01│02│00│00│02│02│01│03│00│00│02│01│
338///                         └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
339///                          └──── 1_u8 ────┘   └──── 2_u8 ────┘  └──── 3_u8 ────┘
340///
341///                         ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
342///  [1_u8, null]           │02│01│01│00│00│02│02│00│00│00│00│02│01│
343///                         └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
344///                          └──── 1_u8 ────┘   └──── null ────┘
345///
346///```
347///
348/// With `[]` represented by an empty byte array, and `null` a null byte array.
349///
350/// ## Fixed Size List Encoding
351///
352/// Fixed Size Lists are encoded by first encoding all child elements to the row format.
353///
354/// A non-null list value is then encoded as 0x01 followed by the concatenation of each
355/// of the child elements. A null list value is encoded as a null marker.
356///
357/// For example given:
358///
359/// ```text
360/// [1_u8, 2_u8]
361/// [3_u8, null]
362/// null
363/// ```
364///
365/// The elements would be converted to:
366///
367/// ```text
368///     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐        ┌──┬──┐
369///  1  │01│01│  2  │01│02│  3  │01│03│  null  │00│00│
370///     └──┴──┘     └──┴──┘     └──┴──┘        └──┴──┘
371///```
372///
373/// Which would be encoded as
374///
375/// ```text
376///                 ┌──┬──┬──┬──┬──┐
377///  [1_u8, 2_u8]   │01│01│01│01│02│
378///                 └──┴──┴──┴──┴──┘
379///                     └ 1 ┘ └ 2 ┘
380///                 ┌──┬──┬──┬──┬──┐
381///  [3_u8, null]   │01│01│03│00│00│
382///                 └──┴──┴──┴──┴──┘
383///                     └ 1 ┘ └null┘
384///                 ┌──┐
385///  null           │00│
386///                 └──┘
387///
388///```
389///
390/// # Ordering
391///
392/// ## Float Ordering
393///
394/// Floats are totally ordered in accordance to the `totalOrder` predicate as defined
395/// in the IEEE 754 (2008 revision) floating point standard.
396///
397/// The ordering established by this does not always agree with the
398/// [`PartialOrd`] and [`PartialEq`] implementations of `f32`. For example,
399/// they consider negative and positive zero equal, while this does not
400///
401/// ## Null Ordering
402///
403/// The encoding described above will order nulls first, this can be inverted by representing
404/// nulls as `0xFF_u8` instead of `0_u8`
405///
406/// ## Reverse Column Ordering
407///
408/// The order of a given column can be reversed by negating the encoded bytes of non-null values
409///
410/// [COBS]: https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing
411/// [byte stuffing]: https://en.wikipedia.org/wiki/High-Level_Data_Link_Control#Asynchronous_framing
#[derive(Debug)]
pub struct RowConverter {
    /// The [`SortField`] for each converted column
    ///
    /// Shared (via `Arc`) with every [`Rows`] this converter produces, so that
    /// provenance can be cheaply verified with pointer equality
    fields: Arc<[SortField]>,
    /// State for codecs
    codecs: Vec<Codec>,
}
418
/// Per-column codec state for a [`RowConverter`]
///
/// Nested and dictionary types require their own child [`RowConverter`]
/// (and, where applicable, a pre-computed all-null row encoding)
#[derive(Debug)]
enum Codec {
    /// No additional codec state is necessary
    Stateless,
    /// A row converter for the dictionary values
    /// and the encoding of a row containing only nulls
    Dictionary(RowConverter, OwnedRow),
    /// A row converter for the child fields
    /// and the encoding of a row containing only nulls
    Struct(RowConverter, OwnedRow),
    /// A row converter for the child field
    List(RowConverter),
    /// A row converter for the values array of a run-end encoded array
    RunEndEncoded(RowConverter),
}
434
impl Codec {
    /// Creates the [`Codec`] for `sort_field`, recursively constructing child
    /// [`RowConverter`] as required for nested and dictionary types
    fn new(sort_field: &SortField) -> Result<Self, ArrowError> {
        match &sort_field.data_type {
            DataType::Dictionary(_, values) => {
                // Dictionaries are hydrated to their values: encode the value
                // type with the parent's sort options
                let sort_field =
                    SortField::new_with_options(values.as_ref().clone(), sort_field.options);

                let converter = RowConverter::new(vec![sort_field])?;
                // Pre-compute the row encoding of a null value so it can be
                // reused for every null entry during encoding
                let null_array = new_null_array(values.as_ref(), 1);
                let nulls = converter.convert_columns(&[null_array])?;

                let owned = OwnedRow {
                    data: nulls.buffer.into(),
                    config: nulls.config,
                };
                Ok(Self::Dictionary(converter, owned))
            }
            DataType::RunEndEncoded(_, values) => {
                // Similar to List implementation
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };

                let field = SortField::new_with_options(values.data_type().clone(), options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::RunEndEncoded(converter))
            }
            // Non-nested types need no additional codec state
            d if !d.is_nested() => Ok(Self::Stateless),
            DataType::List(f) | DataType::LargeList(f) => {
                // The encoded contents will be inverted if descending is set to true
                // As such we set `descending` to false and negate nulls first if it
                // is set to true
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };

                let field = SortField::new_with_options(f.data_type().clone(), options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::List(converter))
            }
            DataType::FixedSizeList(f, _) => {
                // Fixed size lists use a simple validity-prefixed encoding (see
                // module docs) rather than the variable length encoding, so the
                // child inherits the parent's options unchanged
                let field = SortField::new_with_options(f.data_type().clone(), sort_field.options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::List(converter))
            }
            DataType::Struct(f) => {
                // Each child column is encoded with the parent's options
                let sort_fields = f
                    .iter()
                    .map(|x| SortField::new_with_options(x.data_type().clone(), sort_field.options))
                    .collect();

                let converter = RowConverter::new(sort_fields)?;
                // Pre-compute the encoding of a row in which every child is
                // null, used when the struct value itself is null
                let nulls: Vec<_> = f.iter().map(|x| new_null_array(x.data_type(), 1)).collect();

                let nulls = converter.convert_columns(&nulls)?;
                let owned = OwnedRow {
                    data: nulls.buffer.into(),
                    config: nulls.config,
                };

                Ok(Self::Struct(converter, owned))
            }
            _ => Err(ArrowError::NotYetImplemented(format!(
                "not yet implemented: {:?}",
                sort_field.data_type
            ))),
        }
    }

    /// Returns an [`Encoder`] for `array`, converting any child data to rows
    /// up-front so that per-row encoding is cheap
    fn encoder(&self, array: &dyn Array) -> Result<Encoder<'_>, ArrowError> {
        match self {
            Codec::Stateless => Ok(Encoder::Stateless),
            Codec::Dictionary(converter, nulls) => {
                // Convert the dictionary values to rows; keys index these rows
                let values = array.as_any_dictionary().values().clone();
                let rows = converter.convert_columns(&[values])?;
                Ok(Encoder::Dictionary(rows, nulls.row()))
            }
            Codec::Struct(converter, null) => {
                // Convert the child columns to rows
                let v = as_struct_array(array);
                let rows = converter.convert_columns(v.columns())?;
                Ok(Encoder::Struct(rows, null.row()))
            }
            Codec::List(converter) => {
                let values = match array.data_type() {
                    DataType::List(_) => {
                        let list_array = as_list_array(array);
                        let first_offset = list_array.offsets()[0] as usize;
                        let last_offset =
                            list_array.offsets()[list_array.offsets().len() - 1] as usize;

                        // values can include more data than referenced in the ListArray, only encode
                        // the referenced values.
                        list_array
                            .values()
                            .slice(first_offset, last_offset - first_offset)
                    }
                    DataType::LargeList(_) => {
                        let list_array = as_large_list_array(array);

                        let first_offset = list_array.offsets()[0] as usize;
                        let last_offset =
                            list_array.offsets()[list_array.offsets().len() - 1] as usize;

                        // values can include more data than referenced in the LargeListArray, only encode
                        // the referenced values.
                        list_array
                            .values()
                            .slice(first_offset, last_offset - first_offset)
                    }
                    DataType::FixedSizeList(_, _) => {
                        as_fixed_size_list_array(array).values().clone()
                    }
                    _ => unreachable!(),
                };
                let rows = converter.convert_columns(&[values])?;
                Ok(Encoder::List(rows))
            }
            Codec::RunEndEncoded(converter) => {
                // Dispatch on the run-end index type to obtain the values array
                let values = match array.data_type() {
                    DataType::RunEndEncoded(r, _) => match r.data_type() {
                        DataType::Int16 => array.as_run::<Int16Type>().values(),
                        DataType::Int32 => array.as_run::<Int32Type>().values(),
                        DataType::Int64 => array.as_run::<Int64Type>().values(),
                        _ => unreachable!("Unsupported run end index type: {r:?}"),
                    },
                    _ => unreachable!(),
                };
                let rows = converter.convert_columns(&[values.clone()])?;
                Ok(Encoder::RunEndEncoded(rows))
            }
        }
    }

    /// Returns the size in bytes of any additional state held by this codec
    /// (child converters and cached null-row encodings)
    fn size(&self) -> usize {
        match self {
            Codec::Stateless => 0,
            Codec::Dictionary(converter, nulls) => converter.size() + nulls.data.len(),
            Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(),
            Codec::List(converter) => converter.size(),
            Codec::RunEndEncoded(converter) => converter.size(),
        }
    }
}
580
/// Per-array encoder state produced by [`Codec::encoder`]
///
/// Holds the pre-converted child rows (where applicable) for the specific
/// array being encoded
#[derive(Debug)]
enum Encoder<'a> {
    /// No additional encoder state is necessary
    Stateless,
    /// The encoding of the child array and the encoding of a null row
    Dictionary(Rows, Row<'a>),
    /// The row encoding of the child arrays and the encoding of a null row
    ///
    /// It is necessary to encode to a temporary [`Rows`] to avoid serializing
    /// values that are masked by a null in the parent StructArray, otherwise
    /// this would establish an ordering between semantically null values
    Struct(Rows, Row<'a>),
    /// The row encoding of the child array
    List(Rows),
    /// The row encoding of the values array
    RunEndEncoded(Rows),
}
598
/// Configure the data type and sort order for a given column
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
    /// Sort options (sort direction and null ordering)
    options: SortOptions,
    /// Data type of the column to be encoded
    data_type: DataType,
}
607
608impl SortField {
609    /// Create a new column with the given data type
610    pub fn new(data_type: DataType) -> Self {
611        Self::new_with_options(data_type, Default::default())
612    }
613
614    /// Create a new column with the given data type and [`SortOptions`]
615    pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
616        Self { options, data_type }
617    }
618
619    /// Return size of this instance in bytes.
620    ///
621    /// Includes the size of `Self`.
622    pub fn size(&self) -> usize {
623        self.data_type.size() + std::mem::size_of::<Self>() - std::mem::size_of::<DataType>()
624    }
625}
626
627impl RowConverter {
628    /// Create a new [`RowConverter`] with the provided schema
629    pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
630        if !Self::supports_fields(&fields) {
631            return Err(ArrowError::NotYetImplemented(format!(
632                "Row format support not yet implemented for: {fields:?}"
633            )));
634        }
635
636        let codecs = fields.iter().map(Codec::new).collect::<Result<_, _>>()?;
637        Ok(Self {
638            fields: fields.into(),
639            codecs,
640        })
641    }
642
643    /// Check if the given fields are supported by the row format.
644    pub fn supports_fields(fields: &[SortField]) -> bool {
645        fields.iter().all(|x| Self::supports_datatype(&x.data_type))
646    }
647
648    fn supports_datatype(d: &DataType) -> bool {
649        match d {
650            _ if !d.is_nested() => true,
651            DataType::List(f) | DataType::LargeList(f) | DataType::FixedSizeList(f, _) => {
652                Self::supports_datatype(f.data_type())
653            }
654            DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())),
655            DataType::RunEndEncoded(_, values) => Self::supports_datatype(values.data_type()),
656            _ => false,
657        }
658    }
659
660    /// Convert [`ArrayRef`] columns into [`Rows`]
661    ///
662    /// See [`Row`] for information on when [`Row`] can be compared
663    ///
664    /// # Panics
665    ///
666    /// Panics if the schema of `columns` does not match that provided to [`RowConverter::new`]
667    pub fn convert_columns(&self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
668        let num_rows = columns.first().map(|x| x.len()).unwrap_or(0);
669        let mut rows = self.empty_rows(num_rows, 0);
670        self.append(&mut rows, columns)?;
671        Ok(rows)
672    }
673
674    /// Convert [`ArrayRef`] columns appending to an existing [`Rows`]
675    ///
676    /// See [`Row`] for information on when [`Row`] can be compared
677    ///
678    /// # Panics
679    ///
680    /// Panics if
681    /// * The schema of `columns` does not match that provided to [`RowConverter::new`]
682    /// * The provided [`Rows`] were not created by this [`RowConverter`]
683    ///
684    /// ```
685    /// # use std::sync::Arc;
686    /// # use std::collections::HashSet;
687    /// # use arrow_array::cast::AsArray;
688    /// # use arrow_array::StringArray;
689    /// # use arrow_row::{Row, RowConverter, SortField};
690    /// # use arrow_schema::DataType;
691    /// #
692    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
693    /// let a1 = StringArray::from(vec!["hello", "world"]);
694    /// let a2 = StringArray::from(vec!["a", "a", "hello"]);
695    ///
696    /// let mut rows = converter.empty_rows(5, 128);
697    /// converter.append(&mut rows, &[Arc::new(a1)]).unwrap();
698    /// converter.append(&mut rows, &[Arc::new(a2)]).unwrap();
699    ///
700    /// let back = converter.convert_rows(&rows).unwrap();
701    /// let values: Vec<_> = back[0].as_string::<i32>().iter().map(Option::unwrap).collect();
702    /// assert_eq!(&values, &["hello", "world", "a", "a", "hello"]);
703    /// ```
704    pub fn append(&self, rows: &mut Rows, columns: &[ArrayRef]) -> Result<(), ArrowError> {
705        assert!(
706            Arc::ptr_eq(&rows.config.fields, &self.fields),
707            "rows were not produced by this RowConverter"
708        );
709
710        if columns.len() != self.fields.len() {
711            return Err(ArrowError::InvalidArgumentError(format!(
712                "Incorrect number of arrays provided to RowConverter, expected {} got {}",
713                self.fields.len(),
714                columns.len()
715            )));
716        }
717        for colum in columns.iter().skip(1) {
718            if colum.len() != columns[0].len() {
719                return Err(ArrowError::InvalidArgumentError(format!(
720                    "RowConverter columns must all have the same length, expected {} got {}",
721                    columns[0].len(),
722                    colum.len()
723                )));
724            }
725        }
726
727        let encoders = columns
728            .iter()
729            .zip(&self.codecs)
730            .zip(self.fields.iter())
731            .map(|((column, codec), field)| {
732                if !column.data_type().equals_datatype(&field.data_type) {
733                    return Err(ArrowError::InvalidArgumentError(format!(
734                        "RowConverter column schema mismatch, expected {} got {}",
735                        field.data_type,
736                        column.data_type()
737                    )));
738                }
739                codec.encoder(column.as_ref())
740            })
741            .collect::<Result<Vec<_>, _>>()?;
742
743        let write_offset = rows.num_rows();
744        let lengths = row_lengths(columns, &encoders);
745        let total = lengths.extend_offsets(rows.offsets[write_offset], &mut rows.offsets);
746        rows.buffer.resize(total, 0);
747
748        for ((column, field), encoder) in columns.iter().zip(self.fields.iter()).zip(encoders) {
749            // We encode a column at a time to minimise dispatch overheads
750            encode_column(
751                &mut rows.buffer,
752                &mut rows.offsets[write_offset..],
753                column.as_ref(),
754                field.options,
755                &encoder,
756            )
757        }
758
759        if cfg!(debug_assertions) {
760            assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len());
761            rows.offsets
762                .windows(2)
763                .for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic"));
764        }
765
766        Ok(())
767    }
768
    /// Convert [`Rows`] columns into [`ArrayRef`]
    ///
    /// # Panics
    ///
    /// Panics if the rows were not produced by this [`RowConverter`]
    pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>, ArrowError>
    where
        I: IntoIterator<Item = Row<'a>>,
    {
        // UTF-8 validation is only required if any input row was encoded from
        // a string column (tracked in its config)
        let mut validate_utf8 = false;
        let mut rows: Vec<_> = rows
            .into_iter()
            .map(|row| {
                // Verify provenance: rows from a different converter may have
                // an incompatible encoding
                assert!(
                    Arc::ptr_eq(&row.config.fields, &self.fields),
                    "rows were not produced by this RowConverter"
                );
                validate_utf8 |= row.config.validate_utf8;
                row.data
            })
            .collect();

        // SAFETY
        // We have validated that the rows came from this [`RowConverter`]
        // and therefore must be valid
        let result = unsafe { self.convert_raw(&mut rows, validate_utf8) }?;

        // In test builds, verify every byte of each row was consumed by the
        // decoders (the slices in `rows` should have been fully advanced)
        if cfg!(test) {
            for (i, row) in rows.iter().enumerate() {
                if !row.is_empty() {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "Codecs {codecs:?} did not consume all bytes for row {i}, remaining bytes: {row:?}",
                        codecs = &self.codecs
                    )));
                }
            }
        }

        Ok(result)
    }
809
810    /// Returns an empty [`Rows`] with capacity for `row_capacity` rows with
811    /// a total length of `data_capacity`
812    ///
813    /// This can be used to buffer a selection of [`Row`]
814    ///
815    /// ```
816    /// # use std::sync::Arc;
817    /// # use std::collections::HashSet;
818    /// # use arrow_array::cast::AsArray;
819    /// # use arrow_array::StringArray;
820    /// # use arrow_row::{Row, RowConverter, SortField};
821    /// # use arrow_schema::DataType;
822    /// #
823    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
824    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
825    ///
826    /// // Convert to row format and deduplicate
827    /// let converted = converter.convert_columns(&[Arc::new(array)]).unwrap();
828    /// let mut distinct_rows = converter.empty_rows(3, 100);
829    /// let mut dedup: HashSet<Row> = HashSet::with_capacity(3);
830    /// converted.iter().filter(|row| dedup.insert(*row)).for_each(|row| distinct_rows.push(row));
831    ///
832    /// // Note: we could skip buffering and feed the filtered iterator directly
833    /// // into convert_rows, this is done for demonstration purposes only
834    /// let distinct = converter.convert_rows(&distinct_rows).unwrap();
835    /// let values: Vec<_> = distinct[0].as_string::<i32>().iter().map(Option::unwrap).collect();
836    /// assert_eq!(&values, &["hello", "world", "a"]);
837    /// ```
838    pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows {
839        let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1));
840        offsets.push(0);
841
842        Rows {
843            offsets,
844            buffer: Vec::with_capacity(data_capacity),
845            config: RowConfig {
846                fields: self.fields.clone(),
847                validate_utf8: false,
848            },
849        }
850    }
851
852    /// Create a new [Rows] instance from the given binary data.
853    ///
854    /// ```
855    /// # use std::sync::Arc;
856    /// # use std::collections::HashSet;
857    /// # use arrow_array::cast::AsArray;
858    /// # use arrow_array::StringArray;
859    /// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField};
860    /// # use arrow_schema::DataType;
861    /// #
862    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
863    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
864    /// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
865    ///
866    /// // We can convert rows into binary format and back in batch.
867    /// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
868    /// let binary = rows.try_into_binary().expect("known-small array");
869    /// let converted = converter.from_binary(binary.clone());
870    /// assert!(converted.iter().eq(values.iter().map(|r| r.row())));
871    /// ```
872    ///
873    /// # Panics
874    ///
875    /// This function expects the passed [BinaryArray] to contain valid row data as produced by this
876    /// [RowConverter]. It will panic if any rows are null. Operations on the returned [Rows] may
877    /// panic if the data is malformed.
878    pub fn from_binary(&self, array: BinaryArray) -> Rows {
879        assert_eq!(
880            array.null_count(),
881            0,
882            "can't construct Rows instance from array with nulls"
883        );
884        Rows {
885            buffer: array.values().to_vec(),
886            offsets: array.offsets().iter().map(|&i| i.as_usize()).collect(),
887            config: RowConfig {
888                fields: Arc::clone(&self.fields),
889                validate_utf8: true,
890            },
891        }
892    }
893
894    /// Convert raw bytes into [`ArrayRef`]
895    ///
896    /// # Safety
897    ///
898    /// `rows` must contain valid data for this [`RowConverter`]
899    unsafe fn convert_raw(
900        &self,
901        rows: &mut [&[u8]],
902        validate_utf8: bool,
903    ) -> Result<Vec<ArrayRef>, ArrowError> {
904        self.fields
905            .iter()
906            .zip(&self.codecs)
907            .map(|(field, codec)| decode_column(field, rows, codec, validate_utf8))
908            .collect()
909    }
910
911    /// Returns a [`RowParser`] that can be used to parse [`Row`] from bytes
912    pub fn parser(&self) -> RowParser {
913        RowParser::new(Arc::clone(&self.fields))
914    }
915
916    /// Returns the size of this instance in bytes
917    ///
918    /// Includes the size of `Self`.
919    pub fn size(&self) -> usize {
920        std::mem::size_of::<Self>()
921            + self.fields.iter().map(|x| x.size()).sum::<usize>()
922            + self.codecs.capacity() * std::mem::size_of::<Codec>()
923            + self.codecs.iter().map(Codec::size).sum::<usize>()
924    }
925}
926
/// A [`RowParser`] can be created from a [`RowConverter`] and used to parse bytes to [`Row`]
#[derive(Debug)]
pub struct RowParser {
    // Shared field schema; constructed with validate_utf8 enabled since the
    // parsed bytes are caller-supplied
    config: RowConfig,
}
932
933impl RowParser {
934    fn new(fields: Arc<[SortField]>) -> Self {
935        Self {
936            config: RowConfig {
937                fields,
938                validate_utf8: true,
939            },
940        }
941    }
942
943    /// Creates a [`Row`] from the provided `bytes`.
944    ///
945    /// `bytes` must be a [`Row`] produced by the [`RowConverter`] associated with
946    /// this [`RowParser`], otherwise subsequent operations with the produced [`Row`] may panic
947    pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> {
948        Row {
949            data: bytes,
950            config: &self.config,
951        }
952    }
953}
954
/// The config of a given set of [`Row`]
#[derive(Debug, Clone)]
struct RowConfig {
    /// The schema for these rows
    fields: Arc<[SortField]>,
    /// Whether to run UTF-8 validation when converting to arrow arrays,
    /// set when the row bytes may originate from outside this converter
    /// (e.g. parsed from raw binary data)
    validate_utf8: bool,
}
963
/// A row-oriented representation of arrow data, that is normalized for comparison.
///
/// See the [module level documentation](self) and [`RowConverter`] for more details.
#[derive(Debug)]
pub struct Rows {
    /// Underlying row bytes
    buffer: Vec<u8>,
    /// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]`
    ///
    /// Invariant: always holds one more entry than there are rows, and the
    /// entries are monotonically increasing
    offsets: Vec<usize>,
    /// The config for these rows
    config: RowConfig,
}
976
impl Rows {
    /// Append a [`Row`] to this [`Rows`]
    ///
    /// # Panics
    ///
    /// Panics if `row` was produced by a different [`RowConverter`]
    pub fn push(&mut self, row: Row<'_>) {
        assert!(
            Arc::ptr_eq(&row.config.fields, &self.config.fields),
            "row was not produced by this RowConverter"
        );
        // Once any appended row requires UTF-8 validation, the whole set does
        self.config.validate_utf8 |= row.config.validate_utf8;
        self.buffer.extend_from_slice(row.data);
        self.offsets.push(self.buffer.len())
    }

    /// Returns the row at index `row`
    ///
    /// # Panics
    ///
    /// Panics if `row` is out of bounds, i.e. `row >= self.num_rows()`
    pub fn row(&self, row: usize) -> Row<'_> {
        // Equivalent to `row < self.num_rows()`, as offsets has num_rows + 1 entries
        assert!(row + 1 < self.offsets.len());
        unsafe { self.row_unchecked(row) }
    }

    /// Returns the row at `index` without bounds checking
    ///
    /// # Safety
    /// Caller must ensure that `index` is less than the number of offsets (#rows + 1)
    pub unsafe fn row_unchecked(&self, index: usize) -> Row<'_> {
        // SAFETY: caller guarantees `index + 1` is a valid offset index
        let end = unsafe { self.offsets.get_unchecked(index + 1) };
        let start = unsafe { self.offsets.get_unchecked(index) };
        // SAFETY: offsets are monotonically increasing bounds into `buffer`
        let data = unsafe { self.buffer.get_unchecked(*start..*end) };
        Row {
            data,
            config: &self.config,
        }
    }

    /// Sets the length of this [`Rows`] to 0
    pub fn clear(&mut self) {
        // Keep the leading offset entry so subsequent pushes index correctly
        self.offsets.truncate(1);
        self.buffer.clear();
    }

    /// Returns the number of [`Row`] in this [`Rows`]
    pub fn num_rows(&self) -> usize {
        self.offsets.len() - 1
    }

    /// Returns an iterator over the [`Row`] in this [`Rows`]
    pub fn iter(&self) -> RowsIter<'_> {
        self.into_iter()
    }

    /// Returns the size of this instance in bytes
    ///
    /// Includes the size of `Self`.
    pub fn size(&self) -> usize {
        // Size of fields is accounted for as part of RowConverter
        std::mem::size_of::<Self>()
            + self.buffer.len()
            + self.offsets.len() * std::mem::size_of::<usize>()
    }

    /// Create a [BinaryArray] from the [Rows] data without reallocating the
    /// underlying bytes.
    ///
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use std::collections::HashSet;
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::StringArray;
    /// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField};
    /// # use arrow_schema::DataType;
    /// #
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
    /// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
    ///
    /// // We can convert rows into binary format and back.
    /// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
    /// let binary = rows.try_into_binary().expect("known-small array");
    /// let parser = converter.parser();
    /// let parsed: Vec<OwnedRow> =
    ///   binary.iter().flatten().map(|b| parser.parse(b).owned()).collect();
    /// assert_eq!(values, parsed);
    /// ```
    ///
    /// # Errors
    ///
    /// This function will return an error if there is more data than can be stored in
    /// a [BinaryArray] -- i.e. if the total data size is more than 2GiB.
    pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> {
        if self.buffer.len() > i32::MAX as usize {
            return Err(ArrowError::InvalidArgumentError(format!(
                "{}-byte rows buffer too long to convert into a i32-indexed BinaryArray",
                self.buffer.len()
            )));
        }
        // We've checked that the buffer length fits in an i32; so all offsets into that buffer should fit as well.
        let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as));
        // SAFETY: offsets buffer is nonempty, monotonically increasing, and all represent valid indexes into buffer.
        let array = unsafe {
            BinaryArray::new_unchecked(
                OffsetBuffer::new_unchecked(offsets_scalar),
                Buffer::from_vec(self.buffer),
                None,
            )
        };
        Ok(array)
    }
}
1084
1085impl<'a> IntoIterator for &'a Rows {
1086    type Item = Row<'a>;
1087    type IntoIter = RowsIter<'a>;
1088
1089    fn into_iter(self) -> Self::IntoIter {
1090        RowsIter {
1091            rows: self,
1092            start: 0,
1093            end: self.num_rows(),
1094        }
1095    }
1096}
1097
/// An iterator over [`Rows`]
#[derive(Debug)]
pub struct RowsIter<'a> {
    rows: &'a Rows,
    /// Index of the next row to yield from the front
    start: usize,
    /// Exclusive end of the remaining range of rows
    end: usize,
}
1105
1106impl<'a> Iterator for RowsIter<'a> {
1107    type Item = Row<'a>;
1108
1109    fn next(&mut self) -> Option<Self::Item> {
1110        if self.end == self.start {
1111            return None;
1112        }
1113
1114        // SAFETY: We have checked that `start` is less than `end`
1115        let row = unsafe { self.rows.row_unchecked(self.start) };
1116        self.start += 1;
1117        Some(row)
1118    }
1119
1120    fn size_hint(&self) -> (usize, Option<usize>) {
1121        let len = self.len();
1122        (len, Some(len))
1123    }
1124}
1125
impl ExactSizeIterator for RowsIter<'_> {
    /// Number of rows remaining in the iterator
    fn len(&self) -> usize {
        self.end - self.start
    }
}
1131
1132impl DoubleEndedIterator for RowsIter<'_> {
1133    fn next_back(&mut self) -> Option<Self::Item> {
1134        if self.end == self.start {
1135            return None;
1136        }
1137        // Safety: We have checked that `start` is less than `end`
1138        let row = unsafe { self.rows.row_unchecked(self.end) };
1139        self.end -= 1;
1140        Some(row)
1141    }
1142}
1143
/// A comparable representation of a row.
///
/// See the [module level documentation](self) for more details.
///
/// Two [`Row`] can only be compared if they both belong to [`Rows`]
/// returned by calls to [`RowConverter::convert_columns`] on the same
/// [`RowConverter`]. If different [`RowConverter`]s are used, any
/// ordering established by comparing the [`Row`] is arbitrary.
#[derive(Debug, Copy, Clone)]
pub struct Row<'a> {
    /// The encoded row bytes; comparison is byte-wise lexicographic
    data: &'a [u8],
    /// Config of the [`Rows`] this row was sliced from
    config: &'a RowConfig,
}
1157
1158impl<'a> Row<'a> {
1159    /// Create owned version of the row to detach it from the shared [`Rows`].
1160    pub fn owned(&self) -> OwnedRow {
1161        OwnedRow {
1162            data: self.data.into(),
1163            config: self.config.clone(),
1164        }
1165    }
1166
1167    /// The row's bytes, with the lifetime of the underlying data.
1168    pub fn data(&self) -> &'a [u8] {
1169        self.data
1170    }
1171}
1172
// Manually derive these as don't wish to include `fields`

impl PartialEq for Row<'_> {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        // Equality considers only the encoded bytes, not the config
        self.data.eq(other.data)
    }
}

impl Eq for Row<'_> {}

impl PartialOrd for Row<'_> {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Row<'_> {
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        // Byte-wise lexicographic comparison; only meaningful for rows
        // produced by the same RowConverter (see struct docs)
        self.data.cmp(other.data)
    }
}

impl Hash for Row<'_> {
    #[inline]
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Hash the bytes only, consistent with PartialEq
        self.data.hash(state)
    }
}

impl AsRef<[u8]> for Row<'_> {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        self.data
    }
}
1211
/// Owned version of a [`Row`] that can be moved/cloned freely.
///
/// This contains the data for the one specific row (not the entire buffer of all rows).
#[derive(Debug, Clone)]
pub struct OwnedRow {
    /// Encoded bytes of this single row
    data: Box<[u8]>,
    /// Cloned config of the originating [`Rows`]
    config: RowConfig,
}
1220
1221impl OwnedRow {
1222    /// Get borrowed [`Row`] from owned version.
1223    ///
1224    /// This is helpful if you want to compare an [`OwnedRow`] with a [`Row`].
1225    pub fn row(&self) -> Row<'_> {
1226        Row {
1227            data: &self.data,
1228            config: &self.config,
1229        }
1230    }
1231}
1232
// Manually derive these as don't wish to include `fields`. Also we just want to use the same `Row` implementations here.

impl PartialEq for OwnedRow {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        // Delegate to the borrowed Row impl, keeping semantics identical
        self.row().eq(&other.row())
    }
}

impl Eq for OwnedRow {}

impl PartialOrd for OwnedRow {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for OwnedRow {
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        // Delegate to Row: byte-wise lexicographic comparison
        self.row().cmp(&other.row())
    }
}

impl Hash for OwnedRow {
    #[inline]
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Delegate to Row so OwnedRow and Row hash identically
        self.row().hash(state)
    }
}

impl AsRef<[u8]> for OwnedRow {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        &self.data
    }
}
1271
/// Returns the sentinel byte encoding a null for the given sort options:
/// `0x00` orders nulls before all values, `0xFF` after them
#[inline]
fn null_sentinel(options: SortOptions) -> u8 {
    match options.nulls_first {
        true => 0,
        false => 0xFF,
    }
}
1280
/// Stores the lengths of the rows, materializing per-row lengths lazily:
/// columns whose elements all encode to the same width only bump a shared
/// counter
enum LengthTracker {
    /// Fixed state: All rows have length `length`
    Fixed { length: usize, num_rows: usize },
    /// Variable state: The length of row `i` is `lengths[i] + fixed_length`
    Variable {
        fixed_length: usize,
        lengths: Vec<usize>,
    },
}

impl LengthTracker {
    /// Creates a tracker for `num_rows` rows, each initially of length 0
    fn new(num_rows: usize) -> Self {
        Self::Fixed {
            length: 0,
            num_rows,
        }
    }

    /// Adds a column of fixed-length elements, each of size `new_length` to the LengthTracker
    fn push_fixed(&mut self, new_length: usize) {
        // A fixed-width column contributes to the shared length in either state
        match self {
            LengthTracker::Fixed { length, .. } => *length += new_length,
            LengthTracker::Variable { fixed_length, .. } => *fixed_length += new_length,
        }
    }

    /// Adds a column of possibly variable-length elements, element `i` has length `new_lengths.nth(i)`
    fn push_variable(&mut self, new_lengths: impl ExactSizeIterator<Item = usize>) {
        match self {
            LengthTracker::Fixed { length, .. } => {
                // First variable column: switch state, carrying over the fixed
                // contribution accumulated so far
                *self = LengthTracker::Variable {
                    fixed_length: *length,
                    lengths: new_lengths.collect(),
                }
            }
            LengthTracker::Variable { lengths, .. } => {
                assert_eq!(lengths.len(), new_lengths.len());
                for (length, new_length) in lengths.iter_mut().zip(new_lengths) {
                    *length += new_length;
                }
            }
        }
    }

    /// Returns the tracked per-row lengths as a mutable slice, forcing the
    /// `Fixed` representation into `Variable` if necessary
    fn materialized(&mut self) -> &mut [usize] {
        if let LengthTracker::Fixed { length, num_rows } = *self {
            *self = LengthTracker::Variable {
                fixed_length: length,
                lengths: vec![0; num_rows],
            };
        }

        match self {
            LengthTracker::Variable { lengths, .. } => lengths,
            LengthTracker::Fixed { .. } => unreachable!(),
        }
    }

    /// Initializes the offsets using the tracked lengths. Returns the sum of the
    /// lengths of the rows added.
    ///
    /// The offsets are appended shifted down by one row index: for rows of
    /// length 3, 4 and 6 starting at offset 0 this appends `0, 3, 7` rather
    /// than `3, 7, 13`. As each row is written the corresponding offset is
    /// advanced to that row's end, so after encoding the offsets are in their
    /// final read positions:
    ///
    /// `0, 0, 3, 7` -> write row 0 -> `0, 3, 3, 7` -> write row 1 ->
    /// `0, 3, 7, 7` -> write row 2 -> `0, 3, 7, 13`
    ///
    /// In this way offsets tracks the write position during encoding whilst
    /// eventually serving as the read offsets
    fn extend_offsets(&self, initial_offset: usize, offsets: &mut Vec<usize>) -> usize {
        match self {
            LengthTracker::Fixed { length, num_rows } => {
                // All rows share one length: offsets form an arithmetic sequence
                offsets.extend((0..*num_rows).map(|row| initial_offset + row * length));

                initial_offset + num_rows * length
            }
            LengthTracker::Variable {
                fixed_length,
                lengths,
            } => {
                let mut cursor = initial_offset;

                offsets.extend(lengths.iter().map(|length| {
                    let start = cursor;
                    cursor += length + fixed_length;
                    start
                }));

                cursor
            }
        }
    }
}
1383
/// Computes the encoded length of each row of `cols`, returning a [`LengthTracker`]
///
/// `encoders` must correspond 1:1 with `cols`
fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> LengthTracker {
    use fixed::FixedLengthEncoding;

    // All columns encode the same number of rows
    let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
    let mut tracker = LengthTracker::new(num_rows);

    for (array, encoder) in cols.iter().zip(encoders) {
        match encoder {
            Encoder::Stateless => {
                downcast_primitive_array! {
                    array => tracker.push_fixed(fixed::encoded_len(array)),
                    DataType::Null => {},
                    DataType::Boolean => tracker.push_fixed(bool::ENCODED_LEN),
                    DataType::Binary => tracker.push_variable(
                        as_generic_binary_array::<i32>(array)
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::LargeBinary => tracker.push_variable(
                        as_generic_binary_array::<i64>(array)
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::BinaryView => tracker.push_variable(
                        array.as_binary_view()
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::Utf8 => tracker.push_variable(
                        array.as_string::<i32>()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::LargeUtf8 => tracker.push_variable(
                        array.as_string::<i64>()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::Utf8View => tracker.push_variable(
                        array.as_string_view()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::FixedSizeBinary(len) => {
                        let len = len.to_usize().unwrap();
                        // Encoded as 1 prefix byte plus `len` payload bytes
                        tracker.push_fixed(1 + len)
                    }
                    _ => unimplemented!("unsupported data type: {}", array.data_type()),
                }
            }
            Encoder::Dictionary(values, null) => {
                // Dictionary values were pre-encoded: each key contributes the
                // length of the row encoding it maps to (or of the null row)
                downcast_dictionary_array! {
                    array => {
                        tracker.push_variable(
                            array.keys().iter().map(|v| match v {
                                Some(k) => values.row(k.as_usize()).data.len(),
                                None => null.data.len(),
                            })
                        )
                    }
                    _ => unreachable!(),
                }
            }
            Encoder::Struct(rows, null) => {
                // One sentinel byte plus the pre-encoded child row (see encode_column)
                let array = as_struct_array(array);
                tracker.push_variable((0..array.len()).map(|idx| match array.is_valid(idx) {
                    true => 1 + rows.row(idx).as_ref().len(),
                    false => 1 + null.data.len(),
                }));
            }
            Encoder::List(rows) => match array.data_type() {
                DataType::List(_) => {
                    list::compute_lengths(tracker.materialized(), rows, as_list_array(array))
                }
                DataType::LargeList(_) => {
                    list::compute_lengths(tracker.materialized(), rows, as_large_list_array(array))
                }
                DataType::FixedSizeList(_, _) => compute_lengths_fixed_size_list(
                    &mut tracker,
                    rows,
                    as_fixed_size_list_array(array),
                ),
                _ => unreachable!(),
            },
            Encoder::RunEndEncoded(rows) => match array.data_type() {
                DataType::RunEndEncoded(r, _) => match r.data_type() {
                    DataType::Int16 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int16Type>(),
                    ),
                    DataType::Int32 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int32Type>(),
                    ),
                    DataType::Int64 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int64Type>(),
                    ),
                    _ => unreachable!("Unsupported run end index type: {r:?}"),
                },
                _ => unreachable!(),
            },
        }
    }

    tracker
}
1495
/// Encodes a column to the provided [`Rows`] incrementing the offsets as it progresses
///
/// For row `i`, `offsets[i + 1]` holds the current write position into `data`;
/// encoding a value writes its bytes there and advances that offset past them
fn encode_column(
    data: &mut [u8],
    offsets: &mut [usize],
    column: &dyn Array,
    opts: SortOptions,
    encoder: &Encoder<'_>,
) {
    match encoder {
        Encoder::Stateless => {
            downcast_primitive_array! {
                column => {
                    // Use the specialized path when the column contains no nulls
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode(data, offsets, column.values(), nulls, opts)
                    } else {
                        fixed::encode_not_null(data, offsets, column.values(), opts)
                    }
                }
                DataType::Null => {}
                DataType::Boolean => {
                    // Same null/not-null split as the primitive path above
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode_boolean(data, offsets, column.as_boolean().values(), nulls, opts)
                    } else {
                        fixed::encode_boolean_not_null(data, offsets, column.as_boolean().values(), opts)
                    }
                }
                DataType::Binary => {
                    variable::encode(data, offsets, as_generic_binary_array::<i32>(column).iter(), opts)
                }
                DataType::BinaryView => {
                    variable::encode(data, offsets, column.as_binary_view().iter(), opts)
                }
                DataType::LargeBinary => {
                    variable::encode(data, offsets, as_generic_binary_array::<i64>(column).iter(), opts)
                }
                DataType::Utf8 => variable::encode(
                    data, offsets,
                    column.as_string::<i32>().iter().map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::LargeUtf8 => variable::encode(
                    data, offsets,
                    column.as_string::<i64>()
                        .iter()
                        .map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::Utf8View => variable::encode(
                    data, offsets,
                    column.as_string_view().iter().map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::FixedSizeBinary(_) => {
                    let array = column.as_any().downcast_ref().unwrap();
                    fixed::encode_fixed_size_binary(data, offsets, array, opts)
                }
                _ => unimplemented!("unsupported data type: {}", column.data_type()),
            }
        }
        Encoder::Dictionary(values, nulls) => {
            downcast_dictionary_array! {
                column => encode_dictionary_values(data, offsets, column, values, nulls),
                _ => unreachable!()
            }
        }
        Encoder::Struct(rows, null) => {
            // Each struct value is a sentinel byte (0x01 = valid,
            // null_sentinel = null) followed by the pre-encoded child row
            let array = as_struct_array(column);
            let null_sentinel = null_sentinel(opts);
            offsets
                .iter_mut()
                .skip(1)
                .enumerate()
                .for_each(|(idx, offset)| {
                    let (row, sentinel) = match array.is_valid(idx) {
                        true => (rows.row(idx), 0x01),
                        false => (*null, null_sentinel),
                    };
                    let end_offset = *offset + 1 + row.as_ref().len();
                    data[*offset] = sentinel;
                    data[*offset + 1..end_offset].copy_from_slice(row.as_ref());
                    *offset = end_offset;
                })
        }
        Encoder::List(rows) => match column.data_type() {
            DataType::List(_) => list::encode(data, offsets, rows, opts, as_list_array(column)),
            DataType::LargeList(_) => {
                list::encode(data, offsets, rows, opts, as_large_list_array(column))
            }
            DataType::FixedSizeList(_, _) => {
                encode_fixed_size_list(data, offsets, rows, opts, as_fixed_size_list_array(column))
            }
            _ => unreachable!(),
        },
        Encoder::RunEndEncoded(rows) => match column.data_type() {
            DataType::RunEndEncoded(r, _) => match r.data_type() {
                DataType::Int16 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int16Type>())
                }
                DataType::Int32 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int32Type>())
                }
                DataType::Int64 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int64Type>())
                }
                _ => unreachable!("Unsupported run end index type: {r:?}"),
            },
            _ => unreachable!(),
        },
    }
}
1606
1607/// Encode dictionary values not preserving the dictionary encoding
1608pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
1609    data: &mut [u8],
1610    offsets: &mut [usize],
1611    column: &DictionaryArray<K>,
1612    values: &Rows,
1613    null: &Row<'_>,
1614) {
1615    for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
1616        let row = match k {
1617            Some(k) => values.row(k.as_usize()).data,
1618            None => null.data,
1619        };
1620        let end_offset = *offset + row.len();
1621        data[*offset..end_offset].copy_from_slice(row);
1622        *offset = end_offset;
1623    }
1624}
1625
/// Helper macro used with `downcast_primitive!` in [`decode_column`] to
/// decode a primitive column and wrap the resulting array in an `Arc`
macro_rules! decode_primitive_helper {
    ($t:ty, $rows:ident, $data_type:ident, $options:ident) => {
        Arc::new(decode_primitive::<$t>($rows, $data_type, $options))
    };
}
1631
/// Decodes the provided `field` from `rows`
///
/// # Safety
///
/// Rows must contain valid data for the provided field
unsafe fn decode_column(
    field: &SortField,
    rows: &mut [&[u8]],
    codec: &Codec,
    validate_utf8: bool,
) -> Result<ArrayRef, ArrowError> {
    let options = field.options;

    let array: ArrayRef = match codec {
        Codec::Stateless => {
            // Types with a fixed decoding strategy: dispatch purely on the
            // data type, no per-converter state required
            let data_type = field.data_type.clone();
            downcast_primitive! {
                data_type => (decode_primitive_helper, rows, data_type, options),
                DataType::Null => Arc::new(NullArray::new(rows.len())),
                DataType::Boolean => Arc::new(decode_bool(rows, options)),
                DataType::Binary => Arc::new(decode_binary::<i32>(rows, options)),
                DataType::LargeBinary => Arc::new(decode_binary::<i64>(rows, options)),
                DataType::BinaryView => Arc::new(decode_binary_view(rows, options)),
                DataType::FixedSizeBinary(size) => Arc::new(decode_fixed_size_binary(rows, size, options)),
                DataType::Utf8 => Arc::new(decode_string::<i32>(rows, options, validate_utf8)),
                DataType::LargeUtf8 => Arc::new(decode_string::<i64>(rows, options, validate_utf8)),
                DataType::Utf8View => Arc::new(decode_string_view(rows, options, validate_utf8)),
                _ => return Err(ArrowError::NotYetImplemented(format!("unsupported data type: {data_type}" )))
            }
        }
        Codec::Dictionary(converter, _) => {
            // Dictionary values were encoded with a nested converter, which
            // decodes exactly one column
            let cols = converter.convert_raw(rows, validate_utf8)?;
            cols.into_iter().next().unwrap()
        }
        Codec::Struct(converter, _) => {
            // Decode the struct-level null sentinel from the first byte of
            // each row, then strip that byte before decoding the children
            let (null_count, nulls) = fixed::decode_nulls(rows);
            rows.iter_mut().for_each(|row| *row = &row[1..]);
            let children = converter.convert_raw(rows, validate_utf8)?;

            let child_data = children.iter().map(|c| c.to_data()).collect();
            let builder = ArrayDataBuilder::new(field.data_type.clone())
                .len(rows.len())
                .null_count(null_count)
                .null_bit_buffer(Some(nulls))
                .child_data(child_data);

            // SAFETY: the caller guarantees `rows` contain valid data for
            // this field (see function-level safety contract)
            Arc::new(StructArray::from(builder.build_unchecked()))
        }
        Codec::List(converter) => match &field.data_type {
            DataType::List(_) => {
                Arc::new(list::decode::<i32>(converter, rows, field, validate_utf8)?)
            }
            DataType::LargeList(_) => {
                Arc::new(list::decode::<i64>(converter, rows, field, validate_utf8)?)
            }
            DataType::FixedSizeList(_, value_length) => Arc::new(list::decode_fixed_size_list(
                converter,
                rows,
                field,
                validate_utf8,
                value_length.as_usize(),
            )?),
            _ => unreachable!(),
        },
        Codec::RunEndEncoded(converter) => match &field.data_type {
            // Dispatch on the run-end index type declared by the field
            DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() {
                DataType::Int16 => Arc::new(run::decode::<Int16Type>(
                    converter,
                    rows,
                    field,
                    validate_utf8,
                )?),
                DataType::Int32 => Arc::new(run::decode::<Int32Type>(
                    converter,
                    rows,
                    field,
                    validate_utf8,
                )?),
                DataType::Int64 => Arc::new(run::decode::<Int64Type>(
                    converter,
                    rows,
                    field,
                    validate_utf8,
                )?),
                _ => unreachable!(),
            },
            _ => unreachable!(),
        },
    };
    Ok(array)
}
1723
1724#[cfg(test)]
1725mod tests {
1726    use rand::distr::uniform::SampleUniform;
1727    use rand::distr::{Distribution, StandardUniform};
1728    use rand::{rng, Rng};
1729
1730    use arrow_array::builder::*;
1731    use arrow_array::types::*;
1732    use arrow_array::*;
1733    use arrow_buffer::{i256, NullBuffer};
1734    use arrow_buffer::{Buffer, OffsetBuffer};
1735    use arrow_cast::display::{ArrayFormatter, FormatOptions};
1736    use arrow_ord::sort::{LexicographicalComparator, SortColumn};
1737
1738    use super::*;
1739
    #[test]
    fn test_fixed_width() {
        let cols = [
            Arc::new(Int16Array::from_iter([
                Some(1),
                Some(2),
                None,
                Some(-5),
                Some(2),
                Some(2),
                Some(0),
            ])) as ArrayRef,
            Arc::new(Float32Array::from_iter([
                Some(1.3),
                Some(2.5),
                None,
                Some(4.),
                Some(0.1),
                Some(-4.),
                Some(-0.),
            ])) as ArrayRef,
        ];

        let converter = RowConverter::new(vec![
            SortField::new(DataType::Int16),
            SortField::new(DataType::Float32),
        ])
        .unwrap();
        let rows = converter.convert_columns(&cols).unwrap();

        // Every row is 8 bytes: 1 sentinel + 2 value bytes for the Int16
        // column followed by 1 sentinel + 4 value bytes for the Float32
        assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
        // A leading 0x01 marks a valid value, 0x00 a null (whose value bytes
        // are zeroed); value bytes are transformed so byte-wise comparison
        // matches the logical ordering
        assert_eq!(
            rows.buffer,
            &[
                1, 128, 1, //
                1, 191, 166, 102, 102, //
                1, 128, 2, //
                1, 192, 32, 0, 0, //
                0, 0, 0, //
                0, 0, 0, 0, 0, //
                1, 127, 251, //
                1, 192, 128, 0, 0, //
                1, 128, 2, //
                1, 189, 204, 204, 205, //
                1, 128, 2, //
                1, 63, 127, 255, 255, //
                1, 128, 0, //
                1, 127, 255, 255, 255 //
            ]
        );

        // Row comparisons must agree with the logical (Int16, Float32) order
        assert!(rows.row(3) < rows.row(6));
        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(3) < rows.row(0));
        assert!(rows.row(4) < rows.row(1));
        assert!(rows.row(5) < rows.row(4));

        // Round-trip back to arrays and compare with the input columns
        let back = converter.convert_rows(&rows).unwrap();
        for (expected, actual) in cols.iter().zip(&back) {
            assert_eq!(expected, actual);
        }
    }
1802
1803    #[test]
1804    fn test_decimal128() {
1805        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal128(
1806            DECIMAL128_MAX_PRECISION,
1807            7,
1808        ))])
1809        .unwrap();
1810        let col = Arc::new(
1811            Decimal128Array::from_iter([
1812                None,
1813                Some(i128::MIN),
1814                Some(-13),
1815                Some(46_i128),
1816                Some(5456_i128),
1817                Some(i128::MAX),
1818            ])
1819            .with_precision_and_scale(38, 7)
1820            .unwrap(),
1821        ) as ArrayRef;
1822
1823        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1824        for i in 0..rows.num_rows() - 1 {
1825            assert!(rows.row(i) < rows.row(i + 1));
1826        }
1827
1828        let back = converter.convert_rows(&rows).unwrap();
1829        assert_eq!(back.len(), 1);
1830        assert_eq!(col.as_ref(), back[0].as_ref())
1831    }
1832
1833    #[test]
1834    fn test_decimal256() {
1835        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal256(
1836            DECIMAL256_MAX_PRECISION,
1837            7,
1838        ))])
1839        .unwrap();
1840        let col = Arc::new(
1841            Decimal256Array::from_iter([
1842                None,
1843                Some(i256::MIN),
1844                Some(i256::from_parts(0, -1)),
1845                Some(i256::from_parts(u128::MAX, -1)),
1846                Some(i256::from_parts(u128::MAX, 0)),
1847                Some(i256::from_parts(0, 46_i128)),
1848                Some(i256::from_parts(5, 46_i128)),
1849                Some(i256::MAX),
1850            ])
1851            .with_precision_and_scale(DECIMAL256_MAX_PRECISION, 7)
1852            .unwrap(),
1853        ) as ArrayRef;
1854
1855        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1856        for i in 0..rows.num_rows() - 1 {
1857            assert!(rows.row(i) < rows.row(i + 1));
1858        }
1859
1860        let back = converter.convert_rows(&rows).unwrap();
1861        assert_eq!(back.len(), 1);
1862        assert_eq!(col.as_ref(), back[0].as_ref())
1863    }
1864
1865    #[test]
1866    fn test_bool() {
1867        let converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]).unwrap();
1868
1869        let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])) as ArrayRef;
1870
1871        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1872        assert!(rows.row(2) > rows.row(1));
1873        assert!(rows.row(2) > rows.row(0));
1874        assert!(rows.row(1) > rows.row(0));
1875
1876        let cols = converter.convert_rows(&rows).unwrap();
1877        assert_eq!(&cols[0], &col);
1878
1879        let converter = RowConverter::new(vec![SortField::new_with_options(
1880            DataType::Boolean,
1881            SortOptions::default().desc().with_nulls_first(false),
1882        )])
1883        .unwrap();
1884
1885        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1886        assert!(rows.row(2) < rows.row(1));
1887        assert!(rows.row(2) < rows.row(0));
1888        assert!(rows.row(1) < rows.row(0));
1889        let cols = converter.convert_rows(&rows).unwrap();
1890        assert_eq!(&cols[0], &col);
1891    }
1892
    #[test]
    fn test_timezone() {
        // The timezone is part of the Timestamp data type and must survive a
        // round-trip through the row format
        let a =
            TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone("+01:00".to_string());
        let d = a.data_type().clone();

        let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
        let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(back[0].data_type(), &d);

        // Test dictionary
        let mut a = PrimitiveDictionaryBuilder::<Int32Type, TimestampNanosecondType>::new();
        a.append(34).unwrap();
        a.append_null();
        a.append(345).unwrap();

        // Construct dictionary with a timezone
        let dict = a.finish();
        let values = TimestampNanosecondArray::from(dict.values().to_data());
        let dict_with_tz = dict.with_values(Arc::new(values.with_timezone("+02:00")));
        let v = DataType::Timestamp(TimeUnit::Nanosecond, Some("+02:00".into()));
        let d = DataType::Dictionary(Box::new(DataType::Int32), Box::new(v.clone()));

        assert_eq!(dict_with_tz.data_type(), &d);
        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
        let rows = converter
            .convert_columns(&[Arc::new(dict_with_tz) as _])
            .unwrap();
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        // The dictionary encoding is not preserved by the round-trip, but
        // the value type (including its timezone) must be
        assert_eq!(back[0].data_type(), &v);
    }
1927
1928    #[test]
1929    fn test_null_encoding() {
1930        let col = Arc::new(NullArray::new(10));
1931        let converter = RowConverter::new(vec![SortField::new(DataType::Null)]).unwrap();
1932        let rows = converter.convert_columns(&[col]).unwrap();
1933        assert_eq!(rows.num_rows(), 10);
1934        assert_eq!(rows.row(1).data.len(), 0);
1935    }
1936
    #[test]
    fn test_variable_width() {
        let col = Arc::new(StringArray::from_iter([
            Some("hello"),
            Some("he"),
            None,
            Some("foo"),
            Some(""),
        ])) as ArrayRef;

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        // A prefix sorts before a longer string, and null sorts before ""
        assert!(rows.row(1) < rows.row(0));
        assert!(rows.row(2) < rows.row(4));
        assert!(rows.row(3) < rows.row(0));
        assert!(rows.row(3) < rows.row(1));

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        // Lengths straddle MINI_BLOCK_SIZE and BLOCK_SIZE to exercise the
        // boundary conditions of the variable-length block encoding
        let col = Arc::new(BinaryArray::from_iter([
            None,
            Some(vec![0_u8; 0]),
            Some(vec![0_u8; 6]),
            Some(vec![0_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![0_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![0_u8; variable::BLOCK_SIZE]),
            Some(vec![0_u8; variable::BLOCK_SIZE + 1]),
            Some(vec![1_u8; 6]),
            Some(vec![1_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![1_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![1_u8; variable::BLOCK_SIZE]),
            Some(vec![1_u8; variable::BLOCK_SIZE + 1]),
            Some(vec![0xFF_u8; 6]),
            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![0xFF_u8; variable::BLOCK_SIZE]),
            Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]),
        ])) as ArrayRef;

        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        // The fixture is constructed in ascending order, so every pair of
        // rows with i < j must satisfy row(i) < row(j)
        for i in 0..rows.num_rows() {
            for j in i + 1..rows.num_rows() {
                assert!(
                    rows.row(i) < rows.row(j),
                    "{} < {} - {:?} < {:?}",
                    i,
                    j,
                    rows.row(i),
                    rows.row(j)
                );
            }
        }

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        // With descending order and nulls last, every comparison reverses
        let converter = RowConverter::new(vec![SortField::new_with_options(
            DataType::Binary,
            SortOptions::default().desc().with_nulls_first(false),
        )])
        .unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        for i in 0..rows.num_rows() {
            for j in i + 1..rows.num_rows() {
                assert!(
                    rows.row(i) > rows.row(j),
                    "{} > {} - {:?} > {:?}",
                    i,
                    j,
                    rows.row(i),
                    rows.row(j)
                );
            }
        }

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);
    }
2020
    /// Asserts that `a` is logically equal to `b`: if `b` is
    /// dictionary-encoded it is first cast to its value type, so the
    /// comparison ignores the dictionary encoding itself
    fn dictionary_eq(a: &dyn Array, b: &dyn Array) {
        match b.data_type() {
            DataType::Dictionary(_, v) => {
                assert_eq!(a.data_type(), v.as_ref());
                let b = arrow_cast::cast(b, v).unwrap();
                assert_eq!(a, b.as_ref())
            }
            _ => assert_eq!(a, b),
        }
    }
2032
    #[test]
    fn test_string_dictionary() {
        let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
            Some("foo"),
            Some("hello"),
            Some("he"),
            None,
            Some("hello"),
            Some(""),
            Some("hello"),
            Some("hello"),
        ])) as ArrayRef;

        let field = SortField::new(a.data_type().clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();

        // Rows compare on the logical (dictionary-decoded) values
        assert!(rows_a.row(3) < rows_a.row(5));
        assert!(rows_a.row(2) < rows_a.row(1));
        assert!(rows_a.row(0) < rows_a.row(1));
        assert!(rows_a.row(3) < rows_a.row(0));

        // Equal logical values must produce identical rows
        assert_eq!(rows_a.row(1), rows_a.row(4));
        assert_eq!(rows_a.row(1), rows_a.row(6));
        assert_eq!(rows_a.row(1), rows_a.row(7));

        let cols = converter.convert_rows(&rows_a).unwrap();
        dictionary_eq(&cols[0], &a);

        // Rows produced by the same converter from a different dictionary
        // must still be comparable against rows_a
        let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
            Some("hello"),
            None,
            Some("cupcakes"),
        ])) as ArrayRef;

        let rows_b = converter.convert_columns(&[Arc::clone(&b)]).unwrap();
        assert_eq!(rows_a.row(1), rows_b.row(0));
        assert_eq!(rows_a.row(3), rows_b.row(1));
        assert!(rows_b.row(2) < rows_a.row(0));

        let cols = converter.convert_rows(&rows_b).unwrap();
        dictionary_eq(&cols[0], &b);

        // Descending with nulls last: all value comparisons reverse
        let converter = RowConverter::new(vec![SortField::new_with_options(
            a.data_type().clone(),
            SortOptions::default().desc().with_nulls_first(false),
        )])
        .unwrap();

        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
        assert!(rows_c.row(3) > rows_c.row(5));
        assert!(rows_c.row(2) > rows_c.row(1));
        assert!(rows_c.row(0) > rows_c.row(1));
        assert!(rows_c.row(3) > rows_c.row(0));

        let cols = converter.convert_rows(&rows_c).unwrap();
        dictionary_eq(&cols[0], &a);

        // Descending with nulls first: values reverse but nulls stay lowest
        let converter = RowConverter::new(vec![SortField::new_with_options(
            a.data_type().clone(),
            SortOptions::default().desc().with_nulls_first(true),
        )])
        .unwrap();

        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
        assert!(rows_c.row(3) < rows_c.row(5));
        assert!(rows_c.row(2) > rows_c.row(1));
        assert!(rows_c.row(0) > rows_c.row(1));
        assert!(rows_c.row(3) < rows_c.row(0));

        let cols = converter.convert_rows(&rows_c).unwrap();
        dictionary_eq(&cols[0], &a);
    }
2106
    #[test]
    fn test_struct() {
        // Test basic
        let a = Arc::new(Int32Array::from(vec![1, 1, 2, 2])) as ArrayRef;
        let a_f = Arc::new(Field::new("int", DataType::Int32, false));
        let u = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as ArrayRef;
        let u_f = Arc::new(Field::new("s", DataType::Utf8, false));
        let s1 = Arc::new(StructArray::from(vec![(a_f, a), (u_f, u)])) as ArrayRef;

        let sort_fields = vec![SortField::new(s1.data_type().clone())];
        let converter = RowConverter::new(sort_fields).unwrap();
        let r1 = converter.convert_columns(&[Arc::clone(&s1)]).unwrap();

        // Children were constructed in ascending lexicographic order, so
        // consecutive rows must be strictly increasing
        for (a, b) in r1.iter().zip(r1.iter().skip(1)) {
            assert!(a < b);
        }

        let back = converter.convert_rows(&r1).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(&back[0], &s1);

        // Test struct nullability: mask rows 0 and 2 via a manual validity
        // buffer (0b00001010 leaves only rows 1 and 3 valid)
        let data = s1
            .to_data()
            .into_builder()
            .null_bit_buffer(Some(Buffer::from_slice_ref([0b00001010])))
            .null_count(2)
            .build()
            .unwrap();

        let s2 = Arc::new(StructArray::from(data)) as ArrayRef;
        let r2 = converter.convert_columns(&[Arc::clone(&s2)]).unwrap();
        assert_eq!(r2.row(0), r2.row(2)); // Nulls equal
        assert!(r2.row(0) < r2.row(1)); // Nulls first
        assert_ne!(r1.row(0), r2.row(0)); // Value does not equal null
        assert_eq!(r1.row(1), r2.row(1)); // Values equal

        let back = converter.convert_rows(&r2).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(&back[0], &s2);

        // The decoded array must also pass full validation
        back[0].to_data().validate_full().unwrap();
    }
2150
2151    #[test]
2152    fn test_primitive_dictionary() {
2153        let mut builder = PrimitiveDictionaryBuilder::<Int32Type, Int32Type>::new();
2154        builder.append(2).unwrap();
2155        builder.append(3).unwrap();
2156        builder.append(0).unwrap();
2157        builder.append_null();
2158        builder.append(5).unwrap();
2159        builder.append(3).unwrap();
2160        builder.append(-1).unwrap();
2161
2162        let a = builder.finish();
2163        let data_type = a.data_type().clone();
2164        let columns = [Arc::new(a) as ArrayRef];
2165
2166        let field = SortField::new(data_type.clone());
2167        let converter = RowConverter::new(vec![field]).unwrap();
2168        let rows = converter.convert_columns(&columns).unwrap();
2169        assert!(rows.row(0) < rows.row(1));
2170        assert!(rows.row(2) < rows.row(0));
2171        assert!(rows.row(3) < rows.row(2));
2172        assert!(rows.row(6) < rows.row(2));
2173        assert!(rows.row(3) < rows.row(6));
2174    }
2175
    #[test]
    fn test_dictionary_nulls() {
        // Nulls can come from the dictionary values (keys 2 and 4 point at
        // null values) or from the keys themselves (the final key is null);
        // both must encode identically
        let values = Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data();
        let keys =
            Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]).into_data();

        let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
        let data = keys
            .into_builder()
            .data_type(data_type.clone())
            .child_data(vec![values])
            .build()
            .unwrap();

        let columns = [Arc::new(DictionaryArray::<Int32Type>::from(data)) as ArrayRef];
        let field = SortField::new(data_type.clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&columns).unwrap();

        assert_eq!(rows.row(0), rows.row(1)); // both encode the value 1
        assert_eq!(rows.row(3), rows.row(4)); // null value == null value
        assert_eq!(rows.row(4), rows.row(5)); // null value == null key
        assert!(rows.row(3) < rows.row(0)); // nulls sort first by default
    }
2200
2201    #[test]
2202    #[should_panic(expected = "Encountered non UTF-8 data")]
2203    fn test_invalid_utf8() {
2204        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2205        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2206        let rows = converter.convert_columns(&[array]).unwrap();
2207        let binary_row = rows.row(0);
2208
2209        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2210        let parser = converter.parser();
2211        let utf8_row = parser.parse(binary_row.as_ref());
2212
2213        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2214    }
2215
2216    #[test]
2217    #[should_panic(expected = "Encountered non UTF-8 data")]
2218    fn test_invalid_utf8_array() {
2219        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2220        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2221        let rows = converter.convert_columns(&[array]).unwrap();
2222        let binary_rows = rows.try_into_binary().expect("known-small rows");
2223
2224        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2225        let parsed = converter.from_binary(binary_rows);
2226
2227        converter.convert_rows(parsed.iter()).unwrap();
2228    }
2229
2230    #[test]
2231    #[should_panic(expected = "index out of bounds")]
2232    fn test_invalid_empty() {
2233        let binary_row: &[u8] = &[];
2234
2235        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2236        let parser = converter.parser();
2237        let utf8_row = parser.parse(binary_row.as_ref());
2238
2239        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2240    }
2241
2242    #[test]
2243    #[should_panic(expected = "index out of bounds")]
2244    fn test_invalid_empty_array() {
2245        let row: &[u8] = &[];
2246        let binary_rows = BinaryArray::from(vec![row]);
2247
2248        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2249        let parsed = converter.from_binary(binary_rows);
2250
2251        converter.convert_rows(parsed.iter()).unwrap();
2252    }
2253
2254    #[test]
2255    #[should_panic(expected = "index out of bounds")]
2256    fn test_invalid_truncated() {
2257        let binary_row: &[u8] = &[0x02];
2258
2259        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2260        let parser = converter.parser();
2261        let utf8_row = parser.parse(binary_row.as_ref());
2262
2263        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2264    }
2265
2266    #[test]
2267    #[should_panic(expected = "index out of bounds")]
2268    fn test_invalid_truncated_array() {
2269        let row: &[u8] = &[0x02];
2270        let binary_rows = BinaryArray::from(vec![row]);
2271
2272        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2273        let parsed = converter.from_binary(binary_rows);
2274
2275        converter.convert_rows(parsed.iter()).unwrap();
2276    }
2277
2278    #[test]
2279    #[should_panic(expected = "rows were not produced by this RowConverter")]
2280    fn test_different_converter() {
2281        let values = Arc::new(Int32Array::from_iter([Some(1), Some(-1)]));
2282        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2283        let rows = converter.convert_columns(&[values]).unwrap();
2284
2285        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2286        let _ = converter.convert_rows(&rows);
2287    }
2288
    /// Round-trips a list array through the row format under each
    /// combination of sort direction and null placement, checking that the
    /// row ordering matches the expected logical ordering in every case
    fn test_single_list<O: OffsetSizeTrait>() {
        let mut builder = GenericListBuilder::<O, _>::new(Int32Builder::new());
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(32);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(12);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.append(true);
        builder.values().append_value(32); // MASKED
        builder.values().append_value(52); // MASKED
        builder.append(false);
        builder.values().append_value(32);
        builder.values().append_null();
        builder.append(true);
        builder.append(true);
        builder.values().append_value(17); // MASKED
        builder.values().append_null(); // MASKED
        builder.append(false);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // Ascending, nulls first (the defaults)
        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();

        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
        assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
        assert!(rows.row(3) < rows.row(5)); // null < []
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Ascending, nulls last
        let options = SortOptions::default().asc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
        assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
        assert!(rows.row(3) > rows.row(5)); // null > []
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls last
        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
        assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
        assert!(rows.row(3) > rows.row(5)); // null > []
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls first
        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
        assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
        assert!(rows.row(3) < rows.row(5)); // null < []
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Slicing the input must not change how the remaining rows encode
        let sliced_list = list.slice(1, 5);
        let rows_on_sliced_list = converter
            .convert_columns(&[Arc::clone(&sliced_list)])
            .unwrap();

        assert!(rows_on_sliced_list.row(1) > rows_on_sliced_list.row(0)); // [32, 52] > [32, 52, 12]
        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1)); // null < [32, 52]
        assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1)); // [32, null] < [32, 52]
        assert!(rows_on_sliced_list.row(4) > rows_on_sliced_list.row(1)); // [] > [32, 52]
        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4)); // null < []

        let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &sliced_list);
    }
2402
    /// Exercises row-format encoding of nested lists (`List<List<Int32>>`)
    /// across ascending/descending and nulls-first/last orderings, including
    /// round-tripping back to arrays and converting a sliced input.
    fn test_nested_list<O: OffsetSizeTrait>() {
        let mut builder =
            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(Int32Builder::new()));

        // Row 0: [[1, 2], [1, null]]
        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        // Row 1: [[1, null], [1, null]]
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        // Row 2: [[1, null], null] — inner list null via values().append(false)
        // Row 3: null — outer list null via append(false)
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().append(false);
        builder.append(true);
        builder.append(false);

        // Row 4: [[1, 2]]
        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.append(true);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // Logical contents of `list`:
        // [
        //   [[1, 2], [1, null]],
        //   [[1, null], [1, null]],
        //   [[1, null], null],
        //   null,
        //   [[1, 2]]
        // ]
        // Ascending, nulls first: nulls at any nesting level sort lowest
        let options = SortOptions::default().asc().with_nulls_first(true);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(1) > rows.row(2));
        assert!(rows.row(2) > rows.row(3));
        assert!(rows.row(4) < rows.row(0)); // [[1, 2]] is a strict prefix of row 0
        assert!(rows.row(4) > rows.row(1));

        // Round-trip must reproduce the input column exactly
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls first
        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(1) > rows.row(2));
        assert!(rows.row(2) > rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) > rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls last
        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(1) < rows.row(2));
        assert!(rows.row(2) < rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) < rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Slicing off rows 1..4 must still convert and round-trip correctly
        // (the last converter — descending, nulls last — is reused here)
        let sliced_list = list.slice(1, 3);
        let rows = converter
            .convert_columns(&[Arc::clone(&sliced_list)])
            .unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(1) < rows.row(2));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &sliced_list);
    }
2506
    /// Runs the single- and nested-list tests with `i32` offsets (`List`)
    #[test]
    fn test_list() {
        test_single_list::<i32>();
        test_nested_list::<i32>();
    }
2512
    /// Runs the single- and nested-list tests with `i64` offsets (`LargeList`)
    #[test]
    fn test_large_list() {
        test_single_list::<i64>();
        test_nested_list::<i64>();
    }
2518
    /// Exercises row-format encoding of `FixedSizeList<Int32, 3>` across all
    /// four combinations of sort direction and null ordering, including
    /// round-tripping back to arrays and converting a sliced input.
    #[test]
    fn test_fixed_size_list() {
        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 3);
        // 0: [32, 52, 32]
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(32);
        builder.append(true);
        // 1: [32, 52, 12]
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(12);
        builder.append(true);
        // 2: [32, 52, null]
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_null();
        builder.append(true);
        // 3: null — child slots are written but masked by the list validity
        builder.values().append_value(32); // MASKED
        builder.values().append_value(52); // MASKED
        builder.values().append_value(13); // MASKED
        builder.append(false);
        // 4: [32, null, null]
        builder.values().append_value(32);
        builder.values().append_null();
        builder.values().append_null();
        builder.append(true);
        // 5: [null, null, null]
        builder.values().append_null();
        builder.values().append_null();
        builder.values().append_null();
        builder.append(true);
        // 6: null — different masked values than row 3; must encode identically
        builder.values().append_value(17); // MASKED
        builder.values().append_null(); // MASKED
        builder.values().append_value(77); // MASKED
        builder.append(false);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // Default sorting (ascending, nulls first)
        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();

        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52, null] < [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52, null]
        assert!(rows.row(4) < rows.row(2)); // [32, null, null] < [32, 52, null]
        assert!(rows.row(5) < rows.row(2)); // [null, null, null] < [32, 52, null]
        assert!(rows.row(3) < rows.row(5)); // null < [null, null, null]
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Ascending, null last
        let options = SortOptions::default().asc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52, null] > [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52, null]
        assert!(rows.row(4) > rows.row(2)); // [32, null, null] > [32, 52, null]
        assert!(rows.row(5) > rows.row(2)); // [null, null, null] > [32, 52, null]
        assert!(rows.row(3) > rows.row(5)); // null > [null, null, null]
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls last
        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52, null] > [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52, null]
        assert!(rows.row(4) > rows.row(2)); // [32, null, null] > [32, 52, null]
        assert!(rows.row(5) > rows.row(2)); // [null, null, null] > [32, 52, null]
        assert!(rows.row(3) > rows.row(5)); // null > [null, null, null]
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls first
        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52, null] < [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52, null]
        assert!(rows.row(4) < rows.row(2)); // [32, null, null] < [32, 52, null]
        assert!(rows.row(5) < rows.row(2)); // [null, null, null] < [32, 52, null]
        assert!(rows.row(3) < rows.row(5)); // null < [null, null, null]
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Slicing off rows 1..6 must still convert and round-trip correctly
        // (the last converter — descending, nulls first — is reused here)
        let sliced_list = list.slice(1, 5);
        let rows_on_sliced_list = converter
            .convert_columns(&[Arc::clone(&sliced_list)])
            .unwrap();

        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1)); // null < [32, 52, null]
        assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1)); // [32, null, null] < [32, 52, null]
        assert!(rows_on_sliced_list.row(4) < rows_on_sliced_list.row(1)); // [null, null, null] < [32, 52, null]
        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4)); // null < [null, null, null]

        let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &sliced_list);
    }
2641
2642    #[test]
2643    fn test_two_fixed_size_lists() {
2644        let mut first = FixedSizeListBuilder::new(UInt8Builder::new(), 1);
2645        // 0: [100]
2646        first.values().append_value(100);
2647        first.append(true);
2648        // 1: [101]
2649        first.values().append_value(101);
2650        first.append(true);
2651        // 2: [102]
2652        first.values().append_value(102);
2653        first.append(true);
2654        // 3: [null]
2655        first.values().append_null();
2656        first.append(true);
2657        // 4: null
2658        first.values().append_null(); // MASKED
2659        first.append(false);
2660        let first = Arc::new(first.finish()) as ArrayRef;
2661        let first_type = first.data_type().clone();
2662
2663        let mut second = FixedSizeListBuilder::new(UInt8Builder::new(), 1);
2664        // 0: [200]
2665        second.values().append_value(200);
2666        second.append(true);
2667        // 1: [201]
2668        second.values().append_value(201);
2669        second.append(true);
2670        // 2: [202]
2671        second.values().append_value(202);
2672        second.append(true);
2673        // 3: [null]
2674        second.values().append_null();
2675        second.append(true);
2676        // 4: null
2677        second.values().append_null(); // MASKED
2678        second.append(false);
2679        let second = Arc::new(second.finish()) as ArrayRef;
2680        let second_type = second.data_type().clone();
2681
2682        let converter = RowConverter::new(vec![
2683            SortField::new(first_type.clone()),
2684            SortField::new(second_type.clone()),
2685        ])
2686        .unwrap();
2687
2688        let rows = converter
2689            .convert_columns(&[Arc::clone(&first), Arc::clone(&second)])
2690            .unwrap();
2691
2692        let back = converter.convert_rows(&rows).unwrap();
2693        assert_eq!(back.len(), 2);
2694        back[0].to_data().validate_full().unwrap();
2695        assert_eq!(&back[0], &first);
2696        back[1].to_data().validate_full().unwrap();
2697        assert_eq!(&back[1], &second);
2698    }
2699
    /// Exercises a `FixedSizeList` whose child is a struct containing
    /// variable-width (Utf8) data, converted alongside a plain string column,
    /// and verifies both columns round-trip losslessly through the row format.
    #[test]
    fn test_fixed_size_list_with_variable_width_content() {
        let mut first = FixedSizeListBuilder::new(
            StructBuilder::from_fields(
                vec![
                    Field::new(
                        "timestamp",
                        DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("UTC"))),
                        false,
                    ),
                    Field::new("offset_minutes", DataType::Int16, false),
                    Field::new("time_zone", DataType::Utf8, false),
                ],
                1,
            ),
            1,
        );
        // 0: null — all struct fields, the struct slot, and the list entry are
        // appended as null/invalid so the child data stays aligned
        first
            .values()
            .field_builder::<TimestampMicrosecondBuilder>(0)
            .unwrap()
            .append_null();
        first
            .values()
            .field_builder::<Int16Builder>(1)
            .unwrap()
            .append_null();
        first
            .values()
            .field_builder::<StringBuilder>(2)
            .unwrap()
            .append_null();
        first.values().append(false);
        first.append(false);
        // 1: [null] — the list is valid but its single struct element is null
        first
            .values()
            .field_builder::<TimestampMicrosecondBuilder>(0)
            .unwrap()
            .append_null();
        first
            .values()
            .field_builder::<Int16Builder>(1)
            .unwrap()
            .append_null();
        first
            .values()
            .field_builder::<StringBuilder>(2)
            .unwrap()
            .append_null();
        first.values().append(false);
        first.append(true);
        // 2: [1970-01-01 00:00:00.000000 UTC]
        first
            .values()
            .field_builder::<TimestampMicrosecondBuilder>(0)
            .unwrap()
            .append_value(0);
        first
            .values()
            .field_builder::<Int16Builder>(1)
            .unwrap()
            .append_value(0);
        first
            .values()
            .field_builder::<StringBuilder>(2)
            .unwrap()
            .append_value("UTC");
        first.values().append(true);
        first.append(true);
        // 3: [2005-09-10 13:30:00.123456 Europe/Warsaw]
        first
            .values()
            .field_builder::<TimestampMicrosecondBuilder>(0)
            .unwrap()
            .append_value(1126351800123456);
        first
            .values()
            .field_builder::<Int16Builder>(1)
            .unwrap()
            .append_value(120);
        first
            .values()
            .field_builder::<StringBuilder>(2)
            .unwrap()
            .append_value("Europe/Warsaw");
        first.values().append(true);
        first.append(true);
        let first = Arc::new(first.finish()) as ArrayRef;
        let first_type = first.data_type().clone();

        // Second column: a plain nullable string column of the same length
        let mut second = StringBuilder::new();
        second.append_value("somewhere near");
        second.append_null();
        second.append_value("Greenwich");
        second.append_value("Warsaw");
        let second = Arc::new(second.finish()) as ArrayRef;
        let second_type = second.data_type().clone();

        let converter = RowConverter::new(vec![
            SortField::new(first_type.clone()),
            SortField::new(second_type.clone()),
        ])
        .unwrap();

        let rows = converter
            .convert_columns(&[Arc::clone(&first), Arc::clone(&second)])
            .unwrap();

        // Both columns must round-trip losslessly
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 2);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &first);
        back[1].to_data().validate_full().unwrap();
        assert_eq!(&back[1], &second);
    }
2817
2818    fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
2819    where
2820        K: ArrowPrimitiveType,
2821        StandardUniform: Distribution<K::Native>,
2822    {
2823        let mut rng = rng();
2824        (0..len)
2825            .map(|_| rng.random_bool(valid_percent).then(|| rng.random()))
2826            .collect()
2827    }
2828
2829    fn generate_strings<O: OffsetSizeTrait>(
2830        len: usize,
2831        valid_percent: f64,
2832    ) -> GenericStringArray<O> {
2833        let mut rng = rng();
2834        (0..len)
2835            .map(|_| {
2836                rng.random_bool(valid_percent).then(|| {
2837                    let len = rng.random_range(0..100);
2838                    let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
2839                    String::from_utf8(bytes).unwrap()
2840                })
2841            })
2842            .collect()
2843    }
2844
2845    fn generate_string_view(len: usize, valid_percent: f64) -> StringViewArray {
2846        let mut rng = rng();
2847        (0..len)
2848            .map(|_| {
2849                rng.random_bool(valid_percent).then(|| {
2850                    let len = rng.random_range(0..100);
2851                    let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
2852                    String::from_utf8(bytes).unwrap()
2853                })
2854            })
2855            .collect()
2856    }
2857
2858    fn generate_byte_view(len: usize, valid_percent: f64) -> BinaryViewArray {
2859        let mut rng = rng();
2860        (0..len)
2861            .map(|_| {
2862                rng.random_bool(valid_percent).then(|| {
2863                    let len = rng.random_range(0..100);
2864                    let bytes: Vec<_> = (0..len).map(|_| rng.random_range(0..128)).collect();
2865                    bytes
2866                })
2867            })
2868            .collect()
2869    }
2870
2871    fn generate_fixed_stringview_column(len: usize) -> StringViewArray {
2872        let edge_cases = vec![
2873            Some("bar".to_string()),
2874            Some("bar\0".to_string()),
2875            Some("LongerThan12Bytes".to_string()),
2876            Some("LongerThan12Bytez".to_string()),
2877            Some("LongerThan12Bytes\0".to_string()),
2878            Some("LongerThan12Byt".to_string()),
2879            Some("backend one".to_string()),
2880            Some("backend two".to_string()),
2881            Some("a".repeat(257)),
2882            Some("a".repeat(300)),
2883        ];
2884
2885        // Fill up to `len` by repeating edge cases and trimming
2886        let mut values = Vec::with_capacity(len);
2887        for i in 0..len {
2888            values.push(
2889                edge_cases
2890                    .get(i % edge_cases.len())
2891                    .cloned()
2892                    .unwrap_or(None),
2893            );
2894        }
2895
2896        StringViewArray::from(values)
2897    }
2898
2899    fn generate_dictionary<K>(
2900        values: ArrayRef,
2901        len: usize,
2902        valid_percent: f64,
2903    ) -> DictionaryArray<K>
2904    where
2905        K: ArrowDictionaryKeyType,
2906        K::Native: SampleUniform,
2907    {
2908        let mut rng = rng();
2909        let min_key = K::Native::from_usize(0).unwrap();
2910        let max_key = K::Native::from_usize(values.len()).unwrap();
2911        let keys: PrimitiveArray<K> = (0..len)
2912            .map(|_| {
2913                rng.random_bool(valid_percent)
2914                    .then(|| rng.random_range(min_key..max_key))
2915            })
2916            .collect();
2917
2918        let data_type =
2919            DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(values.data_type().clone()));
2920
2921        let data = keys
2922            .into_data()
2923            .into_builder()
2924            .data_type(data_type)
2925            .add_child_data(values.to_data())
2926            .build()
2927            .unwrap();
2928
2929        DictionaryArray::from(data)
2930    }
2931
2932    fn generate_fixed_size_binary(len: usize, valid_percent: f64) -> FixedSizeBinaryArray {
2933        let mut rng = rng();
2934        let width = rng.random_range(0..20);
2935        let mut builder = FixedSizeBinaryBuilder::new(width);
2936
2937        let mut b = vec![0; width as usize];
2938        for _ in 0..len {
2939            match rng.random_bool(valid_percent) {
2940                true => {
2941                    b.iter_mut().for_each(|x| *x = rng.random());
2942                    builder.append_value(&b).unwrap();
2943                }
2944                false => builder.append_null(),
2945            }
2946        }
2947
2948        builder.finish()
2949    }
2950
2951    fn generate_struct(len: usize, valid_percent: f64) -> StructArray {
2952        let mut rng = rng();
2953        let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
2954        let a = generate_primitive_array::<Int32Type>(len, valid_percent);
2955        let b = generate_strings::<i32>(len, valid_percent);
2956        let fields = Fields::from(vec![
2957            Field::new("a", DataType::Int32, true),
2958            Field::new("b", DataType::Utf8, true),
2959        ]);
2960        let values = vec![Arc::new(a) as _, Arc::new(b) as _];
2961        StructArray::new(fields, values, Some(nulls))
2962    }
2963
2964    fn generate_list<F>(len: usize, valid_percent: f64, values: F) -> ListArray
2965    where
2966        F: FnOnce(usize) -> ArrayRef,
2967    {
2968        let mut rng = rng();
2969        let offsets = OffsetBuffer::<i32>::from_lengths((0..len).map(|_| rng.random_range(0..10)));
2970        let values_len = offsets.last().unwrap().to_usize().unwrap();
2971        let values = values(values_len);
2972        let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
2973        let field = Arc::new(Field::new_list_field(values.data_type().clone(), true));
2974        ListArray::new(field, offsets, values, Some(nulls))
2975    }
2976
    /// Generates a random column of `len` rows with a randomly chosen type,
    /// covering primitives, strings, dictionaries, fixed-size binary, structs,
    /// lists (including nested and sliced), and view arrays. Most generators
    /// use 80% validity.
    fn generate_column(len: usize) -> ArrayRef {
        let mut rng = rng();
        match rng.random_range(0..18) {
            0 => Arc::new(generate_primitive_array::<Int32Type>(len, 0.8)),
            1 => Arc::new(generate_primitive_array::<UInt32Type>(len, 0.8)),
            2 => Arc::new(generate_primitive_array::<Int64Type>(len, 0.8)),
            3 => Arc::new(generate_primitive_array::<UInt64Type>(len, 0.8)),
            4 => Arc::new(generate_primitive_array::<Float32Type>(len, 0.8)),
            5 => Arc::new(generate_primitive_array::<Float64Type>(len, 0.8)),
            6 => Arc::new(generate_strings::<i32>(len, 0.8)),
            7 => Arc::new(generate_dictionary::<Int64Type>(
                // Cannot test dictionaries containing null values because of #2687
                Arc::new(generate_strings::<i32>(rng.random_range(1..len), 1.0)),
                len,
                0.8,
            )),
            8 => Arc::new(generate_dictionary::<Int64Type>(
                // Cannot test dictionaries containing null values because of #2687
                Arc::new(generate_primitive_array::<Int64Type>(
                    rng.random_range(1..len),
                    1.0,
                )),
                len,
                0.8,
            )),
            9 => Arc::new(generate_fixed_size_binary(len, 0.8)),
            10 => Arc::new(generate_struct(len, 0.8)),
            11 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
            })),
            12 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_strings::<i32>(values_len, 0.8))
            })),
            13 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_struct(values_len, 0.8))
            })),
            14 => Arc::new(generate_string_view(len, 0.8)),
            15 => Arc::new(generate_byte_view(len, 0.8)),
            16 => Arc::new(generate_fixed_stringview_column(len)),
            // A list array sliced at a non-zero offset, to exercise offset
            // handling in the converter
            17 => Arc::new(
                generate_list(len + 1000, 0.8, |values_len| {
                    Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
                })
                .slice(500, len),
            ),
            _ => unreachable!(),
        }
    }
3025
3026    fn print_row(cols: &[SortColumn], row: usize) -> String {
3027        let t: Vec<_> = cols
3028            .iter()
3029            .map(|x| match x.values.is_valid(row) {
3030                true => {
3031                    let opts = FormatOptions::default().with_null("NULL");
3032                    let formatter = ArrayFormatter::try_new(x.values.as_ref(), &opts).unwrap();
3033                    formatter.value(row).to_string()
3034                }
3035                false => "NULL".to_string(),
3036            })
3037            .collect();
3038        t.join(",")
3039    }
3040
3041    fn print_col_types(cols: &[SortColumn]) -> String {
3042        let t: Vec<_> = cols
3043            .iter()
3044            .map(|x| x.values.data_type().to_string())
3045            .collect();
3046        t.join(",")
3047    }
3048
    /// Randomized round-trip test: for random schemas and data, row ordering
    /// must agree with `LexicographicalComparator`, and converting rows back
    /// (directly, via binary, and via the parser) must reproduce the input.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn fuzz_test() {
        for _ in 0..100 {
            let mut rng = rng();
            // Random shape: 1-4 columns, 5-99 rows, random column types
            let num_columns = rng.random_range(1..5);
            let len = rng.random_range(5..100);
            let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect();

            // Random sort direction and null ordering for each column
            let options: Vec<_> = (0..num_columns)
                .map(|_| SortOptions {
                    descending: rng.random_bool(0.5),
                    nulls_first: rng.random_bool(0.5),
                })
                .collect();

            let sort_columns: Vec<_> = options
                .iter()
                .zip(&arrays)
                .map(|(o, c)| SortColumn {
                    values: Arc::clone(c),
                    options: Some(*o),
                })
                .collect();

            let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();

            let columns: Vec<SortField> = options
                .into_iter()
                .zip(&arrays)
                .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
                .collect();

            let converter = RowConverter::new(columns).unwrap();
            let rows = converter.convert_columns(&arrays).unwrap();

            // Every pairwise row comparison (including i == j) must match the
            // lexicographical comparison of the original columns
            for i in 0..len {
                for j in 0..len {
                    let row_i = rows.row(i);
                    let row_j = rows.row(j);
                    let row_cmp = row_i.cmp(&row_j);
                    let lex_cmp = comparator.compare(i, j);
                    assert_eq!(
                        row_cmp,
                        lex_cmp,
                        "({:?} vs {:?}) vs ({:?} vs {:?}) for types {}",
                        print_row(&sort_columns, i),
                        print_row(&sort_columns, j),
                        row_i,
                        row_j,
                        print_col_types(&sort_columns)
                    );
                }
            }

            // Convert rows produced from convert_columns().
            // Note: validate_utf8 is set to false since Row is initialized through empty_rows()
            let back = converter.convert_rows(&rows).unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }

            // Check that we can convert rows into ByteArray and then parse, convert it back to array
            // Note: validate_utf8 is set to true since Row is initialized through RowParser
            let rows = rows.try_into_binary().expect("reasonable size");
            let parser = converter.parser();
            let back = converter
                .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
                .unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }

            // Round-trip the binary representation back into Rows and convert again
            let rows = converter.from_binary(rows);
            let back = converter.convert_rows(&rows).unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }
        }
    }
3132
3133    #[test]
3134    fn test_clear() {
3135        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
3136        let mut rows = converter.empty_rows(3, 128);
3137
3138        let first = Int32Array::from(vec![None, Some(2), Some(4)]);
3139        let second = Int32Array::from(vec![Some(2), None, Some(4)]);
3140        let arrays = [Arc::new(first) as ArrayRef, Arc::new(second) as ArrayRef];
3141
3142        for array in arrays.iter() {
3143            rows.clear();
3144            converter.append(&mut rows, &[array.clone()]).unwrap();
3145            let back = converter.convert_rows(&rows).unwrap();
3146            assert_eq!(&back[0], array);
3147        }
3148
3149        let mut rows_expected = converter.empty_rows(3, 128);
3150        converter.append(&mut rows_expected, &arrays[1..]).unwrap();
3151
3152        for (i, (actual, expected)) in rows.iter().zip(rows_expected.iter()).enumerate() {
3153            assert_eq!(
3154                actual, expected,
3155                "For row {i}: expected {expected:?}, actual: {actual:?}",
3156            );
3157        }
3158    }
3159
3160    #[test]
3161    fn test_append_codec_dictionary_binary() {
3162        use DataType::*;
3163        // Dictionary RowConverter
3164        let converter = RowConverter::new(vec![SortField::new(Dictionary(
3165            Box::new(Int32),
3166            Box::new(Binary),
3167        ))])
3168        .unwrap();
3169        let mut rows = converter.empty_rows(4, 128);
3170
3171        let keys = Int32Array::from_iter_values([0, 1, 2, 3]);
3172        let values = BinaryArray::from(vec![
3173            Some("a".as_bytes()),
3174            Some(b"b"),
3175            Some(b"c"),
3176            Some(b"d"),
3177        ]);
3178        let dict_array = DictionaryArray::new(keys, Arc::new(values));
3179
3180        rows.clear();
3181        let array = Arc::new(dict_array) as ArrayRef;
3182        converter.append(&mut rows, &[array.clone()]).unwrap();
3183        let back = converter.convert_rows(&rows).unwrap();
3184
3185        dictionary_eq(&back[0], &array);
3186    }
3187
3188    #[test]
3189    fn test_list_prefix() {
3190        let mut a = ListBuilder::new(Int8Builder::new());
3191        a.append_value([None]);
3192        a.append_value([None, None]);
3193        let a = a.finish();
3194
3195        let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
3196        let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
3197        assert_eq!(rows.row(0).cmp(&rows.row(1)), Ordering::Less);
3198    }
3199
3200    #[test]
3201    fn map_should_be_marked_as_unsupported() {
3202        let map_data_type = Field::new_map(
3203            "map",
3204            "entries",
3205            Field::new("key", DataType::Utf8, false),
3206            Field::new("value", DataType::Utf8, true),
3207            false,
3208            true,
3209        )
3210        .data_type()
3211        .clone();
3212
3213        let is_supported = RowConverter::supports_fields(&[SortField::new(map_data_type)]);
3214
3215        assert!(!is_supported, "Map should not be supported");
3216    }
3217
3218    #[test]
3219    fn should_fail_to_create_row_converter_for_unsupported_map_type() {
3220        let map_data_type = Field::new_map(
3221            "map",
3222            "entries",
3223            Field::new("key", DataType::Utf8, false),
3224            Field::new("value", DataType::Utf8, true),
3225            false,
3226            true,
3227        )
3228        .data_type()
3229        .clone();
3230
3231        let converter = RowConverter::new(vec![SortField::new(map_data_type)]);
3232
3233        match converter {
3234            Err(ArrowError::NotYetImplemented(message)) => {
3235                assert!(
3236                    message.contains("Row format support not yet implemented for"),
3237                    "Expected NotYetImplemented error for map data type, got: {message}",
3238                );
3239            }
3240            Err(e) => panic!("Expected NotYetImplemented error, got: {e}"),
3241            Ok(_) => panic!("Expected NotYetImplemented error for map data type"),
3242        }
3243    }
3244
3245    #[test]
3246    fn test_values_buffer_smaller_when_utf8_validation_disabled() {
3247        fn get_values_buffer_len(col: ArrayRef) -> (usize, usize) {
3248            // 1. Convert cols into rows
3249            let converter = RowConverter::new(vec![SortField::new(DataType::Utf8View)]).unwrap();
3250
3251            // 2a. Convert rows into colsa (validate_utf8 = false)
3252            let rows = converter.convert_columns(&[col]).unwrap();
3253            let converted = converter.convert_rows(&rows).unwrap();
3254            let unchecked_values_len = converted[0].as_string_view().data_buffers()[0].len();
3255
3256            // 2b. Convert rows into cols (validate_utf8 = true since Row is initialized through RowParser)
3257            let rows = rows.try_into_binary().expect("reasonable size");
3258            let parser = converter.parser();
3259            let converted = converter
3260                .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
3261                .unwrap();
3262            let checked_values_len = converted[0].as_string_view().data_buffers()[0].len();
3263            (unchecked_values_len, checked_values_len)
3264        }
3265
3266        // Case1. StringViewArray with inline strings
3267        let col = Arc::new(StringViewArray::from_iter([
3268            Some("hello"), // short(5)
3269            None,          // null
3270            Some("short"), // short(5)
3271            Some("tiny"),  // short(4)
3272        ])) as ArrayRef;
3273
3274        let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
3275        // Since there are no long (>12) strings, len of values buffer is 0
3276        assert_eq!(unchecked_values_len, 0);
3277        // When utf8 validation enabled, values buffer includes inline strings (5+5+4)
3278        assert_eq!(checked_values_len, 14);
3279
3280        // Case2. StringViewArray with long(>12) strings
3281        let col = Arc::new(StringViewArray::from_iter([
3282            Some("this is a very long string over 12 bytes"),
3283            Some("another long string to test the buffer"),
3284        ])) as ArrayRef;
3285
3286        let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
3287        // Since there are no inline strings, expected length of values buffer is the same
3288        assert!(unchecked_values_len > 0);
3289        assert_eq!(unchecked_values_len, checked_values_len);
3290
3291        // Case3. StringViewArray with both short and long strings
3292        let col = Arc::new(StringViewArray::from_iter([
3293            Some("tiny"),          // 4 (short)
3294            Some("thisisexact13"), // 13 (long)
3295            None,
3296            Some("short"), // 5 (short)
3297        ])) as ArrayRef;
3298
3299        let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
3300        // Since there is single long string, len of values buffer is 13
3301        assert_eq!(unchecked_values_len, 13);
3302        assert!(checked_values_len > unchecked_values_len);
3303    }
3304}