// arrow_row/lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! A comparable row-oriented representation of a collection of [`Array`].
19//!
20//! [`Row`]s are [normalized for sorting], and can therefore be very efficiently [compared],
21//! using [`memcmp`] under the hood, or used in [non-comparison sorts] such as [radix sort].
22//! This makes the row format ideal for implementing efficient multi-column sorting,
23//! grouping, aggregation, windowing and more, as described in more detail
24//! [in this blog post](https://arrow.apache.org/blog/2022/11/07/multi-column-sorts-in-arrow-rust-part-1/).
25//!
26//! For example, given three input [`Array`], [`RowConverter`] creates byte
27//! sequences that [compare] the same as when using [`lexsort`].
28//!
29//! ```text
30//!    ┌─────┐   ┌─────┐   ┌─────┐
31//!    │     │   │     │   │     │
32//!    ├─────┤ ┌ ┼─────┼ ─ ┼─────┼ ┐              ┏━━━━━━━━━━━━━┓
33//!    │     │   │     │   │     │  ─────────────▶┃             ┃
34//!    ├─────┤ └ ┼─────┼ ─ ┼─────┼ ┘              ┗━━━━━━━━━━━━━┛
35//!    │     │   │     │   │     │
36//!    └─────┘   └─────┘   └─────┘
37//!                ...
38//!    ┌─────┐ ┌ ┬─────┬ ─ ┬─────┬ ┐              ┏━━━━━━━━┓
39//!    │     │   │     │   │     │  ─────────────▶┃        ┃
40//!    └─────┘ └ ┴─────┴ ─ ┴─────┴ ┘              ┗━━━━━━━━┛
41//!     UInt64      Utf8     F64
42//!
43//!           Input Arrays                          Row Format
44//!     (Columns)
45//! ```
46//!
47//! _[`Rows`] must be generated by the same [`RowConverter`] for the comparison
48//! to be meaningful._
49//!
50//! # Basic Example
51//! ```
52//! # use std::sync::Arc;
53//! # use arrow_row::{RowConverter, SortField};
54//! # use arrow_array::{ArrayRef, Int32Array, StringArray};
55//! # use arrow_array::cast::{AsArray, as_string_array};
56//! # use arrow_array::types::Int32Type;
57//! # use arrow_schema::DataType;
58//!
59//! let a1 = Arc::new(Int32Array::from_iter_values([-1, -1, 0, 3, 3])) as ArrayRef;
60//! let a2 = Arc::new(StringArray::from_iter_values(["a", "b", "c", "d", "d"])) as ArrayRef;
61//! let arrays = vec![a1, a2];
62//!
63//! // Convert arrays to rows
64//! let converter = RowConverter::new(vec![
65//!     SortField::new(DataType::Int32),
66//!     SortField::new(DataType::Utf8),
67//! ]).unwrap();
68//! let rows = converter.convert_columns(&arrays).unwrap();
69//!
70//! // Compare rows
71//! for i in 0..4 {
72//!     assert!(rows.row(i) <= rows.row(i + 1));
73//! }
74//! assert_eq!(rows.row(3), rows.row(4));
75//!
76//! // Convert rows back to arrays
77//! let converted = converter.convert_rows(&rows).unwrap();
78//! assert_eq!(arrays, converted);
79//!
80//! // Compare rows from different arrays
81//! let a1 = Arc::new(Int32Array::from_iter_values([3, 4])) as ArrayRef;
82//! let a2 = Arc::new(StringArray::from_iter_values(["e", "f"])) as ArrayRef;
83//! let arrays = vec![a1, a2];
84//! let rows2 = converter.convert_columns(&arrays).unwrap();
85//!
86//! assert!(rows.row(4) < rows2.row(0));
87//! assert!(rows.row(4) < rows2.row(1));
88//!
89//! // Convert selection of rows back to arrays
90//! let selection = [rows.row(0), rows2.row(1), rows.row(2), rows2.row(0)];
91//! let converted = converter.convert_rows(selection).unwrap();
92//! let c1 = converted[0].as_primitive::<Int32Type>();
93//! assert_eq!(c1.values(), &[-1, 4, 0, 3]);
94//!
95//! let c2 = converted[1].as_string::<i32>();
96//! let c2_values: Vec<_> = c2.iter().flatten().collect();
97//! assert_eq!(&c2_values, &["a", "f", "c", "e"]);
98//! ```
99//!
100//! # Lexsort
101//!
102//! The row format can also be used to implement a fast multi-column / lexicographic sort
103//!
104//! ```
105//! # use arrow_row::{RowConverter, SortField};
106//! # use arrow_array::{ArrayRef, UInt32Array};
107//! fn lexsort_to_indices(arrays: &[ArrayRef]) -> UInt32Array {
108//!     let fields = arrays
109//!         .iter()
110//!         .map(|a| SortField::new(a.data_type().clone()))
111//!         .collect();
112//!     let converter = RowConverter::new(fields).unwrap();
113//!     let rows = converter.convert_columns(arrays).unwrap();
114//!     let mut sort: Vec<_> = rows.iter().enumerate().collect();
115//!     sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
116//!     UInt32Array::from_iter_values(sort.iter().map(|(i, _)| *i as u32))
117//! }
118//! ```
119//!
120//! [non-comparison sorts]: https://en.wikipedia.org/wiki/Sorting_algorithm#Non-comparison_sorts
121//! [radix sort]: https://en.wikipedia.org/wiki/Radix_sort
122//! [normalized for sorting]: http://wwwlgis.informatik.uni-kl.de/archiv/wwwdvs.informatik.uni-kl.de/courses/DBSREAL/SS2005/Vorlesungsunterlagen/Implementing_Sorting.pdf
123//! [`memcmp`]: https://www.man7.org/linux/man-pages/man3/memcmp.3.html
124//! [`lexsort`]: https://docs.rs/arrow-ord/latest/arrow_ord/sort/fn.lexsort.html
125//! [compared]: PartialOrd
126//! [compare]: PartialOrd
127
128#![doc(
129    html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
130    html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
131)]
132#![cfg_attr(docsrs, feature(doc_auto_cfg))]
133#![warn(missing_docs)]
134use std::cmp::Ordering;
135use std::hash::{Hash, Hasher};
136use std::sync::Arc;
137
138use arrow_array::cast::*;
139use arrow_array::types::ArrowDictionaryKeyType;
140use arrow_array::*;
141use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer};
142use arrow_data::ArrayDataBuilder;
143use arrow_schema::*;
144use variable::{decode_binary_view, decode_string_view};
145
146use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
147use crate::list::{compute_lengths_fixed_size_list, encode_fixed_size_list};
148use crate::variable::{decode_binary, decode_string};
149use arrow_array::types::{Int16Type, Int32Type, Int64Type};
150
151mod fixed;
152mod list;
153mod run;
154mod variable;
155
156/// Converts [`ArrayRef`] columns into a [row-oriented](self) format.
157///
158/// *Note: The encoding of the row format may change from release to release.*
159///
160/// ## Overview
161///
162/// The row format is a variable length byte sequence created by
163/// concatenating the encoded form of each column. The encoding for
164/// each column depends on its datatype (and sort options).
165///
166/// The encoding is carefully designed in such a way that escaping is
167/// unnecessary: it is never ambiguous as to whether a byte is part of
168/// a sentinel (e.g. null) or a value.
169///
170/// ## Unsigned Integer Encoding
171///
172/// A null integer is encoded as a `0_u8`, followed by a zero-ed number of bytes corresponding
173/// to the integer's length.
174///
175/// A valid integer is encoded as `1_u8`, followed by the big-endian representation of the
176/// integer.
177///
178/// ```text
179///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
180///    3          │03│00│00│00│      │01│00│00│00│03│
181///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
182///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
183///   258         │02│01│00│00│      │01│00│00│01│02│
184///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
185///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
186///  23423        │7F│5B│00│00│      │01│00│00│5B│7F│
187///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
188///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
189///  NULL         │??│??│??│??│      │00│00│00│00│00│
190///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
191///
192///              32-bit (4 bytes)        Row Format
193///  Value        Little Endian
194/// ```
195///
196/// ## Signed Integer Encoding
197///
198/// Signed integers have their most significant sign bit flipped, and are then encoded in the
199/// same manner as an unsigned integer.
200///
201/// ```text
202///        ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┬──┐
203///     5  │05│00│00│00│       │05│00│00│80│       │01│80│00│00│05│
204///        └──┴──┴──┴──┘       └──┴──┴──┴──┘       └──┴──┴──┴──┴──┘
205///        ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┬──┐
206///    -5  │FB│FF│FF│FF│       │FB│FF│FF│7F│       │01│7F│FF│FF│FB│
207///        └──┴──┴──┴──┘       └──┴──┴──┴──┘       └──┴──┴──┴──┴──┘
208///
209///  Value  32-bit (4 bytes)    High bit flipped      Row Format
210///          Little Endian
211/// ```
212///
213/// ## Float Encoding
214///
215/// Floats are converted from IEEE 754 representation to a signed integer representation
216/// by flipping all bar the sign bit if they are negative.
217///
218/// They are then encoded in the same manner as a signed integer.
219///
220/// ## Fixed Length Bytes Encoding
221///
222/// Fixed length bytes are encoded in the same fashion as primitive types above.
223///
224/// For a fixed length array of length `n`:
225///
226/// A null is encoded as `0_u8` null sentinel followed by `n` `0_u8` bytes
227///
228/// A valid value is encoded as `1_u8` followed by the value bytes
229///
230/// ## Variable Length Bytes (including Strings) Encoding
231///
232/// A null is encoded as a `0_u8`.
233///
234/// An empty byte array is encoded as `1_u8`.
235///
236/// A non-null, non-empty byte array is encoded as `2_u8` followed by the byte array
237/// encoded using a block based scheme described below.
238///
239/// The byte array is broken up into fixed-width blocks, each block is written in turn
240/// to the output, followed by `0xFF_u8`. The final block is padded to 32-bytes
241/// with `0_u8` and written to the output, followed by the un-padded length in bytes
242/// of this final block as a `u8`. The first 4 blocks have a length of 8, with subsequent
243/// blocks using a length of 32, this is to reduce space amplification for small strings.
244///
245/// Note the following example encodings use a block size of 4 bytes for brevity:
246///
247/// ```text
248///                       ┌───┬───┬───┬───┬───┬───┐
249///  "MEEP"               │02 │'M'│'E'│'E'│'P'│04 │
250///                       └───┴───┴───┴───┴───┴───┘
251///
252///                       ┌───┐
253///  ""                   │01 |
254///                       └───┘
255///
256///  NULL                 ┌───┐
257///                       │00 │
258///                       └───┘
259///
/// "Defenestration"      ┌───┬───┬───┬───┬───┬───┐
///                       │02 │'D'│'e'│'f'│'e'│FF │
///                       └───┼───┼───┼───┼───┼───┤
///                           │'n'│'e'│'s'│'t'│FF │
///                           ├───┼───┼───┼───┼───┤
///                           │'r'│'a'│'t'│'i'│FF │
///                           ├───┼───┼───┼───┼───┤
///                           │'o'│'n'│00 │00 │02 │
///                           └───┴───┴───┴───┴───┘
271/// ```
272///
273/// This approach is loosely inspired by [COBS] encoding, and chosen over more traditional
274/// [byte stuffing] as it is more amenable to vectorisation, in particular AVX-256.
275///
276/// ## Dictionary Encoding
277///
278/// Dictionary encoded arrays are hydrated to their underlying values
279///
280/// ## REE Encoding
281///
/// REE (Run-End Encoded) arrays, a form of Run-Length Encoding, are hydrated to their underlying values.
283///
284/// ## Struct Encoding
285///
286/// A null is encoded as a `0_u8`.
287///
288/// A valid value is encoded as `1_u8` followed by the row encoding of each child.
289///
290/// This encoding effectively flattens the schema in a depth-first fashion.
291///
292/// For example
293///
294/// ```text
295/// ┌───────┬────────────────────────┬───────┐
296/// │ Int32 │ Struct[Int32, Float32] │ Int32 │
297/// └───────┴────────────────────────┴───────┘
298/// ```
299///
300/// Is encoded as
301///
302/// ```text
303/// ┌───────┬───────────────┬───────┬─────────┬───────┐
304/// │ Int32 │ Null Sentinel │ Int32 │ Float32 │ Int32 │
305/// └───────┴───────────────┴───────┴─────────┴───────┘
306/// ```
307///
308/// ## List Encoding
309///
310/// Lists are encoded by first encoding all child elements to the row format.
311///
312/// A list value is then encoded as the concatenation of each of the child elements,
313/// separately encoded using the variable length encoding described above, followed
314/// by the variable length encoding of an empty byte array.
315///
316/// For example given:
317///
318/// ```text
319/// [1_u8, 2_u8, 3_u8]
320/// [1_u8, null]
321/// []
322/// null
323/// ```
324///
325/// The elements would be converted to:
326///
327/// ```text
328///     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐        ┌──┬──┐
329///  1  │01│01│  2  │01│02│  3  │01│03│  1  │01│01│  null  │00│00│
330///     └──┴──┘     └──┴──┘     └──┴──┘     └──┴──┘        └──┴──┘
331///```
332///
333/// Which would be encoded as
334///
335/// ```text
336///                         ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
337///  [1_u8, 2_u8, 3_u8]     │02│01│01│00│00│02│02│01│02│00│00│02│02│01│03│00│00│02│01│
338///                         └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
339///                          └──── 1_u8 ────┘   └──── 2_u8 ────┘  └──── 3_u8 ────┘
340///
341///                         ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
342///  [1_u8, null]           │02│01│01│00│00│02│02│00│00│00│00│02│01│
343///                         └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
344///                          └──── 1_u8 ────┘   └──── null ────┘
345///
346///```
347///
348/// With `[]` represented by an empty byte array, and `null` a null byte array.
349///
350/// ## Fixed Size List Encoding
351///
352/// Fixed Size Lists are encoded by first encoding all child elements to the row format.
353///
354/// A non-null list value is then encoded as 0x01 followed by the concatenation of each
355/// of the child elements. A null list value is encoded as a null marker.
356///
357/// For example given:
358///
359/// ```text
360/// [1_u8, 2_u8]
361/// [3_u8, null]
362/// null
363/// ```
364///
365/// The elements would be converted to:
366///
367/// ```text
368///     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐        ┌──┬──┐
369///  1  │01│01│  2  │01│02│  3  │01│03│  null  │00│00│
370///     └──┴──┘     └──┴──┘     └──┴──┘        └──┴──┘
371///```
372///
373/// Which would be encoded as
374///
375/// ```text
376///                 ┌──┬──┬──┬──┬──┐
377///  [1_u8, 2_u8]   │01│01│01│01│02│
378///                 └──┴──┴──┴──┴──┘
379///                     └ 1 ┘ └ 2 ┘
380///                 ┌──┬──┬──┬──┬──┐
381///  [3_u8, null]   │01│01│03│00│00│
382///                 └──┴──┴──┴──┴──┘
383///                     └ 1 ┘ └null┘
384///                 ┌──┐
385///  null           │00│
386///                 └──┘
387///
388///```
389///
390/// # Ordering
391///
392/// ## Float Ordering
393///
394/// Floats are totally ordered in accordance to the `totalOrder` predicate as defined
395/// in the IEEE 754 (2008 revision) floating point standard.
396///
397/// The ordering established by this does not always agree with the
398/// [`PartialOrd`] and [`PartialEq`] implementations of `f32`. For example,
399/// they consider negative and positive zero equal, while this does not
400///
401/// ## Null Ordering
402///
403/// The encoding described above will order nulls first, this can be inverted by representing
404/// nulls as `0xFF_u8` instead of `0_u8`
405///
406/// ## Reverse Column Ordering
407///
408/// The order of a given column can be reversed by negating the encoded bytes of non-null values
409///
410/// [COBS]: https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing
411/// [byte stuffing]: https://en.wikipedia.org/wiki/High-Level_Data_Link_Control#Asynchronous_framing
#[derive(Debug)]
pub struct RowConverter {
    /// The sort fields (data type + sort options), one per converted column.
    /// Shared (via `Arc`) with every [`Rows`] this converter produces, so
    /// provenance can be checked cheaply with pointer equality.
    fields: Arc<[SortField]>,
    /// Per-field codec state, e.g. child converters for nested/dictionary types
    codecs: Vec<Codec>,
}
418
/// Per-column encoding state, constructed once per [`SortField`] by [`Codec::new`]
#[derive(Debug)]
enum Codec {
    /// No additional codec state is necessary
    Stateless,
    /// A row converter for the dictionary values
    /// and the encoding of a row containing only nulls
    Dictionary(RowConverter, OwnedRow),
    /// A row converter for the child fields
    /// and the encoding of a row containing only nulls
    Struct(RowConverter, OwnedRow),
    /// A row converter for the child field
    ///
    /// Also used for `LargeList` and `FixedSizeList` (see [`Codec::new`])
    List(RowConverter),
    /// A row converter for the values array of a run-end encoded array
    RunEndEncoded(RowConverter),
}
434
impl Codec {
    /// Construct the [`Codec`] for `sort_field`, recursively building child
    /// [`RowConverter`]s (and pre-encoded all-null rows) for nested,
    /// dictionary and run-end encoded types.
    ///
    /// Returns [`ArrowError::NotYetImplemented`] for unsupported types.
    ///
    /// NOTE: the `Dictionary` and `RunEndEncoded` arms are deliberately
    /// matched before the `!d.is_nested()` fast path, so they always receive
    /// dedicated codec state regardless of how `is_nested` classifies them.
    fn new(sort_field: &SortField) -> Result<Self, ArrowError> {
        match &sort_field.data_type {
            DataType::Dictionary(_, values) => {
                // Dictionaries are encoded as their hydrated values; build a
                // converter for the value type with the same sort options
                let sort_field =
                    SortField::new_with_options(values.as_ref().clone(), sort_field.options);

                let converter = RowConverter::new(vec![sort_field])?;
                // Pre-encode a single null value so null dictionary keys can
                // be emitted without re-encoding each time
                let null_array = new_null_array(values.as_ref(), 1);
                let nulls = converter.convert_columns(&[null_array])?;

                let owned = OwnedRow {
                    data: nulls.buffer.into(),
                    config: nulls.config,
                };
                Ok(Self::Dictionary(converter, owned))
            }
            DataType::RunEndEncoded(_, values) => {
                // Similar to List implementation
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };

                let field = SortField::new_with_options(values.data_type().clone(), options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::RunEndEncoded(converter))
            }
            // All remaining non-nested types need no extra state
            d if !d.is_nested() => Ok(Self::Stateless),
            DataType::List(f) | DataType::LargeList(f) => {
                // The encoded contents will be inverted if descending is set to true
                // As such we set `descending` to false and negate nulls first if it
                // is set to true
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };

                let field = SortField::new_with_options(f.data_type().clone(), options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::List(converter))
            }
            DataType::FixedSizeList(f, _) => {
                // Unlike List, the child options are passed through unchanged;
                // fixed size lists reuse the `List` codec variant
                let field = SortField::new_with_options(f.data_type().clone(), sort_field.options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::List(converter))
            }
            DataType::Struct(f) => {
                // One child sort field per struct column, sharing the parent's options
                let sort_fields = f
                    .iter()
                    .map(|x| SortField::new_with_options(x.data_type().clone(), sort_field.options))
                    .collect();

                let converter = RowConverter::new(sort_fields)?;
                // Pre-encode a row of all-null children for null struct values
                let nulls: Vec<_> = f.iter().map(|x| new_null_array(x.data_type(), 1)).collect();

                let nulls = converter.convert_columns(&nulls)?;
                let owned = OwnedRow {
                    data: nulls.buffer.into(),
                    config: nulls.config,
                };

                Ok(Self::Struct(converter, owned))
            }
            _ => Err(ArrowError::NotYetImplemented(format!(
                "not yet implemented: {:?}",
                sort_field.data_type
            ))),
        }
    }

    /// Prepare an [`Encoder`] for `array`, converting any child/values arrays
    /// to the row format up front so the per-row encode step is cheap.
    ///
    /// # Panics
    ///
    /// Panics (`unreachable!`) if `array`'s type does not match this codec.
    fn encoder(&self, array: &dyn Array) -> Result<Encoder<'_>, ArrowError> {
        match self {
            Codec::Stateless => Ok(Encoder::Stateless),
            Codec::Dictionary(converter, nulls) => {
                // Encode the dictionary values; keys index into these rows
                let values = array.as_any_dictionary().values().clone();
                let rows = converter.convert_columns(&[values])?;
                Ok(Encoder::Dictionary(rows, nulls.row()))
            }
            Codec::Struct(converter, null) => {
                let v = as_struct_array(array);
                let rows = converter.convert_columns(v.columns())?;
                Ok(Encoder::Struct(rows, null.row()))
            }
            Codec::List(converter) => {
                // All three list flavours share the `List` encoder; only the
                // extraction of the child values differs
                let values = match array.data_type() {
                    DataType::List(_) => as_list_array(array).values(),
                    DataType::LargeList(_) => as_large_list_array(array).values(),
                    DataType::FixedSizeList(_, _) => as_fixed_size_list_array(array).values(),
                    _ => unreachable!(),
                };
                let rows = converter.convert_columns(&[values.clone()])?;
                Ok(Encoder::List(rows))
            }
            Codec::RunEndEncoded(converter) => {
                // Dispatch on the run-end index type to reach the values array
                let values = match array.data_type() {
                    DataType::RunEndEncoded(r, _) => match r.data_type() {
                        DataType::Int16 => array.as_run::<Int16Type>().values(),
                        DataType::Int32 => array.as_run::<Int32Type>().values(),
                        DataType::Int64 => array.as_run::<Int64Type>().values(),
                        _ => unreachable!("Unsupported run end index type: {r:?}"),
                    },
                    _ => unreachable!(),
                };
                let rows = converter.convert_columns(&[values.clone()])?;
                Ok(Encoder::RunEndEncoded(rows))
            }
        }
    }

    /// Return the heap size in bytes retained by this codec's state
    /// (child converters plus any pre-encoded null rows).
    fn size(&self) -> usize {
        match self {
            Codec::Stateless => 0,
            Codec::Dictionary(converter, nulls) => converter.size() + nulls.data.len(),
            Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(),
            Codec::List(converter) => converter.size(),
            Codec::RunEndEncoded(converter) => converter.size(),
        }
    }
}
555
/// Per-array encoding state produced by [`Codec::encoder`], borrowed for the
/// duration of a single [`RowConverter::append`] call
#[derive(Debug)]
enum Encoder<'a> {
    /// No additional encoder state is necessary
    Stateless,
    /// The encoding of the child array and the encoding of a null row
    Dictionary(Rows, Row<'a>),
    /// The row encoding of the child arrays and the encoding of a null row
    ///
    /// It is necessary to encode to a temporary [`Rows`] to avoid serializing
    /// values that are masked by a null in the parent StructArray, otherwise
    /// this would establish an ordering between semantically null values
    Struct(Rows, Row<'a>),
    /// The row encoding of the child array
    ///
    /// Also used for `LargeList` and `FixedSizeList` (see [`Codec::encoder`])
    List(Rows),
    /// The row encoding of the values array
    RunEndEncoded(Rows),
}
573
/// Configure the data type and sort order for a given column
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
    /// Sort options: sort direction and null ordering
    options: SortOptions,
    /// Data type of the column to encode
    data_type: DataType,
}
582
583impl SortField {
584    /// Create a new column with the given data type
585    pub fn new(data_type: DataType) -> Self {
586        Self::new_with_options(data_type, Default::default())
587    }
588
589    /// Create a new column with the given data type and [`SortOptions`]
590    pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
591        Self { options, data_type }
592    }
593
594    /// Return size of this instance in bytes.
595    ///
596    /// Includes the size of `Self`.
597    pub fn size(&self) -> usize {
598        self.data_type.size() + std::mem::size_of::<Self>() - std::mem::size_of::<DataType>()
599    }
600}
601
602impl RowConverter {
603    /// Create a new [`RowConverter`] with the provided schema
604    pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
605        if !Self::supports_fields(&fields) {
606            return Err(ArrowError::NotYetImplemented(format!(
607                "Row format support not yet implemented for: {fields:?}"
608            )));
609        }
610
611        let codecs = fields.iter().map(Codec::new).collect::<Result<_, _>>()?;
612        Ok(Self {
613            fields: fields.into(),
614            codecs,
615        })
616    }
617
618    /// Check if the given fields are supported by the row format.
619    pub fn supports_fields(fields: &[SortField]) -> bool {
620        fields.iter().all(|x| Self::supports_datatype(&x.data_type))
621    }
622
623    fn supports_datatype(d: &DataType) -> bool {
624        match d {
625            _ if !d.is_nested() => true,
626            DataType::List(f)
627            | DataType::LargeList(f)
628            | DataType::FixedSizeList(f, _)
629            | DataType::Map(f, _) => Self::supports_datatype(f.data_type()),
630            DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())),
631            DataType::RunEndEncoded(_, values) => Self::supports_datatype(values.data_type()),
632            _ => false,
633        }
634    }
635
636    /// Convert [`ArrayRef`] columns into [`Rows`]
637    ///
638    /// See [`Row`] for information on when [`Row`] can be compared
639    ///
640    /// # Panics
641    ///
642    /// Panics if the schema of `columns` does not match that provided to [`RowConverter::new`]
643    pub fn convert_columns(&self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
644        let num_rows = columns.first().map(|x| x.len()).unwrap_or(0);
645        let mut rows = self.empty_rows(num_rows, 0);
646        self.append(&mut rows, columns)?;
647        Ok(rows)
648    }
649
    /// Convert [`ArrayRef`] columns appending to an existing [`Rows`]
    ///
    /// See [`Row`] for information on when [`Row`] can be compared
    ///
    /// # Panics
    ///
    /// Panics if
    /// * The schema of `columns` does not match that provided to [`RowConverter::new`]
    /// * The provided [`Rows`] were not created by this [`RowConverter`]
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use std::collections::HashSet;
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::StringArray;
    /// # use arrow_row::{Row, RowConverter, SortField};
    /// # use arrow_schema::DataType;
    /// #
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    /// let a1 = StringArray::from(vec!["hello", "world"]);
    /// let a2 = StringArray::from(vec!["a", "a", "hello"]);
    ///
    /// let mut rows = converter.empty_rows(5, 128);
    /// converter.append(&mut rows, &[Arc::new(a1)]).unwrap();
    /// converter.append(&mut rows, &[Arc::new(a2)]).unwrap();
    ///
    /// let back = converter.convert_rows(&rows).unwrap();
    /// let values: Vec<_> = back[0].as_string::<i32>().iter().map(Option::unwrap).collect();
    /// assert_eq!(&values, &["hello", "world", "a", "a", "hello"]);
    /// ```
    pub fn append(&self, rows: &mut Rows, columns: &[ArrayRef]) -> Result<(), ArrowError> {
        // Provenance check: `rows` must share this converter's field config,
        // otherwise its existing encodings would not be comparable
        assert!(
            Arc::ptr_eq(&rows.config.fields, &self.fields),
            "rows were not produced by this RowConverter"
        );

        if columns.len() != self.fields.len() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Incorrect number of arrays provided to RowConverter, expected {} got {}",
                self.fields.len(),
                columns.len()
            )));
        }

        // Validate each column's type against the configured schema and
        // prepare its encoder (converting any child arrays to rows up front)
        let encoders = columns
            .iter()
            .zip(&self.codecs)
            .zip(self.fields.iter())
            .map(|((column, codec), field)| {
                if !column.data_type().equals_datatype(&field.data_type) {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "RowConverter column schema mismatch, expected {} got {}",
                        field.data_type,
                        column.data_type()
                    )));
                }
                codec.encoder(column.as_ref())
            })
            .collect::<Result<Vec<_>, _>>()?;

        // Two-pass encoding: first compute per-row lengths and extend the
        // offsets, then size the buffer once and encode into it
        let write_offset = rows.num_rows();
        let lengths = row_lengths(columns, &encoders);
        let total = lengths.extend_offsets(rows.offsets[write_offset], &mut rows.offsets);
        rows.buffer.resize(total, 0);

        for ((column, field), encoder) in columns.iter().zip(self.fields.iter()).zip(encoders) {
            // We encode a column at a time to minimise dispatch overheads
            encode_column(
                &mut rows.buffer,
                &mut rows.offsets[write_offset..],
                column.as_ref(),
                field.options,
                &encoder,
            )
        }

        // Cheap invariant checks in debug builds: offsets must be monotonic
        // and end exactly at the buffer length
        if cfg!(debug_assertions) {
            assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len());
            rows.offsets
                .windows(2)
                .for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic"));
        }

        Ok(())
    }
735
736    /// Convert [`Rows`] columns into [`ArrayRef`]
737    ///
738    /// # Panics
739    ///
740    /// Panics if the rows were not produced by this [`RowConverter`]
741    pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>, ArrowError>
742    where
743        I: IntoIterator<Item = Row<'a>>,
744    {
745        let mut validate_utf8 = false;
746        let mut rows: Vec<_> = rows
747            .into_iter()
748            .map(|row| {
749                assert!(
750                    Arc::ptr_eq(&row.config.fields, &self.fields),
751                    "rows were not produced by this RowConverter"
752                );
753                validate_utf8 |= row.config.validate_utf8;
754                row.data
755            })
756            .collect();
757
758        // SAFETY
759        // We have validated that the rows came from this [`RowConverter`]
760        // and therefore must be valid
761        unsafe { self.convert_raw(&mut rows, validate_utf8) }
762    }
763
764    /// Returns an empty [`Rows`] with capacity for `row_capacity` rows with
765    /// a total length of `data_capacity`
766    ///
767    /// This can be used to buffer a selection of [`Row`]
768    ///
769    /// ```
770    /// # use std::sync::Arc;
771    /// # use std::collections::HashSet;
772    /// # use arrow_array::cast::AsArray;
773    /// # use arrow_array::StringArray;
774    /// # use arrow_row::{Row, RowConverter, SortField};
775    /// # use arrow_schema::DataType;
776    /// #
777    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
778    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
779    ///
780    /// // Convert to row format and deduplicate
781    /// let converted = converter.convert_columns(&[Arc::new(array)]).unwrap();
782    /// let mut distinct_rows = converter.empty_rows(3, 100);
783    /// let mut dedup: HashSet<Row> = HashSet::with_capacity(3);
784    /// converted.iter().filter(|row| dedup.insert(*row)).for_each(|row| distinct_rows.push(row));
785    ///
786    /// // Note: we could skip buffering and feed the filtered iterator directly
787    /// // into convert_rows, this is done for demonstration purposes only
788    /// let distinct = converter.convert_rows(&distinct_rows).unwrap();
789    /// let values: Vec<_> = distinct[0].as_string::<i32>().iter().map(Option::unwrap).collect();
790    /// assert_eq!(&values, &["hello", "world", "a"]);
791    /// ```
792    pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows {
793        let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1));
794        offsets.push(0);
795
796        Rows {
797            offsets,
798            buffer: Vec::with_capacity(data_capacity),
799            config: RowConfig {
800                fields: self.fields.clone(),
801                validate_utf8: false,
802            },
803        }
804    }
805
806    /// Create a new [Rows] instance from the given binary data.
807    ///
808    /// ```
809    /// # use std::sync::Arc;
810    /// # use std::collections::HashSet;
811    /// # use arrow_array::cast::AsArray;
812    /// # use arrow_array::StringArray;
813    /// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField};
814    /// # use arrow_schema::DataType;
815    /// #
816    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
817    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
818    /// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
819    ///
820    /// // We can convert rows into binary format and back in batch.
821    /// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
822    /// let binary = rows.try_into_binary().expect("known-small array");
823    /// let converted = converter.from_binary(binary.clone());
824    /// assert!(converted.iter().eq(values.iter().map(|r| r.row())));
825    /// ```
826    ///
827    /// # Panics
828    ///
829    /// This function expects the passed [BinaryArray] to contain valid row data as produced by this
830    /// [RowConverter]. It will panic if any rows are null. Operations on the returned [Rows] may
831    /// panic if the data is malformed.
832    pub fn from_binary(&self, array: BinaryArray) -> Rows {
833        assert_eq!(
834            array.null_count(),
835            0,
836            "can't construct Rows instance from array with nulls"
837        );
838        Rows {
839            buffer: array.values().to_vec(),
840            offsets: array.offsets().iter().map(|&i| i.as_usize()).collect(),
841            config: RowConfig {
842                fields: Arc::clone(&self.fields),
843                validate_utf8: true,
844            },
845        }
846    }
847
848    /// Convert raw bytes into [`ArrayRef`]
849    ///
850    /// # Safety
851    ///
852    /// `rows` must contain valid data for this [`RowConverter`]
853    unsafe fn convert_raw(
854        &self,
855        rows: &mut [&[u8]],
856        validate_utf8: bool,
857    ) -> Result<Vec<ArrayRef>, ArrowError> {
858        self.fields
859            .iter()
860            .zip(&self.codecs)
861            .map(|(field, codec)| decode_column(field, rows, codec, validate_utf8))
862            .collect()
863    }
864
865    /// Returns a [`RowParser`] that can be used to parse [`Row`] from bytes
866    pub fn parser(&self) -> RowParser {
867        RowParser::new(Arc::clone(&self.fields))
868    }
869
870    /// Returns the size of this instance in bytes
871    ///
872    /// Includes the size of `Self`.
873    pub fn size(&self) -> usize {
874        std::mem::size_of::<Self>()
875            + self.fields.iter().map(|x| x.size()).sum::<usize>()
876            + self.codecs.capacity() * std::mem::size_of::<Codec>()
877            + self.codecs.iter().map(Codec::size).sum::<usize>()
878    }
879}
880
/// A [`RowParser`] can be created from a [`RowConverter`] and used to parse bytes to [`Row`]
#[derive(Debug)]
pub struct RowParser {
    /// Shared schema and validation settings applied to every parsed [`Row`]
    config: RowConfig,
}
886
887impl RowParser {
888    fn new(fields: Arc<[SortField]>) -> Self {
889        Self {
890            config: RowConfig {
891                fields,
892                validate_utf8: true,
893            },
894        }
895    }
896
897    /// Creates a [`Row`] from the provided `bytes`.
898    ///
899    /// `bytes` must be a [`Row`] produced by the [`RowConverter`] associated with
900    /// this [`RowParser`], otherwise subsequent operations with the produced [`Row`] may panic
901    pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> {
902        Row {
903            data: bytes,
904            config: &self.config,
905        }
906    }
907}
908
/// The config of a given set of [`Row`]
#[derive(Debug, Clone)]
struct RowConfig {
    /// The schema for these rows
    fields: Arc<[SortField]>,
    /// Whether to run UTF-8 validation when converting to arrow arrays
    validate_utf8: bool,
}
917
/// A row-oriented representation of arrow data, that is normalized for comparison.
///
/// See the [module level documentation](self) and [`RowConverter`] for more details.
#[derive(Debug)]
pub struct Rows {
    /// Underlying row bytes
    buffer: Vec<u8>,
    /// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]`
    ///
    /// Invariant: always non-empty (starts as `[0]`), monotonically
    /// non-decreasing, and its last entry equals `buffer.len()`
    offsets: Vec<usize>,
    /// The config for these rows
    config: RowConfig,
}
930
impl Rows {
    /// Append a [`Row`] to this [`Rows`]
    ///
    /// # Panics
    ///
    /// Panics if `row` was not produced by a converter sharing this [`Rows`]' fields
    pub fn push(&mut self, row: Row<'_>) {
        assert!(
            Arc::ptr_eq(&row.config.fields, &self.config.fields),
            "row was not produced by this RowConverter"
        );
        // Propagate UTF-8 validation requirements from the source rows
        self.config.validate_utf8 |= row.config.validate_utf8;
        self.buffer.extend_from_slice(row.data);
        self.offsets.push(self.buffer.len())
    }

    /// Returns the row at index `row`
    ///
    /// # Panics
    ///
    /// Panics if `row` is out of bounds
    pub fn row(&self, row: usize) -> Row<'_> {
        // `offsets` has `num_rows + 1` entries, so valid indices satisfy
        // `row + 1 < offsets.len()`
        assert!(row + 1 < self.offsets.len());
        // SAFETY: bounds verified by the assertion above
        unsafe { self.row_unchecked(row) }
    }

    /// Returns the row at `index` without bounds checking
    ///
    /// # Safety
    /// Caller must ensure that `index` is less than the number of offsets (#rows + 1)
    pub unsafe fn row_unchecked(&self, index: usize) -> Row<'_> {
        let end = unsafe { self.offsets.get_unchecked(index + 1) };
        let start = unsafe { self.offsets.get_unchecked(index) };
        let data = unsafe { self.buffer.get_unchecked(*start..*end) };
        Row {
            data,
            config: &self.config,
        }
    }

    /// Sets the length of this [`Rows`] to 0
    pub fn clear(&mut self) {
        // Keep the leading 0 offset so the invariant `offsets.len() == num_rows + 1` holds
        self.offsets.truncate(1);
        self.buffer.clear();
    }

    /// Returns the number of [`Row`] in this [`Rows`]
    pub fn num_rows(&self) -> usize {
        self.offsets.len() - 1
    }

    /// Returns an iterator over the [`Row`] in this [`Rows`]
    pub fn iter(&self) -> RowsIter<'_> {
        self.into_iter()
    }

    /// Returns the size of this instance in bytes
    ///
    /// Includes the size of `Self`.
    pub fn size(&self) -> usize {
        // Size of fields is accounted for as part of RowConverter
        std::mem::size_of::<Self>()
            + self.buffer.len()
            + self.offsets.len() * std::mem::size_of::<usize>()
    }

    /// Create a [BinaryArray] from the [Rows] data without reallocating the
    /// underlying bytes.
    ///
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use std::collections::HashSet;
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::StringArray;
    /// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField};
    /// # use arrow_schema::DataType;
    /// #
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
    /// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
    ///
    /// // We can convert rows into binary format and back.
    /// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
    /// let binary = rows.try_into_binary().expect("known-small array");
    /// let parser = converter.parser();
    /// let parsed: Vec<OwnedRow> =
    ///   binary.iter().flatten().map(|b| parser.parse(b).owned()).collect();
    /// assert_eq!(values, parsed);
    /// ```
    ///
    /// # Errors
    ///
    /// This function will return an error if there is more data than can be stored in
    /// a [BinaryArray] -- i.e. if the total data size is more than 2GiB.
    pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> {
        // BinaryArray uses i32 offsets, so the buffer must fit in i32::MAX bytes
        if self.buffer.len() > i32::MAX as usize {
            return Err(ArrowError::InvalidArgumentError(format!(
                "{}-byte rows buffer too long to convert into a i32-indexed BinaryArray",
                self.buffer.len()
            )));
        }
        // We've checked that the buffer length fits in an i32; so all offsets into that buffer should fit as well.
        let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as));
        // SAFETY: offsets buffer is nonempty, monotonically increasing, and all represent valid indexes into buffer.
        let array = unsafe {
            BinaryArray::new_unchecked(
                OffsetBuffer::new_unchecked(offsets_scalar),
                Buffer::from_vec(self.buffer),
                None,
            )
        };
        Ok(array)
    }
}
1038
1039impl<'a> IntoIterator for &'a Rows {
1040    type Item = Row<'a>;
1041    type IntoIter = RowsIter<'a>;
1042
1043    fn into_iter(self) -> Self::IntoIter {
1044        RowsIter {
1045            rows: self,
1046            start: 0,
1047            end: self.num_rows(),
1048        }
1049    }
1050}
1051
/// An iterator over [`Rows`]
#[derive(Debug)]
pub struct RowsIter<'a> {
    /// The rows being iterated
    rows: &'a Rows,
    /// Index of the next row yielded from the front (invariant: `start <= end`)
    start: usize,
    /// One past the index of the next row yielded from the back
    end: usize,
}
1059
1060impl<'a> Iterator for RowsIter<'a> {
1061    type Item = Row<'a>;
1062
1063    fn next(&mut self) -> Option<Self::Item> {
1064        if self.end == self.start {
1065            return None;
1066        }
1067
1068        // SAFETY: We have checked that `start` is less than `end`
1069        let row = unsafe { self.rows.row_unchecked(self.start) };
1070        self.start += 1;
1071        Some(row)
1072    }
1073
1074    fn size_hint(&self) -> (usize, Option<usize>) {
1075        let len = self.len();
1076        (len, Some(len))
1077    }
1078}
1079
impl ExactSizeIterator for RowsIter<'_> {
    /// Remaining rows: `start <= end` is an invariant, so this cannot underflow
    fn len(&self) -> usize {
        self.end - self.start
    }
}
1085
1086impl DoubleEndedIterator for RowsIter<'_> {
1087    fn next_back(&mut self) -> Option<Self::Item> {
1088        if self.end == self.start {
1089            return None;
1090        }
1091        // Safety: We have checked that `start` is less than `end`
1092        let row = unsafe { self.rows.row_unchecked(self.end) };
1093        self.end -= 1;
1094        Some(row)
1095    }
1096}
1097
/// A comparable representation of a row.
///
/// See the [module level documentation](self) for more details.
///
/// Two [`Row`] can only be compared if they both belong to [`Rows`]
/// returned by calls to [`RowConverter::convert_columns`] on the same
/// [`RowConverter`]. If different [`RowConverter`]s are used, any
/// ordering established by comparing the [`Row`] is arbitrary.
#[derive(Debug, Copy, Clone)]
pub struct Row<'a> {
    /// The encoded bytes of this row, borrowed from the owning [`Rows`]
    data: &'a [u8],
    /// The config of the [`Rows`] this row belongs to
    config: &'a RowConfig,
}
1111
1112impl<'a> Row<'a> {
1113    /// Create owned version of the row to detach it from the shared [`Rows`].
1114    pub fn owned(&self) -> OwnedRow {
1115        OwnedRow {
1116            data: self.data.into(),
1117            config: self.config.clone(),
1118        }
1119    }
1120
1121    /// The row's bytes, with the lifetime of the underlying data.
1122    pub fn data(&self) -> &'a [u8] {
1123        self.data
1124    }
1125}
1126
// Manually derive these as don't wish to include `fields`

impl PartialEq for Row<'_> {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        self.data.eq(other.data)
    }
}

impl Eq for Row<'_> {}

impl PartialOrd for Row<'_> {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Row<'_> {
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        // Rows are normalized so that lexicographic byte comparison yields
        // the same ordering as comparing the original column values
        self.data.cmp(other.data)
    }
}

impl Hash for Row<'_> {
    #[inline]
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.data.hash(state)
    }
}

impl AsRef<[u8]> for Row<'_> {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        self.data
    }
}
1165
/// Owned version of a [`Row`] that can be moved/cloned freely.
///
/// This contains the data for the one specific row (not the entire buffer of all rows).
#[derive(Debug, Clone)]
pub struct OwnedRow {
    /// This row's encoded bytes
    data: Box<[u8]>,
    /// Cloned config of the [`Rows`] this row originated from
    config: RowConfig,
}
1174
1175impl OwnedRow {
1176    /// Get borrowed [`Row`] from owned version.
1177    ///
1178    /// This is helpful if you want to compare an [`OwnedRow`] with a [`Row`].
1179    pub fn row(&self) -> Row<'_> {
1180        Row {
1181            data: &self.data,
1182            config: &self.config,
1183        }
1184    }
1185}
1186
// Manually derive these as don't wish to include `fields`. Also we just want to use the same `Row` implementations here.

impl PartialEq for OwnedRow {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        self.row().eq(&other.row())
    }
}

impl Eq for OwnedRow {}

impl PartialOrd for OwnedRow {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for OwnedRow {
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        // Delegates to Row::cmp: lexicographic byte comparison
        self.row().cmp(&other.row())
    }
}

impl Hash for OwnedRow {
    #[inline]
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.row().hash(state)
    }
}

impl AsRef<[u8]> for OwnedRow {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        &self.data
    }
}
1225
1226/// Returns the null sentinel, negated if `invert` is true
1227#[inline]
1228fn null_sentinel(options: SortOptions) -> u8 {
1229    match options.nulls_first {
1230        true => 0,
1231        false => 0xFF,
1232    }
1233}
1234
/// Stores the lengths of the rows. Lazily materializes lengths for columns with fixed-size types.
enum LengthTracker {
    /// Fixed state: All rows have length `length`
    Fixed { length: usize, num_rows: usize },
    /// Variable state: The length of row `i` is `lengths[i] + fixed_length`
    Variable {
        /// Accumulated length contributed by fixed-size columns (shared by all rows)
        fixed_length: usize,
        /// Per-row length contributed by variable-size columns
        lengths: Vec<usize>,
    },
}
1245
1246impl LengthTracker {
1247    fn new(num_rows: usize) -> Self {
1248        Self::Fixed {
1249            length: 0,
1250            num_rows,
1251        }
1252    }
1253
1254    /// Adds a column of fixed-length elements, each of size `new_length` to the LengthTracker
1255    fn push_fixed(&mut self, new_length: usize) {
1256        match self {
1257            LengthTracker::Fixed { length, .. } => *length += new_length,
1258            LengthTracker::Variable { fixed_length, .. } => *fixed_length += new_length,
1259        }
1260    }
1261
1262    /// Adds a column of possibly variable-length elements, element `i` has length `new_lengths.nth(i)`
1263    fn push_variable(&mut self, new_lengths: impl ExactSizeIterator<Item = usize>) {
1264        match self {
1265            LengthTracker::Fixed { length, .. } => {
1266                *self = LengthTracker::Variable {
1267                    fixed_length: *length,
1268                    lengths: new_lengths.collect(),
1269                }
1270            }
1271            LengthTracker::Variable { lengths, .. } => {
1272                assert_eq!(lengths.len(), new_lengths.len());
1273                lengths
1274                    .iter_mut()
1275                    .zip(new_lengths)
1276                    .for_each(|(length, new_length)| *length += new_length);
1277            }
1278        }
1279    }
1280
1281    /// Returns the tracked row lengths as a slice
1282    fn materialized(&mut self) -> &mut [usize] {
1283        if let LengthTracker::Fixed { length, num_rows } = *self {
1284            *self = LengthTracker::Variable {
1285                fixed_length: length,
1286                lengths: vec![0; num_rows],
1287            };
1288        }
1289
1290        match self {
1291            LengthTracker::Variable { lengths, .. } => lengths,
1292            LengthTracker::Fixed { .. } => unreachable!(),
1293        }
1294    }
1295
1296    /// Initializes the offsets using the tracked lengths. Returns the sum of the
1297    /// lengths of the rows added.
1298    ///
1299    /// We initialize the offsets shifted down by one row index.
1300    ///
1301    /// As the rows are appended to the offsets will be incremented to match
1302    ///
1303    /// For example, consider the case of 3 rows of length 3, 4, and 6 respectively.
1304    /// The offsets would be initialized to `0, 0, 3, 7`
1305    ///
1306    /// Writing the first row entirely would yield `0, 3, 3, 7`
1307    /// The second, `0, 3, 7, 7`
1308    /// The third, `0, 3, 7, 13`
1309    //
1310    /// This would be the final offsets for reading
1311    //
1312    /// In this way offsets tracks the position during writing whilst eventually serving
1313    fn extend_offsets(&self, initial_offset: usize, offsets: &mut Vec<usize>) -> usize {
1314        match self {
1315            LengthTracker::Fixed { length, num_rows } => {
1316                offsets.extend((0..*num_rows).map(|i| initial_offset + i * length));
1317
1318                initial_offset + num_rows * length
1319            }
1320            LengthTracker::Variable {
1321                fixed_length,
1322                lengths,
1323            } => {
1324                let mut acc = initial_offset;
1325
1326                offsets.extend(lengths.iter().map(|length| {
1327                    let current = acc;
1328                    acc += length + fixed_length;
1329                    current
1330                }));
1331
1332                acc
1333            }
1334        }
1335    }
1336}
1337
/// Computes the encoded length of each row, returning a [`LengthTracker`]
/// describing the per-row sizes of `cols` encoded with `encoders`
fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> LengthTracker {
    use fixed::FixedLengthEncoding;

    // All columns have the same length; an empty column list yields 0 rows
    let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
    let mut tracker = LengthTracker::new(num_rows);

    for (array, encoder) in cols.iter().zip(encoders) {
        match encoder {
            Encoder::Stateless => {
                downcast_primitive_array! {
                    array => tracker.push_fixed(fixed::encoded_len(array)),
                    DataType::Null => {},
                    DataType::Boolean => tracker.push_fixed(bool::ENCODED_LEN),
                    DataType::Binary => tracker.push_variable(
                        as_generic_binary_array::<i32>(array)
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::LargeBinary => tracker.push_variable(
                        as_generic_binary_array::<i64>(array)
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::BinaryView => tracker.push_variable(
                        array.as_binary_view()
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::Utf8 => tracker.push_variable(
                        array.as_string::<i32>()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::LargeUtf8 => tracker.push_variable(
                        array.as_string::<i64>()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::Utf8View => tracker.push_variable(
                        array.as_string_view()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::FixedSizeBinary(len) => {
                        let len = len.to_usize().unwrap();
                        // 1 extra byte for the null/valid sentinel
                        tracker.push_fixed(1 + len)
                    }
                    _ => unimplemented!("unsupported data type: {}", array.data_type()),
                }
            }
            Encoder::Dictionary(values, null) => {
                // Each key contributes the length of its (pre-encoded) dictionary value
                downcast_dictionary_array! {
                    array => {
                        tracker.push_variable(
                            array.keys().iter().map(|v| match v {
                                Some(k) => values.row(k.as_usize()).data.len(),
                                None => null.data.len(),
                            })
                        )
                    }
                    _ => unreachable!(),
                }
            }
            Encoder::Struct(rows, null) => {
                let array = as_struct_array(array);
                // 1 extra byte per row for the null/valid sentinel
                tracker.push_variable((0..array.len()).map(|idx| match array.is_valid(idx) {
                    true => 1 + rows.row(idx).as_ref().len(),
                    false => 1 + null.data.len(),
                }));
            }
            Encoder::List(rows) => match array.data_type() {
                DataType::List(_) => {
                    list::compute_lengths(tracker.materialized(), rows, as_list_array(array))
                }
                DataType::LargeList(_) => {
                    list::compute_lengths(tracker.materialized(), rows, as_large_list_array(array))
                }
                DataType::FixedSizeList(_, _) => compute_lengths_fixed_size_list(
                    &mut tracker,
                    rows,
                    as_fixed_size_list_array(array),
                ),
                _ => unreachable!(),
            },
            Encoder::RunEndEncoded(rows) => match array.data_type() {
                DataType::RunEndEncoded(r, _) => match r.data_type() {
                    DataType::Int16 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int16Type>(),
                    ),
                    DataType::Int32 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int32Type>(),
                    ),
                    DataType::Int64 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int64Type>(),
                    ),
                    _ => unreachable!("Unsupported run end index type: {r:?}"),
                },
                _ => unreachable!(),
            },
        }
    }

    tracker
}
1449
/// Encodes a column into `data`, advancing each row's entry in `offsets`
/// past the bytes written for that row
///
/// `offsets` is shifted down by one index (see `LengthTracker::extend_offsets`),
/// so encoders write at `offsets[i + 1]` for row `i`
fn encode_column(
    data: &mut [u8],
    offsets: &mut [usize],
    column: &dyn Array,
    opts: SortOptions,
    encoder: &Encoder<'_>,
) {
    match encoder {
        Encoder::Stateless => {
            downcast_primitive_array! {
                column => {
                    // Fast path when the column contains no nulls
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode(data, offsets, column.values(), nulls, opts)
                    } else {
                        fixed::encode_not_null(data, offsets, column.values(), opts)
                    }
                }
                DataType::Null => {}
                DataType::Boolean => {
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode_boolean(data, offsets, column.as_boolean().values(), nulls, opts)
                    } else {
                        fixed::encode_boolean_not_null(data, offsets, column.as_boolean().values(), opts)
                    }
                }
                DataType::Binary => {
                    variable::encode(data, offsets, as_generic_binary_array::<i32>(column).iter(), opts)
                }
                DataType::BinaryView => {
                    variable::encode(data, offsets, column.as_binary_view().iter(), opts)
                }
                DataType::LargeBinary => {
                    variable::encode(data, offsets, as_generic_binary_array::<i64>(column).iter(), opts)
                }
                DataType::Utf8 => variable::encode(
                    data, offsets,
                    column.as_string::<i32>().iter().map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::LargeUtf8 => variable::encode(
                    data, offsets,
                    column.as_string::<i64>()
                        .iter()
                        .map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::Utf8View => variable::encode(
                    data, offsets,
                    column.as_string_view().iter().map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::FixedSizeBinary(_) => {
                    let array = column.as_any().downcast_ref().unwrap();
                    fixed::encode_fixed_size_binary(data, offsets, array, opts)
                }
                _ => unimplemented!("unsupported data type: {}", column.data_type()),
            }
        }
        Encoder::Dictionary(values, nulls) => {
            downcast_dictionary_array! {
                column => encode_dictionary_values(data, offsets, column, values, nulls),
                _ => unreachable!()
            }
        }
        Encoder::Struct(rows, null) => {
            let array = as_struct_array(column);
            let null_sentinel = null_sentinel(opts);
            // Each struct row is a 1-byte sentinel followed by the pre-encoded child row
            offsets
                .iter_mut()
                .skip(1)
                .enumerate()
                .for_each(|(idx, offset)| {
                    let (row, sentinel) = match array.is_valid(idx) {
                        true => (rows.row(idx), 0x01),
                        false => (*null, null_sentinel),
                    };
                    let end_offset = *offset + 1 + row.as_ref().len();
                    data[*offset] = sentinel;
                    data[*offset + 1..end_offset].copy_from_slice(row.as_ref());
                    *offset = end_offset;
                })
        }
        Encoder::List(rows) => match column.data_type() {
            DataType::List(_) => list::encode(data, offsets, rows, opts, as_list_array(column)),
            DataType::LargeList(_) => {
                list::encode(data, offsets, rows, opts, as_large_list_array(column))
            }
            DataType::FixedSizeList(_, _) => {
                encode_fixed_size_list(data, offsets, rows, opts, as_fixed_size_list_array(column))
            }
            _ => unreachable!(),
        },
        Encoder::RunEndEncoded(rows) => match column.data_type() {
            DataType::RunEndEncoded(r, _) => match r.data_type() {
                DataType::Int16 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int16Type>())
                }
                DataType::Int32 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int32Type>())
                }
                DataType::Int64 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int64Type>())
                }
                _ => unreachable!("Unsupported run end index type: {r:?}"),
            },
            _ => unreachable!(),
        },
    }
}
1560
1561/// Encode dictionary values not preserving the dictionary encoding
1562pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
1563    data: &mut [u8],
1564    offsets: &mut [usize],
1565    column: &DictionaryArray<K>,
1566    values: &Rows,
1567    null: &Row<'_>,
1568) {
1569    for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
1570        let row = match k {
1571            Some(k) => values.row(k.as_usize()).data,
1572            None => null.data,
1573        };
1574        let end_offset = *offset + row.len();
1575        data[*offset..end_offset].copy_from_slice(row);
1576        *offset = end_offset;
1577    }
1578}
1579
/// Helper macro invoked by `downcast_primitive!` to decode a primitive column
/// of concrete type `$t` into an `Arc`-wrapped array
macro_rules! decode_primitive_helper {
    ($t:ty, $rows:ident, $data_type:ident, $options:ident) => {
        Arc::new(decode_primitive::<$t>($rows, $data_type, $options))
    };
}
1585
/// Decodes the provided `field` from `rows`
///
/// NOTE(review): the row slices appear to be consumed in place — the struct
/// branch below explicitly advances each slice past its validity byte, and
/// the decoders take `&mut [&[u8]]` — confirm the per-type decoders also
/// advance the slices past the bytes they consume.
///
/// # Safety
///
/// Rows must contain valid data for the provided field
unsafe fn decode_column(
    field: &SortField,
    rows: &mut [&[u8]],
    codec: &Codec,
    validate_utf8: bool,
) -> Result<ArrayRef, ArrowError> {
    let options = field.options;

    let array: ArrayRef = match codec {
        // Types requiring no per-converter state: dispatch on the data type
        // to the matching primitive / boolean / binary / string decoder
        Codec::Stateless => {
            let data_type = field.data_type.clone();
            downcast_primitive! {
                data_type => (decode_primitive_helper, rows, data_type, options),
                DataType::Null => Arc::new(NullArray::new(rows.len())),
                DataType::Boolean => Arc::new(decode_bool(rows, options)),
                DataType::Binary => Arc::new(decode_binary::<i32>(rows, options)),
                DataType::LargeBinary => Arc::new(decode_binary::<i64>(rows, options)),
                DataType::BinaryView => Arc::new(decode_binary_view(rows, options)),
                DataType::FixedSizeBinary(size) => Arc::new(decode_fixed_size_binary(rows, size, options)),
                DataType::Utf8 => Arc::new(decode_string::<i32>(rows, options, validate_utf8)),
                DataType::LargeUtf8 => Arc::new(decode_string::<i64>(rows, options, validate_utf8)),
                DataType::Utf8View => Arc::new(decode_string_view(rows, options, validate_utf8)),
                _ => return Err(ArrowError::NotYetImplemented(format!("unsupported data type: {data_type}" )))
            }
        }
        // Dictionaries are encoded as their logical values (see
        // `encode_dictionary_values`), so the nested converter decodes them
        // directly; it yields exactly one column
        Codec::Dictionary(converter, _) => {
            let cols = converter.convert_raw(rows, validate_utf8)?;
            cols.into_iter().next().unwrap()
        }
        Codec::Struct(converter, _) => {
            // The leading byte of each row carries the struct-level validity
            let (null_count, nulls) = fixed::decode_nulls(rows);
            // Advance past the validity byte before decoding the children
            rows.iter_mut().for_each(|row| *row = &row[1..]);
            let children = converter.convert_raw(rows, validate_utf8)?;

            let child_data = children.iter().map(|c| c.to_data()).collect();
            let builder = ArrayDataBuilder::new(field.data_type.clone())
                .len(rows.len())
                .null_count(null_count)
                .null_bit_buffer(Some(nulls))
                .child_data(child_data);

            // SAFETY: relies on this function's contract — the rows were
            // produced for this field, so the decoded children match the
            // struct's declared schema
            Arc::new(StructArray::from(builder.build_unchecked()))
        }
        // Lists dispatch on offset width (i32 / i64) or fixed element count
        Codec::List(converter) => match &field.data_type {
            DataType::List(_) => {
                Arc::new(list::decode::<i32>(converter, rows, field, validate_utf8)?)
            }
            DataType::LargeList(_) => {
                Arc::new(list::decode::<i64>(converter, rows, field, validate_utf8)?)
            }
            DataType::FixedSizeList(_, value_length) => Arc::new(list::decode_fixed_size_list(
                converter,
                rows,
                field,
                validate_utf8,
                value_length.as_usize(),
            )?),
            // Any other type paired with Codec::List is a codec-construction bug
            _ => unreachable!(),
        },
        // Run-end encoded arrays dispatch on the run-ends integer width
        Codec::RunEndEncoded(converter) => match &field.data_type {
            DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() {
                DataType::Int16 => Arc::new(run::decode::<Int16Type>(
                    converter,
                    rows,
                    field,
                    validate_utf8,
                )?),
                DataType::Int32 => Arc::new(run::decode::<Int32Type>(
                    converter,
                    rows,
                    field,
                    validate_utf8,
                )?),
                DataType::Int64 => Arc::new(run::decode::<Int64Type>(
                    converter,
                    rows,
                    field,
                    validate_utf8,
                )?),
                // Run ends are restricted to the integer types above
                _ => unreachable!(),
            },
            _ => unreachable!(),
        },
    };
    Ok(array)
}
1677
1678#[cfg(test)]
1679mod tests {
1680    use rand::distr::uniform::SampleUniform;
1681    use rand::distr::{Distribution, StandardUniform};
1682    use rand::{rng, Rng};
1683
1684    use arrow_array::builder::*;
1685    use arrow_array::types::*;
1686    use arrow_array::*;
1687    use arrow_buffer::{i256, NullBuffer};
1688    use arrow_buffer::{Buffer, OffsetBuffer};
1689    use arrow_cast::display::{ArrayFormatter, FormatOptions};
1690    use arrow_ord::sort::{LexicographicalComparator, SortColumn};
1691
1692    use super::*;
1693
    #[test]
    fn test_fixed_width() {
        // Two fixed-width columns (Int16, Float32), each containing one null
        let cols = [
            Arc::new(Int16Array::from_iter([
                Some(1),
                Some(2),
                None,
                Some(-5),
                Some(2),
                Some(2),
                Some(0),
            ])) as ArrayRef,
            Arc::new(Float32Array::from_iter([
                Some(1.3),
                Some(2.5),
                None,
                Some(4.),
                Some(0.1),
                Some(-4.),
                Some(-0.),
            ])) as ArrayRef,
        ];

        let converter = RowConverter::new(vec![
            SortField::new(DataType::Int16),
            SortField::new(DataType::Float32),
        ])
        .unwrap();
        let rows = converter.convert_columns(&cols).unwrap();

        // Every row occupies 8 bytes: 3 for the Int16 column and 5 for the
        // Float32 column
        assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
        // Pinned byte-level encoding. Each comment-separated group below is
        // one column of one row; a leading 1 marks a non-null value, and the
        // all-zero row is the null row (index 2)
        assert_eq!(
            rows.buffer,
            &[
                1, 128, 1, //
                1, 191, 166, 102, 102, //
                1, 128, 2, //
                1, 192, 32, 0, 0, //
                0, 0, 0, //
                0, 0, 0, 0, 0, //
                1, 127, 251, //
                1, 192, 128, 0, 0, //
                1, 128, 2, //
                1, 189, 204, 204, 205, //
                1, 128, 2, //
                1, 63, 127, 255, 255, //
                1, 128, 0, //
                1, 127, 255, 255, 255 //
            ]
        );

        // Encoded rows compare the same way the logical tuples sort
        assert!(rows.row(3) < rows.row(6)); // (-5, 4.0) < (0, -0.0)
        assert!(rows.row(0) < rows.row(1)); // (1, 1.3) < (2, 2.5)
        assert!(rows.row(3) < rows.row(0)); // (-5, 4.0) < (1, 1.3)
        assert!(rows.row(4) < rows.row(1)); // (2, 0.1) < (2, 2.5): tie broken by column 2
        assert!(rows.row(5) < rows.row(4)); // (2, -4.0) < (2, 0.1)

        // Round-trip: decoding the rows reproduces the input columns
        let back = converter.convert_rows(&rows).unwrap();
        for (expected, actual) in cols.iter().zip(&back) {
            assert_eq!(expected, actual);
        }
    }
1756
1757    #[test]
1758    fn test_decimal128() {
1759        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal128(
1760            DECIMAL128_MAX_PRECISION,
1761            7,
1762        ))])
1763        .unwrap();
1764        let col = Arc::new(
1765            Decimal128Array::from_iter([
1766                None,
1767                Some(i128::MIN),
1768                Some(-13),
1769                Some(46_i128),
1770                Some(5456_i128),
1771                Some(i128::MAX),
1772            ])
1773            .with_precision_and_scale(38, 7)
1774            .unwrap(),
1775        ) as ArrayRef;
1776
1777        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1778        for i in 0..rows.num_rows() - 1 {
1779            assert!(rows.row(i) < rows.row(i + 1));
1780        }
1781
1782        let back = converter.convert_rows(&rows).unwrap();
1783        assert_eq!(back.len(), 1);
1784        assert_eq!(col.as_ref(), back[0].as_ref())
1785    }
1786
1787    #[test]
1788    fn test_decimal256() {
1789        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal256(
1790            DECIMAL256_MAX_PRECISION,
1791            7,
1792        ))])
1793        .unwrap();
1794        let col = Arc::new(
1795            Decimal256Array::from_iter([
1796                None,
1797                Some(i256::MIN),
1798                Some(i256::from_parts(0, -1)),
1799                Some(i256::from_parts(u128::MAX, -1)),
1800                Some(i256::from_parts(u128::MAX, 0)),
1801                Some(i256::from_parts(0, 46_i128)),
1802                Some(i256::from_parts(5, 46_i128)),
1803                Some(i256::MAX),
1804            ])
1805            .with_precision_and_scale(DECIMAL256_MAX_PRECISION, 7)
1806            .unwrap(),
1807        ) as ArrayRef;
1808
1809        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1810        for i in 0..rows.num_rows() - 1 {
1811            assert!(rows.row(i) < rows.row(i + 1));
1812        }
1813
1814        let back = converter.convert_rows(&rows).unwrap();
1815        assert_eq!(back.len(), 1);
1816        assert_eq!(col.as_ref(), back[0].as_ref())
1817    }
1818
1819    #[test]
1820    fn test_bool() {
1821        let converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]).unwrap();
1822
1823        let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])) as ArrayRef;
1824
1825        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1826        assert!(rows.row(2) > rows.row(1));
1827        assert!(rows.row(2) > rows.row(0));
1828        assert!(rows.row(1) > rows.row(0));
1829
1830        let cols = converter.convert_rows(&rows).unwrap();
1831        assert_eq!(&cols[0], &col);
1832
1833        let converter = RowConverter::new(vec![SortField::new_with_options(
1834            DataType::Boolean,
1835            SortOptions::default().desc().with_nulls_first(false),
1836        )])
1837        .unwrap();
1838
1839        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1840        assert!(rows.row(2) < rows.row(1));
1841        assert!(rows.row(2) < rows.row(0));
1842        assert!(rows.row(1) < rows.row(0));
1843        let cols = converter.convert_rows(&rows).unwrap();
1844        assert_eq!(&cols[0], &col);
1845    }
1846
    #[test]
    fn test_timezone() {
        // Timestamps with a timezone must round-trip with the timezone intact
        let a =
            TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone("+01:00".to_string());
        let d = a.data_type().clone();

        let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
        let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(back[0].data_type(), &d);

        // Test dictionary
        let mut a = PrimitiveDictionaryBuilder::<Int32Type, TimestampNanosecondType>::new();
        a.append(34).unwrap();
        a.append_null();
        a.append(345).unwrap();

        // Construct dictionary with a timezone: rebuild the values array
        // with "+02:00" attached and swap it into the dictionary
        let dict = a.finish();
        let values = TimestampNanosecondArray::from(dict.values().to_data());
        let dict_with_tz = dict.with_values(Arc::new(values.with_timezone("+02:00")));
        let v = DataType::Timestamp(TimeUnit::Nanosecond, Some("+02:00".into()));
        let d = DataType::Dictionary(Box::new(DataType::Int32), Box::new(v.clone()));

        assert_eq!(dict_with_tz.data_type(), &d);
        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
        let rows = converter
            .convert_columns(&[Arc::new(dict_with_tz) as _])
            .unwrap();
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        // Dictionary encoding is not preserved through the row format: the
        // decoded column is the timezone-carrying value type, not `d`
        assert_eq!(back[0].data_type(), &v);
    }
1881
1882    #[test]
1883    fn test_null_encoding() {
1884        let col = Arc::new(NullArray::new(10));
1885        let converter = RowConverter::new(vec![SortField::new(DataType::Null)]).unwrap();
1886        let rows = converter.convert_columns(&[col]).unwrap();
1887        assert_eq!(rows.num_rows(), 10);
1888        assert_eq!(rows.row(1).data.len(), 0);
1889    }
1890
    #[test]
    fn test_variable_width() {
        // Strings of varying length including the empty string and a null
        let col = Arc::new(StringArray::from_iter([
            Some("hello"),
            Some("he"),
            None,
            Some("foo"),
            Some(""),
        ])) as ArrayRef,
        ;

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        assert!(rows.row(1) < rows.row(0)); // "he" < "hello": prefix sorts first
        assert!(rows.row(2) < rows.row(4)); // null < ""
        assert!(rows.row(3) < rows.row(0)); // "foo" < "hello"
        assert!(rows.row(3) < rows.row(1)); // "foo" < "he"

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        // Binary values in ascending order whose lengths straddle the
        // mini-block and block boundaries of the variable-length encoding
        let col = Arc::new(BinaryArray::from_iter([
            None,
            Some(vec![0_u8; 0]),
            Some(vec![0_u8; 6]),
            Some(vec![0_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![0_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![0_u8; variable::BLOCK_SIZE]),
            Some(vec![0_u8; variable::BLOCK_SIZE + 1]),
            Some(vec![1_u8; 6]),
            Some(vec![1_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![1_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![1_u8; variable::BLOCK_SIZE]),
            Some(vec![1_u8; variable::BLOCK_SIZE + 1]),
            Some(vec![0xFF_u8; 6]),
            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![0xFF_u8; variable::BLOCK_SIZE]),
            Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]),
        ])) as ArrayRef;

        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        // Every pair of rows must sort in the same ascending order as the input
        for i in 0..rows.num_rows() {
            for j in i + 1..rows.num_rows() {
                assert!(
                    rows.row(i) < rows.row(j),
                    "{} < {} - {:?} < {:?}",
                    i,
                    j,
                    rows.row(i),
                    rows.row(j)
                );
            }
        }

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        // Descending with nulls last: every pairwise comparison inverts
        let converter = RowConverter::new(vec![SortField::new_with_options(
            DataType::Binary,
            SortOptions::default().desc().with_nulls_first(false),
        )])
        .unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        for i in 0..rows.num_rows() {
            for j in i + 1..rows.num_rows() {
                assert!(
                    rows.row(i) > rows.row(j),
                    "{} > {} - {:?} > {:?}",
                    i,
                    j,
                    rows.row(i),
                    rows.row(j)
                );
            }
        }

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);
    }
1974
    /// Asserts that `a` and `b` are logically equal, first casting `b` to
    /// its value type when it is dictionary-encoded
    fn dictionary_eq(a: &dyn Array, b: &dyn Array) {
        match b.data_type() {
            DataType::Dictionary(_, v) => {
                // `a` is expected to already be the dictionary's value type
                assert_eq!(a.data_type(), v.as_ref());
                let b = arrow_cast::cast(b, v).unwrap();
                assert_eq!(a, b.as_ref())
            }
            _ => assert_eq!(a, b),
        }
    }
1986
    #[test]
    fn test_string_dictionary() {
        // Logical values: "foo", "hello", "he", null, "hello", "", "hello", "hello"
        let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
            Some("foo"),
            Some("hello"),
            Some("he"),
            None,
            Some("hello"),
            Some(""),
            Some("hello"),
            Some("hello"),
        ])) as ArrayRef;

        // Default options: ascending, nulls first
        let field = SortField::new(a.data_type().clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();

        assert!(rows_a.row(3) < rows_a.row(5)); // null < ""
        assert!(rows_a.row(2) < rows_a.row(1)); // "he" < "hello"
        assert!(rows_a.row(0) < rows_a.row(1)); // "foo" < "hello"
        assert!(rows_a.row(3) < rows_a.row(0)); // null < "foo"

        // Repeated logical values encode to identical rows
        assert_eq!(rows_a.row(1), rows_a.row(4));
        assert_eq!(rows_a.row(1), rows_a.row(6));
        assert_eq!(rows_a.row(1), rows_a.row(7));

        let cols = converter.convert_rows(&rows_a).unwrap();
        dictionary_eq(&cols[0], &a);

        // A second batch with different dictionary values: encoding must be
        // comparable across batches because it is value-based, not key-based
        let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
            Some("hello"),
            None,
            Some("cupcakes"),
        ])) as ArrayRef;

        let rows_b = converter.convert_columns(&[Arc::clone(&b)]).unwrap();
        assert_eq!(rows_a.row(1), rows_b.row(0)); // "hello" == "hello"
        assert_eq!(rows_a.row(3), rows_b.row(1)); // null == null
        assert!(rows_b.row(2) < rows_a.row(0)); // "cupcakes" < "foo"

        let cols = converter.convert_rows(&rows_b).unwrap();
        dictionary_eq(&cols[0], &b);

        // Descending with nulls last: the four ordering asserts invert
        let converter = RowConverter::new(vec![SortField::new_with_options(
            a.data_type().clone(),
            SortOptions::default().desc().with_nulls_first(false),
        )])
        .unwrap();

        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
        assert!(rows_c.row(3) > rows_c.row(5));
        assert!(rows_c.row(2) > rows_c.row(1));
        assert!(rows_c.row(0) > rows_c.row(1));
        assert!(rows_c.row(3) > rows_c.row(0));

        let cols = converter.convert_rows(&rows_c).unwrap();
        dictionary_eq(&cols[0], &a);

        // Descending with nulls first: values invert but nulls stay smallest
        let converter = RowConverter::new(vec![SortField::new_with_options(
            a.data_type().clone(),
            SortOptions::default().desc().with_nulls_first(true),
        )])
        .unwrap();

        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
        assert!(rows_c.row(3) < rows_c.row(5)); // null < ""
        assert!(rows_c.row(2) > rows_c.row(1)); // "he" > "hello" (desc)
        assert!(rows_c.row(0) > rows_c.row(1)); // "foo" > "hello" (desc)
        assert!(rows_c.row(3) < rows_c.row(0)); // null < "foo"

        let cols = converter.convert_rows(&rows_c).unwrap();
        dictionary_eq(&cols[0], &a);
    }
2060
    #[test]
    fn test_struct() {
        // Test basic
        let a = Arc::new(Int32Array::from(vec![1, 1, 2, 2])) as ArrayRef;
        let a_f = Arc::new(Field::new("int", DataType::Int32, false));
        let u = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as ArrayRef;
        let u_f = Arc::new(Field::new("s", DataType::Utf8, false));
        let s1 = Arc::new(StructArray::from(vec![(a_f, a), (u_f, u)])) as ArrayRef;

        let sort_fields = vec![SortField::new(s1.data_type().clone())];
        let converter = RowConverter::new(sort_fields).unwrap();
        let r1 = converter.convert_columns(&[Arc::clone(&s1)]).unwrap();

        // Input tuples (1,"a") .. (2,"d") are ascending, so rows must be too
        for (a, b) in r1.iter().zip(r1.iter().skip(1)) {
            assert!(a < b);
        }

        let back = converter.convert_rows(&r1).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(&back[0], &s1);

        // Test struct nullability: mask 0b00001010 keeps rows 1 and 3 valid
        // and nulls rows 0 and 2
        let data = s1
            .to_data()
            .into_builder()
            .null_bit_buffer(Some(Buffer::from_slice_ref([0b00001010])))
            .null_count(2)
            .build()
            .unwrap();

        let s2 = Arc::new(StructArray::from(data)) as ArrayRef;
        let r2 = converter.convert_columns(&[Arc::clone(&s2)]).unwrap();
        assert_eq!(r2.row(0), r2.row(2)); // Nulls equal
        assert!(r2.row(0) < r2.row(1)); // Nulls first
        assert_ne!(r1.row(0), r2.row(0)); // Value does not equal null
        assert_eq!(r1.row(1), r2.row(1)); // Values equal

        let back = converter.convert_rows(&r2).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(&back[0], &s2);

        // The decoded struct must also pass full Arrow data validation
        back[0].to_data().validate_full().unwrap();
    }
2104
2105    #[test]
2106    fn test_primitive_dictionary() {
2107        let mut builder = PrimitiveDictionaryBuilder::<Int32Type, Int32Type>::new();
2108        builder.append(2).unwrap();
2109        builder.append(3).unwrap();
2110        builder.append(0).unwrap();
2111        builder.append_null();
2112        builder.append(5).unwrap();
2113        builder.append(3).unwrap();
2114        builder.append(-1).unwrap();
2115
2116        let a = builder.finish();
2117        let data_type = a.data_type().clone();
2118        let columns = [Arc::new(a) as ArrayRef];
2119
2120        let field = SortField::new(data_type.clone());
2121        let converter = RowConverter::new(vec![field]).unwrap();
2122        let rows = converter.convert_columns(&columns).unwrap();
2123        assert!(rows.row(0) < rows.row(1));
2124        assert!(rows.row(2) < rows.row(0));
2125        assert!(rows.row(3) < rows.row(2));
2126        assert!(rows.row(6) < rows.row(2));
2127        assert!(rows.row(3) < rows.row(6));
2128    }
2129
    #[test]
    fn test_dictionary_nulls() {
        // Values contain nulls AND the keys contain a null, producing logical
        // values: 1, 1, -1, null (masked value), null (masked value), null (null key)
        let values = Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data();
        let keys =
            Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]).into_data();

        // Assemble the dictionary at the ArrayData level to control both
        // null sources independently
        let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
        let data = keys
            .into_builder()
            .data_type(data_type.clone())
            .child_data(vec![values])
            .build()
            .unwrap();

        let columns = [Arc::new(DictionaryArray::<Int32Type>::from(data)) as ArrayRef];
        let field = SortField::new(data_type.clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&columns).unwrap();

        // Equal logical values encode identically, regardless of whether the
        // null comes from the values array or from a null key
        assert_eq!(rows.row(0), rows.row(1));
        assert_eq!(rows.row(3), rows.row(4));
        assert_eq!(rows.row(4), rows.row(5));
        assert!(rows.row(3) < rows.row(0)); // null < 1
    }
2154
2155    #[test]
2156    #[should_panic(expected = "Encountered non UTF-8 data")]
2157    fn test_invalid_utf8() {
2158        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2159        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2160        let rows = converter.convert_columns(&[array]).unwrap();
2161        let binary_row = rows.row(0);
2162
2163        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2164        let parser = converter.parser();
2165        let utf8_row = parser.parse(binary_row.as_ref());
2166
2167        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2168    }
2169
2170    #[test]
2171    #[should_panic(expected = "Encountered non UTF-8 data")]
2172    fn test_invalid_utf8_array() {
2173        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2174        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2175        let rows = converter.convert_columns(&[array]).unwrap();
2176        let binary_rows = rows.try_into_binary().expect("known-small rows");
2177
2178        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2179        let parsed = converter.from_binary(binary_rows);
2180
2181        converter.convert_rows(parsed.iter()).unwrap();
2182    }
2183
2184    #[test]
2185    #[should_panic(expected = "index out of bounds")]
2186    fn test_invalid_empty() {
2187        let binary_row: &[u8] = &[];
2188
2189        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2190        let parser = converter.parser();
2191        let utf8_row = parser.parse(binary_row.as_ref());
2192
2193        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2194    }
2195
2196    #[test]
2197    #[should_panic(expected = "index out of bounds")]
2198    fn test_invalid_empty_array() {
2199        let row: &[u8] = &[];
2200        let binary_rows = BinaryArray::from(vec![row]);
2201
2202        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2203        let parsed = converter.from_binary(binary_rows);
2204
2205        converter.convert_rows(parsed.iter()).unwrap();
2206    }
2207
2208    #[test]
2209    #[should_panic(expected = "index out of bounds")]
2210    fn test_invalid_truncated() {
2211        let binary_row: &[u8] = &[0x02];
2212
2213        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2214        let parser = converter.parser();
2215        let utf8_row = parser.parse(binary_row.as_ref());
2216
2217        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2218    }
2219
2220    #[test]
2221    #[should_panic(expected = "index out of bounds")]
2222    fn test_invalid_truncated_array() {
2223        let row: &[u8] = &[0x02];
2224        let binary_rows = BinaryArray::from(vec![row]);
2225
2226        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2227        let parsed = converter.from_binary(binary_rows);
2228
2229        converter.convert_rows(parsed.iter()).unwrap();
2230    }
2231
2232    #[test]
2233    #[should_panic(expected = "rows were not produced by this RowConverter")]
2234    fn test_different_converter() {
2235        let values = Arc::new(Int32Array::from_iter([Some(1), Some(-1)]));
2236        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2237        let rows = converter.convert_columns(&[values]).unwrap();
2238
2239        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2240        let _ = converter.convert_rows(&rows);
2241    }
2242
    /// Round-trips a list-of-Int32 column through the row format under every
    /// combination of sort direction and null placement, checking both the
    /// comparison order of the encoded rows and lossless decoding.
    ///
    /// Logical values: [32,52,32], [32,52,12], [32,52], null (masking
    /// [32,52]), [32,null], [], null (masking [17,null])
    fn test_single_list<O: OffsetSizeTrait>() {
        let mut builder = GenericListBuilder::<O, _>::new(Int32Builder::new());
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(32);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(12);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.append(true);
        builder.values().append_value(32); // MASKED
        builder.values().append_value(52); // MASKED
        builder.append(false);
        builder.values().append_value(32);
        builder.values().append_null();
        builder.append(true);
        builder.append(true);
        builder.values().append_value(17); // MASKED
        builder.values().append_null(); // MASKED
        builder.append(false);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // Default options: ascending, nulls first
        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();

        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
        assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
        assert!(rows.row(3) < rows.row(5)); // null < []
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Ascending, nulls last: null lists move to the end
        let options = SortOptions::default().asc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
        assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
        assert!(rows.row(3) > rows.row(5)); // null > []
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls last
        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
        assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
        assert!(rows.row(3) > rows.row(5)); // null > []
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls first
        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
        assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
        assert!(rows.row(3) < rows.row(5)); // null < []
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);
    }
2340
    /// Checks row-format ordering for doubly-nested lists
    /// (`List<List<Int32>>` or `LargeList<LargeList<Int32>>`, depending on `O`)
    /// under three `SortOptions` combinations, and that every conversion
    /// round-trips losslessly through `convert_rows`.
    fn test_nested_list<O: OffsetSizeTrait>() {
        let mut builder =
            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(Int32Builder::new()));

        // Row 0: [[1, 2], [1, null]]
        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        // Row 1: [[1, null], [1, null]]
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        // Row 2: [[1, null], null] — then row 3: a top-level null
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().append(false);
        builder.append(true);
        builder.append(false);

        // Row 4: [[1, 2]]
        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.append(true);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // Logical contents:
        // [
        //   [[1, 2], [1, null]],
        //   [[1, null], [1, null]],
        //   [[1, null], null],
        //   null,
        //   [[1, 2]],
        // ]
        let options = SortOptions::default().asc().with_nulls_first(true);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        // Ascending, nulls first: nulls sort lowest at every nesting level.
        assert!(rows.row(0) > rows.row(1)); // first items: [1, 2] > [1, null]
        assert!(rows.row(1) > rows.row(2)); // second items: [1, null] > null
        assert!(rows.row(2) > rows.row(3)); // any list > top-level null
        assert!(rows.row(4) < rows.row(0)); // shorter prefix sorts first
        assert!(rows.row(4) > rows.row(1)); // [1, 2] > [1, null]

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls first: value order flips, nulls still lowest.
        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(1) > rows.row(2));
        assert!(rows.row(2) > rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) > rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls last: nulls now sort highest at every level.
        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(1) < rows.row(2));
        assert!(rows.row(2) < rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) < rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);
    }
2431
2432    #[test]
2433    fn test_list() {
2434        test_single_list::<i32>();
2435        test_nested_list::<i32>();
2436    }
2437
2438    #[test]
2439    fn test_large_list() {
2440        test_single_list::<i64>();
2441        test_nested_list::<i64>();
2442    }
2443
    /// Checks row-format ordering for `FixedSizeList<Int32, 3>` under all four
    /// `SortOptions` combinations, including masked values hidden behind
    /// top-level nulls, and verifies lossless round-trips.
    #[test]
    fn test_fixed_size_list() {
        // Rows built below ("MASKED" values sit behind a top-level null and
        // must not influence comparisons):
        //   0: [32, 52, 32]
        //   1: [32, 52, 12]
        //   2: [32, 52, null]
        //   3: null              (masked 32, 52, 13)
        //   4: [32, null, null]
        //   5: [null, null, null]
        //   6: null              (masked 17, null, 77)
        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 3);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(32);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(12);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_null();
        builder.append(true);
        builder.values().append_value(32); // MASKED
        builder.values().append_value(52); // MASKED
        builder.values().append_value(13); // MASKED
        builder.append(false);
        builder.values().append_value(32);
        builder.values().append_null();
        builder.values().append_null();
        builder.append(true);
        builder.values().append_null();
        builder.values().append_null();
        builder.values().append_null();
        builder.append(true);
        builder.values().append_value(17); // MASKED
        builder.values().append_null(); // MASKED
        builder.values().append_value(77); // MASKED
        builder.append(false);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // Default sorting (ascending, nulls first)
        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();

        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52, null] < [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52, null]
        assert!(rows.row(4) < rows.row(2)); // [32, null, null] < [32, 52, null]
        assert!(rows.row(5) < rows.row(2)); // [null, null, null] < [32, 52, null]
        assert!(rows.row(3) < rows.row(5)); // null < [null, null, null]
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Ascending, null last
        let options = SortOptions::default().asc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52, null] > [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52, null]
        assert!(rows.row(4) > rows.row(2)); // [32, null, null] > [32, 52, null]
        assert!(rows.row(5) > rows.row(2)); // [null, null, null] > [32, 52, null]
        assert!(rows.row(3) > rows.row(5)); // null > [null, null, null]
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls last
        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52, null] > [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52, null]
        assert!(rows.row(4) > rows.row(2)); // [32, null, null] > [32, 52, null]
        assert!(rows.row(5) > rows.row(2)); // [null, null, null] > [32, 52, null]
        assert!(rows.row(3) > rows.row(5)); // null > [null, null, null]
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls first
        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52, null] < [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52, null]
        assert!(rows.row(4) < rows.row(2)); // [32, null, null] < [32, 52, null]
        assert!(rows.row(5) < rows.row(2)); // [null, null, null] < [32, 52, null]
        assert!(rows.row(3) < rows.row(5)); // null < [null, null, null]
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);
    }
2551
2552    fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
2553    where
2554        K: ArrowPrimitiveType,
2555        StandardUniform: Distribution<K::Native>,
2556    {
2557        let mut rng = rng();
2558        (0..len)
2559            .map(|_| rng.random_bool(valid_percent).then(|| rng.random()))
2560            .collect()
2561    }
2562
2563    fn generate_strings<O: OffsetSizeTrait>(
2564        len: usize,
2565        valid_percent: f64,
2566    ) -> GenericStringArray<O> {
2567        let mut rng = rng();
2568        (0..len)
2569            .map(|_| {
2570                rng.random_bool(valid_percent).then(|| {
2571                    let len = rng.random_range(0..100);
2572                    let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
2573                    String::from_utf8(bytes).unwrap()
2574                })
2575            })
2576            .collect()
2577    }
2578
2579    fn generate_string_view(len: usize, valid_percent: f64) -> StringViewArray {
2580        let mut rng = rng();
2581        (0..len)
2582            .map(|_| {
2583                rng.random_bool(valid_percent).then(|| {
2584                    let len = rng.random_range(0..100);
2585                    let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
2586                    String::from_utf8(bytes).unwrap()
2587                })
2588            })
2589            .collect()
2590    }
2591
2592    fn generate_byte_view(len: usize, valid_percent: f64) -> BinaryViewArray {
2593        let mut rng = rng();
2594        (0..len)
2595            .map(|_| {
2596                rng.random_bool(valid_percent).then(|| {
2597                    let len = rng.random_range(0..100);
2598                    let bytes: Vec<_> = (0..len).map(|_| rng.random_range(0..128)).collect();
2599                    bytes
2600                })
2601            })
2602            .collect()
2603    }
2604
    /// Builds a `DictionaryArray<K>` with `len` keys; each key is non-null with
    /// probability `valid_percent` and indexes uniformly into `values`.
    ///
    /// `values` must be non-empty, since keys are drawn from `0..values.len()`
    /// and an empty range would panic.
    fn generate_dictionary<K>(
        values: ArrayRef,
        len: usize,
        valid_percent: f64,
    ) -> DictionaryArray<K>
    where
        K: ArrowDictionaryKeyType,
        K::Native: SampleUniform,
    {
        let mut rng = rng();
        let min_key = K::Native::from_usize(0).unwrap();
        let max_key = K::Native::from_usize(values.len()).unwrap();
        let keys: PrimitiveArray<K> = (0..len)
            .map(|_| {
                rng.random_bool(valid_percent)
                    .then(|| rng.random_range(min_key..max_key))
            })
            .collect();

        // Re-tag the key array's data as dictionary-typed, attaching the
        // dictionary values as child data.
        let data_type =
            DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(values.data_type().clone()));

        let data = keys
            .into_data()
            .into_builder()
            .data_type(data_type)
            .add_child_data(values.to_data())
            .build()
            .unwrap();

        DictionaryArray::from(data)
    }
2637
2638    fn generate_fixed_size_binary(len: usize, valid_percent: f64) -> FixedSizeBinaryArray {
2639        let mut rng = rng();
2640        let width = rng.random_range(0..20);
2641        let mut builder = FixedSizeBinaryBuilder::new(width);
2642
2643        let mut b = vec![0; width as usize];
2644        for _ in 0..len {
2645            match rng.random_bool(valid_percent) {
2646                true => {
2647                    b.iter_mut().for_each(|x| *x = rng.random());
2648                    builder.append_value(&b).unwrap();
2649                }
2650                false => builder.append_null(),
2651            }
2652        }
2653
2654        builder.finish()
2655    }
2656
2657    fn generate_struct(len: usize, valid_percent: f64) -> StructArray {
2658        let mut rng = rng();
2659        let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
2660        let a = generate_primitive_array::<Int32Type>(len, valid_percent);
2661        let b = generate_strings::<i32>(len, valid_percent);
2662        let fields = Fields::from(vec![
2663            Field::new("a", DataType::Int32, true),
2664            Field::new("b", DataType::Utf8, true),
2665        ]);
2666        let values = vec![Arc::new(a) as _, Arc::new(b) as _];
2667        StructArray::new(fields, values, Some(nulls))
2668    }
2669
2670    fn generate_list<F>(len: usize, valid_percent: f64, values: F) -> ListArray
2671    where
2672        F: FnOnce(usize) -> ArrayRef,
2673    {
2674        let mut rng = rng();
2675        let offsets = OffsetBuffer::<i32>::from_lengths((0..len).map(|_| rng.random_range(0..10)));
2676        let values_len = offsets.last().unwrap().to_usize().unwrap();
2677        let values = values(values_len);
2678        let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
2679        let field = Arc::new(Field::new_list_field(values.data_type().clone(), true));
2680        ListArray::new(field, offsets, values, Some(nulls))
2681    }
2682
    /// Picks one of 16 random column generators with uniform probability; used
    /// by `fuzz_test` to cover the array types the row format supports.
    fn generate_column(len: usize) -> ArrayRef {
        let mut rng = rng();
        match rng.random_range(0..16) {
            // Primitive columns, 80% valid slots
            0 => Arc::new(generate_primitive_array::<Int32Type>(len, 0.8)),
            1 => Arc::new(generate_primitive_array::<UInt32Type>(len, 0.8)),
            2 => Arc::new(generate_primitive_array::<Int64Type>(len, 0.8)),
            3 => Arc::new(generate_primitive_array::<UInt64Type>(len, 0.8)),
            4 => Arc::new(generate_primitive_array::<Float32Type>(len, 0.8)),
            5 => Arc::new(generate_primitive_array::<Float64Type>(len, 0.8)),
            6 => Arc::new(generate_strings::<i32>(len, 0.8)),
            // Dictionary columns (string- and integer-valued)
            7 => Arc::new(generate_dictionary::<Int64Type>(
                // Cannot test dictionaries containing null values because of #2687
                Arc::new(generate_strings::<i32>(rng.random_range(1..len), 1.0)),
                len,
                0.8,
            )),
            8 => Arc::new(generate_dictionary::<Int64Type>(
                // Cannot test dictionaries containing null values because of #2687
                Arc::new(generate_primitive_array::<Int64Type>(
                    rng.random_range(1..len),
                    1.0,
                )),
                len,
                0.8,
            )),
            9 => Arc::new(generate_fixed_size_binary(len, 0.8)),
            // Nested columns: struct, and lists of primitives/strings/structs
            10 => Arc::new(generate_struct(len, 0.8)),
            11 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
            })),
            12 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_strings::<i32>(values_len, 0.8))
            })),
            13 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_struct(values_len, 0.8))
            })),
            // View-layout columns
            14 => Arc::new(generate_string_view(len, 0.8)),
            15 => Arc::new(generate_byte_view(len, 0.8)),
            _ => unreachable!(),
        }
    }
2724
2725    fn print_row(cols: &[SortColumn], row: usize) -> String {
2726        let t: Vec<_> = cols
2727            .iter()
2728            .map(|x| match x.values.is_valid(row) {
2729                true => {
2730                    let opts = FormatOptions::default().with_null("NULL");
2731                    let formatter = ArrayFormatter::try_new(x.values.as_ref(), &opts).unwrap();
2732                    formatter.value(row).to_string()
2733                }
2734                false => "NULL".to_string(),
2735            })
2736            .collect();
2737        t.join(",")
2738    }
2739
2740    fn print_col_types(cols: &[SortColumn]) -> String {
2741        let t: Vec<_> = cols
2742            .iter()
2743            .map(|x| x.values.data_type().to_string())
2744            .collect();
2745        t.join(",")
2746    }
2747
    /// Randomized end-to-end check: for random column sets and random
    /// `SortOptions`, the row format's byte-wise comparison must agree exactly
    /// with `LexicographicalComparator`, and rows must convert back to the
    /// original arrays (including after a detour through raw binary form).
    #[test]
    #[cfg_attr(miri, ignore)]
    fn fuzz_test() {
        for _ in 0..100 {
            let mut rng = rng();
            let num_columns = rng.random_range(1..5);
            let len = rng.random_range(5..100);
            let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect();

            // Random per-column sort direction and null placement.
            let options: Vec<_> = (0..num_columns)
                .map(|_| SortOptions {
                    descending: rng.random_bool(0.5),
                    nulls_first: rng.random_bool(0.5),
                })
                .collect();

            let sort_columns: Vec<_> = options
                .iter()
                .zip(&arrays)
                .map(|(o, c)| SortColumn {
                    values: Arc::clone(c),
                    options: Some(*o),
                })
                .collect();

            let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();

            let columns: Vec<SortField> = options
                .into_iter()
                .zip(&arrays)
                .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
                .collect();

            let converter = RowConverter::new(columns).unwrap();
            let rows = converter.convert_columns(&arrays).unwrap();

            // Every ordered pair of rows must compare identically under the
            // byte-wise row comparison and the lexicographical comparator.
            for i in 0..len {
                for j in 0..len {
                    let row_i = rows.row(i);
                    let row_j = rows.row(j);
                    let row_cmp = row_i.cmp(&row_j);
                    let lex_cmp = comparator.compare(i, j);
                    assert_eq!(
                        row_cmp,
                        lex_cmp,
                        "({:?} vs {:?}) vs ({:?} vs {:?}) for types {}",
                        print_row(&sort_columns, i),
                        print_row(&sort_columns, j),
                        row_i,
                        row_j,
                        print_col_types(&sort_columns)
                    );
                }
            }

            // Round-trip back to arrays; dictionaries may be re-encoded, hence
            // dictionary_eq rather than assert_eq.
            let back = converter.convert_rows(&rows).unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }

            // Check that we can convert through the raw binary representation
            let rows = rows.try_into_binary().expect("reasonable size");
            let parser = converter.parser();
            let back = converter
                .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
                .unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }

            // ... and back from binary to `Rows` via `from_binary`.
            let rows = converter.from_binary(rows);
            let back = converter.convert_rows(&rows).unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }
        }
    }
2828
2829    #[test]
2830    fn test_clear() {
2831        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2832        let mut rows = converter.empty_rows(3, 128);
2833
2834        let first = Int32Array::from(vec![None, Some(2), Some(4)]);
2835        let second = Int32Array::from(vec![Some(2), None, Some(4)]);
2836        let arrays = [Arc::new(first) as ArrayRef, Arc::new(second) as ArrayRef];
2837
2838        for array in arrays.iter() {
2839            rows.clear();
2840            converter.append(&mut rows, &[array.clone()]).unwrap();
2841            let back = converter.convert_rows(&rows).unwrap();
2842            assert_eq!(&back[0], array);
2843        }
2844
2845        let mut rows_expected = converter.empty_rows(3, 128);
2846        converter.append(&mut rows_expected, &arrays[1..]).unwrap();
2847
2848        for (i, (actual, expected)) in rows.iter().zip(rows_expected.iter()).enumerate() {
2849            assert_eq!(
2850                actual, expected,
2851                "For row {i}: expected {expected:?}, actual: {actual:?}",
2852            );
2853        }
2854    }
2855
2856    #[test]
2857    fn test_append_codec_dictionary_binary() {
2858        use DataType::*;
2859        // Dictionary RowConverter
2860        let converter = RowConverter::new(vec![SortField::new(Dictionary(
2861            Box::new(Int32),
2862            Box::new(Binary),
2863        ))])
2864        .unwrap();
2865        let mut rows = converter.empty_rows(4, 128);
2866
2867        let keys = Int32Array::from_iter_values([0, 1, 2, 3]);
2868        let values = BinaryArray::from(vec![
2869            Some("a".as_bytes()),
2870            Some(b"b"),
2871            Some(b"c"),
2872            Some(b"d"),
2873        ]);
2874        let dict_array = DictionaryArray::new(keys, Arc::new(values));
2875
2876        rows.clear();
2877        let array = Arc::new(dict_array) as ArrayRef;
2878        converter.append(&mut rows, &[array.clone()]).unwrap();
2879        let back = converter.convert_rows(&rows).unwrap();
2880
2881        dictionary_eq(&back[0], &array);
2882    }
2883
2884    #[test]
2885    fn test_list_prefix() {
2886        let mut a = ListBuilder::new(Int8Builder::new());
2887        a.append_value([None]);
2888        a.append_value([None, None]);
2889        let a = a.finish();
2890
2891        let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
2892        let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
2893        assert_eq!(rows.row(0).cmp(&rows.row(1)), Ordering::Less);
2894    }
2895}