arrow_row/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! A comparable row-oriented representation of a collection of [`Array`].
19//!
20//! [`Row`]s are [normalized for sorting], and can therefore be very efficiently [compared],
21//! using [`memcmp`] under the hood, or used in [non-comparison sorts] such as [radix sort].
22//! This makes the row format ideal for implementing efficient multi-column sorting,
23//! grouping, aggregation, windowing and more, as described in more detail
24//! [in this blog post](https://arrow.apache.org/blog/2022/11/07/multi-column-sorts-in-arrow-rust-part-1/).
25//!
26//! For example, given three input [`Array`], [`RowConverter`] creates byte
27//! sequences that [compare] the same as when using [`lexsort`].
28//!
29//! ```text
30//!    ┌─────┐   ┌─────┐   ┌─────┐
31//!    │     │   │     │   │     │
32//!    ├─────┤ ┌ ┼─────┼ ─ ┼─────┼ ┐              ┏━━━━━━━━━━━━━┓
33//!    │     │   │     │   │     │  ─────────────▶┃             ┃
34//!    ├─────┤ └ ┼─────┼ ─ ┼─────┼ ┘              ┗━━━━━━━━━━━━━┛
35//!    │     │   │     │   │     │
36//!    └─────┘   └─────┘   └─────┘
37//!                ...
38//!    ┌─────┐ ┌ ┬─────┬ ─ ┬─────┬ ┐              ┏━━━━━━━━┓
39//!    │     │   │     │   │     │  ─────────────▶┃        ┃
40//!    └─────┘ └ ┴─────┴ ─ ┴─────┴ ┘              ┗━━━━━━━━┛
41//!     UInt64      Utf8     F64
42//!
43//!           Input Arrays                          Row Format
44//!     (Columns)
45//! ```
46//!
47//! _[`Rows`] must be generated by the same [`RowConverter`] for the comparison
48//! to be meaningful._
49//!
50//! # Basic Example
51//! ```
52//! # use std::sync::Arc;
53//! # use arrow_row::{RowConverter, SortField};
54//! # use arrow_array::{ArrayRef, Int32Array, StringArray};
55//! # use arrow_array::cast::{AsArray, as_string_array};
56//! # use arrow_array::types::Int32Type;
57//! # use arrow_schema::DataType;
58//!
59//! let a1 = Arc::new(Int32Array::from_iter_values([-1, -1, 0, 3, 3])) as ArrayRef;
60//! let a2 = Arc::new(StringArray::from_iter_values(["a", "b", "c", "d", "d"])) as ArrayRef;
61//! let arrays = vec![a1, a2];
62//!
63//! // Convert arrays to rows
64//! let converter = RowConverter::new(vec![
65//!     SortField::new(DataType::Int32),
66//!     SortField::new(DataType::Utf8),
67//! ]).unwrap();
68//! let rows = converter.convert_columns(&arrays).unwrap();
69//!
70//! // Compare rows
71//! for i in 0..4 {
72//!     assert!(rows.row(i) <= rows.row(i + 1));
73//! }
74//! assert_eq!(rows.row(3), rows.row(4));
75//!
76//! // Convert rows back to arrays
77//! let converted = converter.convert_rows(&rows).unwrap();
78//! assert_eq!(arrays, converted);
79//!
80//! // Compare rows from different arrays
81//! let a1 = Arc::new(Int32Array::from_iter_values([3, 4])) as ArrayRef;
82//! let a2 = Arc::new(StringArray::from_iter_values(["e", "f"])) as ArrayRef;
83//! let arrays = vec![a1, a2];
84//! let rows2 = converter.convert_columns(&arrays).unwrap();
85//!
86//! assert!(rows.row(4) < rows2.row(0));
87//! assert!(rows.row(4) < rows2.row(1));
88//!
89//! // Convert selection of rows back to arrays
90//! let selection = [rows.row(0), rows2.row(1), rows.row(2), rows2.row(0)];
91//! let converted = converter.convert_rows(selection).unwrap();
92//! let c1 = converted[0].as_primitive::<Int32Type>();
93//! assert_eq!(c1.values(), &[-1, 4, 0, 3]);
94//!
95//! let c2 = converted[1].as_string::<i32>();
96//! let c2_values: Vec<_> = c2.iter().flatten().collect();
97//! assert_eq!(&c2_values, &["a", "f", "c", "e"]);
98//! ```
99//!
100//! # Lexicographic Sorts (lexsort)
101//!
102//! The row format can also be used to implement a fast multi-column / lexicographic sort
103//!
104//! ```
105//! # use arrow_row::{RowConverter, SortField};
106//! # use arrow_array::{ArrayRef, UInt32Array};
107//! fn lexsort_to_indices(arrays: &[ArrayRef]) -> UInt32Array {
108//!     let fields = arrays
109//!         .iter()
110//!         .map(|a| SortField::new(a.data_type().clone()))
111//!         .collect();
112//!     let converter = RowConverter::new(fields).unwrap();
113//!     let rows = converter.convert_columns(arrays).unwrap();
114//!     let mut sort: Vec<_> = rows.iter().enumerate().collect();
115//!     sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
116//!     UInt32Array::from_iter_values(sort.iter().map(|(i, _)| *i as u32))
117//! }
118//! ```
119//!
120//! # Flattening Dictionaries
121//!
122//! For performance reasons, dictionary arrays are flattened ("hydrated") to their
123//! underlying values during row conversion. See [the issue] for more details.
124//!
125//! This means that the arrays that come out of [`RowConverter::convert_rows`]
//! may not have the same data types as the input arrays. For example, encoding
//! a `Dictionary<Int8, Utf8>` array and converting back will produce a `Utf8` array.
128//!
129//! ```
130//! # use arrow_array::{Array, ArrayRef, DictionaryArray};
131//! # use arrow_array::types::Int8Type;
132//! # use arrow_row::{RowConverter, SortField};
133//! # use arrow_schema::DataType;
134//! # use std::sync::Arc;
135//! // Input is a Dictionary array
136//! let dict: DictionaryArray::<Int8Type> = ["a", "b", "c", "a", "b"].into_iter().collect();
137//! let sort_fields = vec![SortField::new(dict.data_type().clone())];
138//! let arrays = vec![Arc::new(dict) as ArrayRef];
139//! let converter = RowConverter::new(sort_fields).unwrap();
140//! // Convert to rows
141//! let rows = converter.convert_columns(&arrays).unwrap();
142//! let converted = converter.convert_rows(&rows).unwrap();
143//! // result was a Utf8 array, not a Dictionary array
144//! assert_eq!(converted[0].data_type(), &DataType::Utf8);
145//! ```
146//!
147//! [non-comparison sorts]: https://en.wikipedia.org/wiki/Sorting_algorithm#Non-comparison_sorts
148//! [radix sort]: https://en.wikipedia.org/wiki/Radix_sort
149//! [normalized for sorting]: http://wwwlgis.informatik.uni-kl.de/archiv/wwwdvs.informatik.uni-kl.de/courses/DBSREAL/SS2005/Vorlesungsunterlagen/Implementing_Sorting.pdf
150//! [`memcmp`]: https://www.man7.org/linux/man-pages/man3/memcmp.3.html
151//! [`lexsort`]: https://docs.rs/arrow-ord/latest/arrow_ord/sort/fn.lexsort.html
152//! [compared]: PartialOrd
153//! [compare]: PartialOrd
154//! [the issue]: https://github.com/apache/arrow-rs/issues/4811
155
156#![doc(
157    html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
158    html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
159)]
160#![cfg_attr(docsrs, feature(doc_cfg))]
161#![warn(missing_docs)]
162use std::cmp::Ordering;
163use std::hash::{Hash, Hasher};
164use std::sync::Arc;
165
166use arrow_array::cast::*;
167use arrow_array::types::ArrowDictionaryKeyType;
168use arrow_array::*;
169use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer};
170use arrow_data::{ArrayData, ArrayDataBuilder};
171use arrow_schema::*;
172use variable::{decode_binary_view, decode_string_view};
173
174use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
175use crate::list::{compute_lengths_fixed_size_list, encode_fixed_size_list};
176use crate::variable::{decode_binary, decode_string};
177use arrow_array::types::{Int16Type, Int32Type, Int64Type};
178
179mod fixed;
180mod list;
181mod run;
182mod variable;
183
184/// Converts [`ArrayRef`] columns into a [row-oriented](self) format.
185///
186/// *Note: The encoding of the row format may change from release to release.*
187///
188/// ## Overview
189///
190/// The row format is a variable length byte sequence created by
191/// concatenating the encoded form of each column. The encoding for
192/// each column depends on its datatype (and sort options).
193///
194/// The encoding is carefully designed in such a way that escaping is
195/// unnecessary: it is never ambiguous as to whether a byte is part of
196/// a sentinel (e.g. null) or a value.
197///
198/// ## Unsigned Integer Encoding
199///
200/// A null integer is encoded as a `0_u8`, followed by a zero-ed number of bytes corresponding
201/// to the integer's length.
202///
203/// A valid integer is encoded as `1_u8`, followed by the big-endian representation of the
204/// integer.
205///
206/// ```text
207///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
208///    3          │03│00│00│00│      │01│00│00│00│03│
209///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
210///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
211///   258         │02│01│00│00│      │01│00│00│01│02│
212///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
213///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
214///  23423        │7F│5B│00│00│      │01│00│00│5B│7F│
215///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
216///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
217///  NULL         │??│??│??│??│      │00│00│00│00│00│
218///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
219///
220///              32-bit (4 bytes)        Row Format
221///  Value        Little Endian
222/// ```
223///
224/// ## Signed Integer Encoding
225///
226/// Signed integers have their most significant sign bit flipped, and are then encoded in the
227/// same manner as an unsigned integer.
228///
229/// ```text
230///        ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┬──┐
231///     5  │05│00│00│00│       │05│00│00│80│       │01│80│00│00│05│
232///        └──┴──┴──┴──┘       └──┴──┴──┴──┘       └──┴──┴──┴──┴──┘
233///        ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┬──┐
234///    -5  │FB│FF│FF│FF│       │FB│FF│FF│7F│       │01│7F│FF│FF│FB│
235///        └──┴──┴──┴──┘       └──┴──┴──┴──┘       └──┴──┴──┴──┴──┘
236///
237///  Value  32-bit (4 bytes)    High bit flipped      Row Format
238///          Little Endian
239/// ```
240///
241/// ## Float Encoding
242///
243/// Floats are converted from IEEE 754 representation to a signed integer representation
244/// by flipping all bar the sign bit if they are negative.
245///
246/// They are then encoded in the same manner as a signed integer.
247///
248/// ## Fixed Length Bytes Encoding
249///
250/// Fixed length bytes are encoded in the same fashion as primitive types above.
251///
252/// For a fixed length array of length `n`:
253///
254/// A null is encoded as `0_u8` null sentinel followed by `n` `0_u8` bytes
255///
256/// A valid value is encoded as `1_u8` followed by the value bytes
257///
258/// ## Variable Length Bytes (including Strings) Encoding
259///
260/// A null is encoded as a `0_u8`.
261///
262/// An empty byte array is encoded as `1_u8`.
263///
264/// A non-null, non-empty byte array is encoded as `2_u8` followed by the byte array
265/// encoded using a block based scheme described below.
266///
267/// The byte array is broken up into fixed-width blocks, each block is written in turn
268/// to the output, followed by `0xFF_u8`. The final block is padded to 32-bytes
269/// with `0_u8` and written to the output, followed by the un-padded length in bytes
270/// of this final block as a `u8`. The first 4 blocks have a length of 8, with subsequent
271/// blocks using a length of 32, this is to reduce space amplification for small strings.
272///
273/// Note the following example encodings use a block size of 4 bytes for brevity:
274///
275/// ```text
276///                       ┌───┬───┬───┬───┬───┬───┐
277///  "MEEP"               │02 │'M'│'E'│'E'│'P'│04 │
278///                       └───┴───┴───┴───┴───┴───┘
279///
280///                       ┌───┐
281///  ""                   │01 |
282///                       └───┘
283///
284///  NULL                 ┌───┐
285///                       │00 │
286///                       └───┘
287///
288/// "Defenestration"      ┌───┬───┬───┬───┬───┬───┐
289///                       │02 │'D'│'e'│'f'│'e'│FF │
290///                       └───┼───┼───┼───┼───┼───┤
///                           │'n'│'e'│'s'│'t'│FF │
///                           ├───┼───┼───┼───┼───┤
///                           │'r'│'a'│'t'│'i'│FF │
///                           ├───┼───┼───┼───┼───┤
///                           │'o'│'n'│00 │00 │02 │
///                           └───┴───┴───┴───┴───┘
/// ```
299/// ```
300///
301/// This approach is loosely inspired by [COBS] encoding, and chosen over more traditional
302/// [byte stuffing] as it is more amenable to vectorisation, in particular AVX-256.
303///
304/// ## Dictionary Encoding
305///
306/// Dictionary encoded arrays are hydrated to their underlying values
307///
308/// ## REE Encoding
309///
/// REE (Run End Encoded) arrays, a form of Run Length Encoding, are hydrated to their underlying values.
311///
312/// ## Struct Encoding
313///
314/// A null is encoded as a `0_u8`.
315///
316/// A valid value is encoded as `1_u8` followed by the row encoding of each child.
317///
318/// This encoding effectively flattens the schema in a depth-first fashion.
319///
320/// For example
321///
322/// ```text
323/// ┌───────┬────────────────────────┬───────┐
324/// │ Int32 │ Struct[Int32, Float32] │ Int32 │
325/// └───────┴────────────────────────┴───────┘
326/// ```
327///
328/// Is encoded as
329///
330/// ```text
331/// ┌───────┬───────────────┬───────┬─────────┬───────┐
332/// │ Int32 │ Null Sentinel │ Int32 │ Float32 │ Int32 │
333/// └───────┴───────────────┴───────┴─────────┴───────┘
334/// ```
335///
336/// ## List Encoding
337///
338/// Lists are encoded by first encoding all child elements to the row format.
339///
340/// A list value is then encoded as the concatenation of each of the child elements,
341/// separately encoded using the variable length encoding described above, followed
342/// by the variable length encoding of an empty byte array.
343///
344/// For example given:
345///
346/// ```text
347/// [1_u8, 2_u8, 3_u8]
348/// [1_u8, null]
349/// []
350/// null
351/// ```
352///
353/// The elements would be converted to:
354///
355/// ```text
356///     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐        ┌──┬──┐
357///  1  │01│01│  2  │01│02│  3  │01│03│  1  │01│01│  null  │00│00│
358///     └──┴──┘     └──┴──┘     └──┴──┘     └──┴──┘        └──┴──┘
359///```
360///
361/// Which would be encoded as
362///
363/// ```text
364///                         ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
365///  [1_u8, 2_u8, 3_u8]     │02│01│01│00│00│02│02│01│02│00│00│02│02│01│03│00│00│02│01│
366///                         └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
367///                          └──── 1_u8 ────┘   └──── 2_u8 ────┘  └──── 3_u8 ────┘
368///
369///                         ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
370///  [1_u8, null]           │02│01│01│00│00│02│02│00│00│00│00│02│01│
371///                         └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
372///                          └──── 1_u8 ────┘   └──── null ────┘
373///
374///```
375///
376/// With `[]` represented by an empty byte array, and `null` a null byte array.
377///
378/// ## Fixed Size List Encoding
379///
380/// Fixed Size Lists are encoded by first encoding all child elements to the row format.
381///
382/// A non-null list value is then encoded as 0x01 followed by the concatenation of each
383/// of the child elements. A null list value is encoded as a null marker.
384///
385/// For example given:
386///
387/// ```text
388/// [1_u8, 2_u8]
389/// [3_u8, null]
390/// null
391/// ```
392///
393/// The elements would be converted to:
394///
395/// ```text
396///     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐        ┌──┬──┐
397///  1  │01│01│  2  │01│02│  3  │01│03│  null  │00│00│
398///     └──┴──┘     └──┴──┘     └──┴──┘        └──┴──┘
399///```
400///
401/// Which would be encoded as
402///
403/// ```text
404///                 ┌──┬──┬──┬──┬──┐
405///  [1_u8, 2_u8]   │01│01│01│01│02│
406///                 └──┴──┴──┴──┴──┘
407///                     └ 1 ┘ └ 2 ┘
408///                 ┌──┬──┬──┬──┬──┐
409///  [3_u8, null]   │01│01│03│00│00│
410///                 └──┴──┴──┴──┴──┘
411///                     └ 1 ┘ └null┘
412///                 ┌──┐
413///  null           │00│
414///                 └──┘
415///
416///```
417///
418/// # Ordering
419///
420/// ## Float Ordering
421///
422/// Floats are totally ordered in accordance to the `totalOrder` predicate as defined
423/// in the IEEE 754 (2008 revision) floating point standard.
424///
425/// The ordering established by this does not always agree with the
426/// [`PartialOrd`] and [`PartialEq`] implementations of `f32`. For example,
427/// they consider negative and positive zero equal, while this does not
428///
429/// ## Null Ordering
430///
431/// The encoding described above will order nulls first, this can be inverted by representing
432/// nulls as `0xFF_u8` instead of `0_u8`
433///
434/// ## Reverse Column Ordering
435///
436/// The order of a given column can be reversed by negating the encoded bytes of non-null values
437///
438/// [COBS]: https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing
439/// [byte stuffing]: https://en.wikipedia.org/wiki/High-Level_Data_Link_Control#Asynchronous_framing
#[derive(Debug)]
pub struct RowConverter {
    /// The data types and sort options this converter was created with,
    /// shared with every [`Rows`] it produces so provenance can be checked
    fields: Arc<[SortField]>,
    /// State for codecs
    codecs: Vec<Codec>,
}
446
/// Per-column encoding state created from a [`SortField`]'s data type
///
/// Nested and dictionary types require a child [`RowConverter`], and in some
/// cases a pre-computed null row; primitive types require no extra state
#[derive(Debug)]
enum Codec {
    /// No additional codec state is necessary
    Stateless,
    /// A row converter for the dictionary values
    /// and the encoding of a row containing only nulls
    Dictionary(RowConverter, OwnedRow),
    /// A row converter for the child fields
    /// and the encoding of a row containing only nulls
    Struct(RowConverter, OwnedRow),
    /// A row converter for the child field
    List(RowConverter),
    /// A row converter for the values array of a run-end encoded array
    RunEndEncoded(RowConverter),
}
462
impl Codec {
    /// Create the codec state for `sort_field`, recursively constructing
    /// child [`RowConverter`]s for dictionary and nested types
    fn new(sort_field: &SortField) -> Result<Self, ArrowError> {
        match &sort_field.data_type {
            DataType::Dictionary(_, values) => {
                // Dictionaries are hydrated: encode the value type, not the keys
                let sort_field =
                    SortField::new_with_options(values.as_ref().clone(), sort_field.options);

                let converter = RowConverter::new(vec![sort_field])?;
                // Pre-compute the encoding of a null value so it can be
                // emitted cheaply for each null entry during encoding
                let null_array = new_null_array(values.as_ref(), 1);
                let nulls = converter.convert_columns(&[null_array])?;

                let owned = OwnedRow {
                    data: nulls.buffer.into(),
                    config: nulls.config,
                };
                Ok(Self::Dictionary(converter, owned))
            }
            DataType::RunEndEncoded(_, values) => {
                // Similar to the List implementation below: the encoded contents
                // will be inverted if descending is set to true, so set
                // `descending` to false and negate nulls_first if it is set to true
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };

                let field = SortField::new_with_options(values.data_type().clone(), options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::RunEndEncoded(converter))
            }
            // All remaining non-nested types are encoded without extra state
            d if !d.is_nested() => Ok(Self::Stateless),
            DataType::List(f) | DataType::LargeList(f) => {
                // The encoded contents will be inverted if descending is set to true
                // As such we set `descending` to false and negate nulls first if it
                // is set to true
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };

                let field = SortField::new_with_options(f.data_type().clone(), options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::List(converter))
            }
            DataType::FixedSizeList(f, _) => {
                // Unlike variable-length lists, the child field inherits the
                // parent's sort options unchanged
                let field = SortField::new_with_options(f.data_type().clone(), sort_field.options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::List(converter))
            }
            DataType::Struct(f) => {
                // Each child column is encoded with the parent's sort options
                let sort_fields = f
                    .iter()
                    .map(|x| SortField::new_with_options(x.data_type().clone(), sort_field.options))
                    .collect();

                let converter = RowConverter::new(sort_fields)?;
                // Pre-compute the row encoding of all-null children, emitted
                // when the parent struct value is itself null
                let nulls: Vec<_> = f.iter().map(|x| new_null_array(x.data_type(), 1)).collect();

                let nulls = converter.convert_columns(&nulls)?;
                let owned = OwnedRow {
                    data: nulls.buffer.into(),
                    config: nulls.config,
                };

                Ok(Self::Struct(converter, owned))
            }
            _ => Err(ArrowError::NotYetImplemented(format!(
                "not yet implemented: {:?}",
                sort_field.data_type
            ))),
        }
    }

    /// Create an [`Encoder`] for `array`, eagerly converting any child
    /// values (dictionary values, struct columns, list elements, REE values)
    /// to their row encoding
    fn encoder(&self, array: &dyn Array) -> Result<Encoder<'_>, ArrowError> {
        match self {
            Codec::Stateless => Ok(Encoder::Stateless),
            Codec::Dictionary(converter, nulls) => {
                // Encode the dictionary's values; keys index into these rows
                let values = array.as_any_dictionary().values().clone();
                let rows = converter.convert_columns(&[values])?;
                Ok(Encoder::Dictionary(rows, nulls.row()))
            }
            Codec::Struct(converter, null) => {
                let v = as_struct_array(array);
                let rows = converter.convert_columns(v.columns())?;
                Ok(Encoder::Struct(rows, null.row()))
            }
            Codec::List(converter) => {
                let values = match array.data_type() {
                    DataType::List(_) => {
                        let list_array = as_list_array(array);
                        let first_offset = list_array.offsets()[0] as usize;
                        let last_offset =
                            list_array.offsets()[list_array.offsets().len() - 1] as usize;

                        // values can include more data than referenced in the ListArray, only encode
                        // the referenced values.
                        list_array
                            .values()
                            .slice(first_offset, last_offset - first_offset)
                    }
                    DataType::LargeList(_) => {
                        let list_array = as_large_list_array(array);

                        let first_offset = list_array.offsets()[0] as usize;
                        let last_offset =
                            list_array.offsets()[list_array.offsets().len() - 1] as usize;

                        // values can include more data than referenced in the LargeListArray, only encode
                        // the referenced values.
                        list_array
                            .values()
                            .slice(first_offset, last_offset - first_offset)
                    }
                    DataType::FixedSizeList(_, _) => {
                        // No offsets: the values array maps 1:1 onto list slots
                        as_fixed_size_list_array(array).values().clone()
                    }
                    _ => unreachable!(),
                };
                let rows = converter.convert_columns(&[values])?;
                Ok(Encoder::List(rows))
            }
            Codec::RunEndEncoded(converter) => {
                // Dispatch on the run-end index type to reach the values array
                let values = match array.data_type() {
                    DataType::RunEndEncoded(r, _) => match r.data_type() {
                        DataType::Int16 => array.as_run::<Int16Type>().values(),
                        DataType::Int32 => array.as_run::<Int32Type>().values(),
                        DataType::Int64 => array.as_run::<Int64Type>().values(),
                        _ => unreachable!("Unsupported run end index type: {r:?}"),
                    },
                    _ => unreachable!(),
                };
                let rows = converter.convert_columns(std::slice::from_ref(values))?;
                Ok(Encoder::RunEndEncoded(rows))
            }
        }
    }

    /// Return the size in bytes of the state held by this codec
    /// (child converters plus any pre-computed null rows)
    fn size(&self) -> usize {
        match self {
            Codec::Stateless => 0,
            Codec::Dictionary(converter, nulls) => converter.size() + nulls.data.len(),
            Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(),
            Codec::List(converter) => converter.size(),
            Codec::RunEndEncoded(converter) => converter.size(),
        }
    }
}
608
/// Ephemeral per-array encoding state produced by [`Codec::encoder`]
///
/// Holds the row-encoded child values for a single conversion, borrowing any
/// pre-computed null row from the corresponding [`Codec`] for lifetime `'a`
#[derive(Debug)]
enum Encoder<'a> {
    /// No additional encoder state is necessary
    Stateless,
    /// The encoding of the child array and the encoding of a null row
    Dictionary(Rows, Row<'a>),
    /// The row encoding of the child arrays and the encoding of a null row
    ///
    /// It is necessary to encode to a temporary [`Rows`] to avoid serializing
    /// values that are masked by a null in the parent StructArray, otherwise
    /// this would establish an ordering between semantically null values
    Struct(Rows, Row<'a>),
    /// The row encoding of the child array
    List(Rows),
    /// The row encoding of the values array
    RunEndEncoded(Rows),
}
626
/// Configure the data type and sort order for a given column
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
    /// Sort options (sort direction and null ordering)
    options: SortOptions,
    /// Data type of the column to be encoded
    data_type: DataType,
}
635
636impl SortField {
637    /// Create a new column with the given data type
638    pub fn new(data_type: DataType) -> Self {
639        Self::new_with_options(data_type, Default::default())
640    }
641
642    /// Create a new column with the given data type and [`SortOptions`]
643    pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
644        Self { options, data_type }
645    }
646
647    /// Return size of this instance in bytes.
648    ///
649    /// Includes the size of `Self`.
650    pub fn size(&self) -> usize {
651        self.data_type.size() + std::mem::size_of::<Self>() - std::mem::size_of::<DataType>()
652    }
653}
654
655impl RowConverter {
656    /// Create a new [`RowConverter`] with the provided schema
657    pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
658        if !Self::supports_fields(&fields) {
659            return Err(ArrowError::NotYetImplemented(format!(
660                "Row format support not yet implemented for: {fields:?}"
661            )));
662        }
663
664        let codecs = fields.iter().map(Codec::new).collect::<Result<_, _>>()?;
665        Ok(Self {
666            fields: fields.into(),
667            codecs,
668        })
669    }
670
671    /// Check if the given fields are supported by the row format.
672    pub fn supports_fields(fields: &[SortField]) -> bool {
673        fields.iter().all(|x| Self::supports_datatype(&x.data_type))
674    }
675
676    fn supports_datatype(d: &DataType) -> bool {
677        match d {
678            _ if !d.is_nested() => true,
679            DataType::List(f) | DataType::LargeList(f) | DataType::FixedSizeList(f, _) => {
680                Self::supports_datatype(f.data_type())
681            }
682            DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())),
683            DataType::RunEndEncoded(_, values) => Self::supports_datatype(values.data_type()),
684            _ => false,
685        }
686    }
687
688    /// Convert [`ArrayRef`] columns into [`Rows`]
689    ///
690    /// See [`Row`] for information on when [`Row`] can be compared
691    ///
692    /// See [`Self::convert_rows`] for converting [`Rows`] back into [`ArrayRef`]
693    ///
694    /// # Panics
695    ///
696    /// Panics if the schema of `columns` does not match that provided to [`RowConverter::new`]
697    pub fn convert_columns(&self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
698        let num_rows = columns.first().map(|x| x.len()).unwrap_or(0);
699        let mut rows = self.empty_rows(num_rows, 0);
700        self.append(&mut rows, columns)?;
701        Ok(rows)
702    }
703
704    /// Convert [`ArrayRef`] columns appending to an existing [`Rows`]
705    ///
706    /// See [`Row`] for information on when [`Row`] can be compared
707    ///
708    /// # Panics
709    ///
710    /// Panics if
711    /// * The schema of `columns` does not match that provided to [`RowConverter::new`]
712    /// * The provided [`Rows`] were not created by this [`RowConverter`]
713    ///
714    /// ```
715    /// # use std::sync::Arc;
716    /// # use std::collections::HashSet;
717    /// # use arrow_array::cast::AsArray;
718    /// # use arrow_array::StringArray;
719    /// # use arrow_row::{Row, RowConverter, SortField};
720    /// # use arrow_schema::DataType;
721    /// #
722    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
723    /// let a1 = StringArray::from(vec!["hello", "world"]);
724    /// let a2 = StringArray::from(vec!["a", "a", "hello"]);
725    ///
726    /// let mut rows = converter.empty_rows(5, 128);
727    /// converter.append(&mut rows, &[Arc::new(a1)]).unwrap();
728    /// converter.append(&mut rows, &[Arc::new(a2)]).unwrap();
729    ///
730    /// let back = converter.convert_rows(&rows).unwrap();
731    /// let values: Vec<_> = back[0].as_string::<i32>().iter().map(Option::unwrap).collect();
732    /// assert_eq!(&values, &["hello", "world", "a", "a", "hello"]);
733    /// ```
734    pub fn append(&self, rows: &mut Rows, columns: &[ArrayRef]) -> Result<(), ArrowError> {
735        assert!(
736            Arc::ptr_eq(&rows.config.fields, &self.fields),
737            "rows were not produced by this RowConverter"
738        );
739
740        if columns.len() != self.fields.len() {
741            return Err(ArrowError::InvalidArgumentError(format!(
742                "Incorrect number of arrays provided to RowConverter, expected {} got {}",
743                self.fields.len(),
744                columns.len()
745            )));
746        }
747        for colum in columns.iter().skip(1) {
748            if colum.len() != columns[0].len() {
749                return Err(ArrowError::InvalidArgumentError(format!(
750                    "RowConverter columns must all have the same length, expected {} got {}",
751                    columns[0].len(),
752                    colum.len()
753                )));
754            }
755        }
756
757        let encoders = columns
758            .iter()
759            .zip(&self.codecs)
760            .zip(self.fields.iter())
761            .map(|((column, codec), field)| {
762                if !column.data_type().equals_datatype(&field.data_type) {
763                    return Err(ArrowError::InvalidArgumentError(format!(
764                        "RowConverter column schema mismatch, expected {} got {}",
765                        field.data_type,
766                        column.data_type()
767                    )));
768                }
769                codec.encoder(column.as_ref())
770            })
771            .collect::<Result<Vec<_>, _>>()?;
772
773        let write_offset = rows.num_rows();
774        let lengths = row_lengths(columns, &encoders);
775        let total = lengths.extend_offsets(rows.offsets[write_offset], &mut rows.offsets);
776        rows.buffer.resize(total, 0);
777
778        for ((column, field), encoder) in columns.iter().zip(self.fields.iter()).zip(encoders) {
779            // We encode a column at a time to minimise dispatch overheads
780            encode_column(
781                &mut rows.buffer,
782                &mut rows.offsets[write_offset..],
783                column.as_ref(),
784                field.options,
785                &encoder,
786            )
787        }
788
789        if cfg!(debug_assertions) {
790            assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len());
791            rows.offsets
792                .windows(2)
793                .for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic"));
794        }
795
796        Ok(())
797    }
798
799    /// Convert [`Rows`] columns into [`ArrayRef`]
800    ///
801    /// See [`Self::convert_columns`] for converting [`ArrayRef`] into [`Rows`]
802    ///
803    /// # Panics
804    ///
805    /// Panics if the rows were not produced by this [`RowConverter`]
806    pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>, ArrowError>
807    where
808        I: IntoIterator<Item = Row<'a>>,
809    {
810        let mut validate_utf8 = false;
811        let mut rows: Vec<_> = rows
812            .into_iter()
813            .map(|row| {
814                assert!(
815                    Arc::ptr_eq(&row.config.fields, &self.fields),
816                    "rows were not produced by this RowConverter"
817                );
818                validate_utf8 |= row.config.validate_utf8;
819                row.data
820            })
821            .collect();
822
823        // SAFETY
824        // We have validated that the rows came from this [`RowConverter`]
825        // and therefore must be valid
826        let result = unsafe { self.convert_raw(&mut rows, validate_utf8) }?;
827
828        if cfg!(test) {
829            for (i, row) in rows.iter().enumerate() {
830                if !row.is_empty() {
831                    return Err(ArrowError::InvalidArgumentError(format!(
832                        "Codecs {codecs:?} did not consume all bytes for row {i}, remaining bytes: {row:?}",
833                        codecs = &self.codecs
834                    )));
835                }
836            }
837        }
838
839        Ok(result)
840    }
841
842    /// Returns an empty [`Rows`] with capacity for `row_capacity` rows with
843    /// a total length of `data_capacity`
844    ///
845    /// This can be used to buffer a selection of [`Row`]
846    ///
847    /// ```
848    /// # use std::sync::Arc;
849    /// # use std::collections::HashSet;
850    /// # use arrow_array::cast::AsArray;
851    /// # use arrow_array::StringArray;
852    /// # use arrow_row::{Row, RowConverter, SortField};
853    /// # use arrow_schema::DataType;
854    /// #
855    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
856    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
857    ///
858    /// // Convert to row format and deduplicate
859    /// let converted = converter.convert_columns(&[Arc::new(array)]).unwrap();
860    /// let mut distinct_rows = converter.empty_rows(3, 100);
861    /// let mut dedup: HashSet<Row> = HashSet::with_capacity(3);
862    /// converted.iter().filter(|row| dedup.insert(*row)).for_each(|row| distinct_rows.push(row));
863    ///
864    /// // Note: we could skip buffering and feed the filtered iterator directly
865    /// // into convert_rows, this is done for demonstration purposes only
866    /// let distinct = converter.convert_rows(&distinct_rows).unwrap();
867    /// let values: Vec<_> = distinct[0].as_string::<i32>().iter().map(Option::unwrap).collect();
868    /// assert_eq!(&values, &["hello", "world", "a"]);
869    /// ```
870    pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows {
871        let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1));
872        offsets.push(0);
873
874        Rows {
875            offsets,
876            buffer: Vec::with_capacity(data_capacity),
877            config: RowConfig {
878                fields: self.fields.clone(),
879                validate_utf8: false,
880            },
881        }
882    }
883
884    /// Create a new [Rows] instance from the given binary data.
885    ///
886    /// ```
887    /// # use std::sync::Arc;
888    /// # use std::collections::HashSet;
889    /// # use arrow_array::cast::AsArray;
890    /// # use arrow_array::StringArray;
891    /// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField};
892    /// # use arrow_schema::DataType;
893    /// #
894    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
895    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
896    /// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
897    ///
898    /// // We can convert rows into binary format and back in batch.
899    /// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
900    /// let binary = rows.try_into_binary().expect("known-small array");
901    /// let converted = converter.from_binary(binary.clone());
902    /// assert!(converted.iter().eq(values.iter().map(|r| r.row())));
903    /// ```
904    ///
905    /// # Panics
906    ///
907    /// This function expects the passed [BinaryArray] to contain valid row data as produced by this
908    /// [RowConverter]. It will panic if any rows are null. Operations on the returned [Rows] may
909    /// panic if the data is malformed.
910    pub fn from_binary(&self, array: BinaryArray) -> Rows {
911        assert_eq!(
912            array.null_count(),
913            0,
914            "can't construct Rows instance from array with nulls"
915        );
916        let (offsets, values, _) = array.into_parts();
917        let offsets = offsets.iter().map(|&i| i.as_usize()).collect();
918        // Try zero-copy, if it does not succeed, fall back to copying the values.
919        let buffer = values.into_vec().unwrap_or_else(|values| values.to_vec());
920        Rows {
921            buffer,
922            offsets,
923            config: RowConfig {
924                fields: Arc::clone(&self.fields),
925                validate_utf8: true,
926            },
927        }
928    }
929
930    /// Convert raw bytes into [`ArrayRef`]
931    ///
932    /// # Safety
933    ///
934    /// `rows` must contain valid data for this [`RowConverter`]
935    unsafe fn convert_raw(
936        &self,
937        rows: &mut [&[u8]],
938        validate_utf8: bool,
939    ) -> Result<Vec<ArrayRef>, ArrowError> {
940        self.fields
941            .iter()
942            .zip(&self.codecs)
943            .map(|(field, codec)| unsafe { decode_column(field, rows, codec, validate_utf8) })
944            .collect()
945    }
946
947    /// Returns a [`RowParser`] that can be used to parse [`Row`] from bytes
948    pub fn parser(&self) -> RowParser {
949        RowParser::new(Arc::clone(&self.fields))
950    }
951
952    /// Returns the size of this instance in bytes
953    ///
954    /// Includes the size of `Self`.
955    pub fn size(&self) -> usize {
956        std::mem::size_of::<Self>()
957            + self.fields.iter().map(|x| x.size()).sum::<usize>()
958            + self.codecs.capacity() * std::mem::size_of::<Codec>()
959            + self.codecs.iter().map(Codec::size).sum::<usize>()
960    }
961}
962
/// A [`RowParser`] can be created from a [`RowConverter`] and used to parse bytes to [`Row`]
#[derive(Debug)]
pub struct RowParser {
    /// Row configuration shared with the originating [`RowConverter`];
    /// constructed with UTF-8 validation enabled (see [`RowParser::new`])
    config: RowConfig,
}
968
969impl RowParser {
970    fn new(fields: Arc<[SortField]>) -> Self {
971        Self {
972            config: RowConfig {
973                fields,
974                validate_utf8: true,
975            },
976        }
977    }
978
979    /// Creates a [`Row`] from the provided `bytes`.
980    ///
981    /// `bytes` must be a [`Row`] produced by the [`RowConverter`] associated with
982    /// this [`RowParser`], otherwise subsequent operations with the produced [`Row`] may panic
983    pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> {
984        Row {
985            data: bytes,
986            config: &self.config,
987        }
988    }
989}
990
/// The config of a given set of [`Row`]
#[derive(Debug, Clone)]
struct RowConfig {
    /// The schema for these rows; compared by `Arc::ptr_eq` to verify that
    /// rows and converters belong together
    fields: Arc<[SortField]>,
    /// Whether to run UTF-8 validation when converting to arrow arrays
    validate_utf8: bool,
}
999
/// A row-oriented representation of arrow data, that is normalized for comparison.
///
/// See the [module level documentation](self) and [`RowConverter`] for more details.
#[derive(Debug)]
pub struct Rows {
    /// Underlying row bytes
    buffer: Vec<u8>,
    /// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]`
    ///
    /// Invariant: holds exactly #rows + 1 entries, starting with 0 and
    /// monotonically non-decreasing
    offsets: Vec<usize>,
    /// The config for these rows
    config: RowConfig,
}
1012
impl Rows {
    /// Append a [`Row`] to this [`Rows`]
    ///
    /// # Panics
    ///
    /// Panics if `row` was not produced by a [`RowConverter`] sharing this
    /// instance's field configuration
    pub fn push(&mut self, row: Row<'_>) {
        assert!(
            Arc::ptr_eq(&row.config.fields, &self.config.fields),
            "row was not produced by this RowConverter"
        );
        // If any pushed row requires UTF-8 validation on decode, so does this collection
        self.config.validate_utf8 |= row.config.validate_utf8;
        self.buffer.extend_from_slice(row.data);
        self.offsets.push(self.buffer.len())
    }

    /// Returns the row at index `row`
    ///
    /// # Panics
    ///
    /// Panics if `row` is not less than the number of rows
    pub fn row(&self, row: usize) -> Row<'_> {
        // `offsets` holds #rows + 1 entries, so this enforces `row < num_rows`
        assert!(row + 1 < self.offsets.len());
        unsafe { self.row_unchecked(row) }
    }

    /// Returns the row at `index` without bounds checking
    ///
    /// # Safety
    /// Caller must ensure that `index` is less than the number of rows, i.e.
    /// `index + 1 < self.offsets.len()` (`offsets` holds #rows + 1 entries)
    pub unsafe fn row_unchecked(&self, index: usize) -> Row<'_> {
        // SAFETY: per the contract above, `index` and `index + 1` are valid
        // positions in `offsets`, and all offsets are valid indexes into `buffer`
        let end = unsafe { self.offsets.get_unchecked(index + 1) };
        let start = unsafe { self.offsets.get_unchecked(index) };
        let data = unsafe { self.buffer.get_unchecked(*start..*end) };
        Row {
            data,
            config: &self.config,
        }
    }

    /// Sets the length of this [`Rows`] to 0
    pub fn clear(&mut self) {
        // Keep the leading 0 so `offsets` retains its #rows + 1 invariant
        self.offsets.truncate(1);
        self.buffer.clear();
    }

    /// Returns the number of [`Row`] in this [`Rows`]
    pub fn num_rows(&self) -> usize {
        self.offsets.len() - 1
    }

    /// Returns an iterator over the [`Row`] in this [`Rows`]
    pub fn iter(&self) -> RowsIter<'_> {
        self.into_iter()
    }

    /// Returns the size of this instance in bytes
    ///
    /// Includes the size of `Self`.
    pub fn size(&self) -> usize {
        // Size of fields is accounted for as part of RowConverter
        std::mem::size_of::<Self>()
            + self.buffer.len()
            + self.offsets.len() * std::mem::size_of::<usize>()
    }

    /// Create a [BinaryArray] from the [Rows] data without reallocating the
    /// underlying bytes.
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use std::collections::HashSet;
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::StringArray;
    /// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField};
    /// # use arrow_schema::DataType;
    /// #
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
    /// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
    ///
    /// // We can convert rows into binary format and back.
    /// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
    /// let binary = rows.try_into_binary().expect("known-small array");
    /// let parser = converter.parser();
    /// let parsed: Vec<OwnedRow> =
    ///   binary.iter().flatten().map(|b| parser.parse(b).owned()).collect();
    /// assert_eq!(values, parsed);
    /// ```
    ///
    /// # Errors
    ///
    /// This function will return an error if there is more data than can be stored in
    /// a [BinaryArray] -- i.e. if the total data size is more than 2GiB.
    pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> {
        if self.buffer.len() > i32::MAX as usize {
            return Err(ArrowError::InvalidArgumentError(format!(
                "{}-byte rows buffer too long to convert into a i32-indexed BinaryArray",
                self.buffer.len()
            )));
        }
        // We've checked that the buffer length fits in an i32; so all offsets into that buffer should fit as well.
        let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as));
        // SAFETY: offsets buffer is nonempty, monotonically increasing, and all represent valid indexes into buffer.
        let array = unsafe {
            BinaryArray::new_unchecked(
                OffsetBuffer::new_unchecked(offsets_scalar),
                Buffer::from_vec(self.buffer),
                None,
            )
        };
        Ok(array)
    }
}
1120
1121impl<'a> IntoIterator for &'a Rows {
1122    type Item = Row<'a>;
1123    type IntoIter = RowsIter<'a>;
1124
1125    fn into_iter(self) -> Self::IntoIter {
1126        RowsIter {
1127            rows: self,
1128            start: 0,
1129            end: self.num_rows(),
1130        }
1131    }
1132}
1133
/// An iterator over [`Rows`]
#[derive(Debug)]
pub struct RowsIter<'a> {
    /// The rows being iterated
    rows: &'a Rows,
    /// Index of the next row to yield from the front
    start: usize,
    /// Exclusive upper bound: rows in `[start, end)` have not yet been yielded
    end: usize,
}
1141
1142impl<'a> Iterator for RowsIter<'a> {
1143    type Item = Row<'a>;
1144
1145    fn next(&mut self) -> Option<Self::Item> {
1146        if self.end == self.start {
1147            return None;
1148        }
1149
1150        // SAFETY: We have checked that `start` is less than `end`
1151        let row = unsafe { self.rows.row_unchecked(self.start) };
1152        self.start += 1;
1153        Some(row)
1154    }
1155
1156    fn size_hint(&self) -> (usize, Option<usize>) {
1157        let len = self.len();
1158        (len, Some(len))
1159    }
1160}
1161
1162impl ExactSizeIterator for RowsIter<'_> {
1163    fn len(&self) -> usize {
1164        self.end - self.start
1165    }
1166}
1167
1168impl DoubleEndedIterator for RowsIter<'_> {
1169    fn next_back(&mut self) -> Option<Self::Item> {
1170        if self.end == self.start {
1171            return None;
1172        }
1173        // Safety: We have checked that `start` is less than `end`
1174        let row = unsafe { self.rows.row_unchecked(self.end) };
1175        self.end -= 1;
1176        Some(row)
1177    }
1178}
1179
/// A comparable representation of a row.
///
/// See the [module level documentation](self) for more details.
///
/// Two [`Row`] can only be compared if they both belong to [`Rows`]
/// returned by calls to [`RowConverter::convert_columns`] on the same
/// [`RowConverter`]. If different [`RowConverter`]s are used, any
/// ordering established by comparing the [`Row`] is arbitrary.
#[derive(Debug, Copy, Clone)]
pub struct Row<'a> {
    /// The encoded bytes of this row; equality, ordering and hashing are all
    /// defined in terms of these bytes alone
    data: &'a [u8],
    /// Configuration of the [`RowConverter`] that produced this row
    config: &'a RowConfig,
}
1193
1194impl<'a> Row<'a> {
1195    /// Create owned version of the row to detach it from the shared [`Rows`].
1196    pub fn owned(&self) -> OwnedRow {
1197        OwnedRow {
1198            data: self.data.into(),
1199            config: self.config.clone(),
1200        }
1201    }
1202
1203    /// The row's bytes, with the lifetime of the underlying data.
1204    pub fn data(&self) -> &'a [u8] {
1205        self.data
1206    }
1207}
1208
1209// Manually derive these as don't wish to include `fields`
1210
1211impl PartialEq for Row<'_> {
1212    #[inline]
1213    fn eq(&self, other: &Self) -> bool {
1214        self.data.eq(other.data)
1215    }
1216}
1217
1218impl Eq for Row<'_> {}
1219
1220impl PartialOrd for Row<'_> {
1221    #[inline]
1222    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1223        Some(self.cmp(other))
1224    }
1225}
1226
1227impl Ord for Row<'_> {
1228    #[inline]
1229    fn cmp(&self, other: &Self) -> Ordering {
1230        self.data.cmp(other.data)
1231    }
1232}
1233
1234impl Hash for Row<'_> {
1235    #[inline]
1236    fn hash<H: Hasher>(&self, state: &mut H) {
1237        self.data.hash(state)
1238    }
1239}
1240
1241impl AsRef<[u8]> for Row<'_> {
1242    #[inline]
1243    fn as_ref(&self) -> &[u8] {
1244        self.data
1245    }
1246}
1247
/// Owned version of a [`Row`] that can be moved/cloned freely.
///
/// This contains the data for the one specific row (not the entire buffer of all rows).
#[derive(Debug, Clone)]
pub struct OwnedRow {
    /// The encoded bytes of this single row
    data: Box<[u8]>,
    /// Cloned configuration from the originating [`Rows`]
    config: RowConfig,
}
1256
1257impl OwnedRow {
1258    /// Get borrowed [`Row`] from owned version.
1259    ///
1260    /// This is helpful if you want to compare an [`OwnedRow`] with a [`Row`].
1261    pub fn row(&self) -> Row<'_> {
1262        Row {
1263            data: &self.data,
1264            config: &self.config,
1265        }
1266    }
1267}
1268
1269// Manually derive these as don't wish to include `fields`. Also we just want to use the same `Row` implementations here.
1270
1271impl PartialEq for OwnedRow {
1272    #[inline]
1273    fn eq(&self, other: &Self) -> bool {
1274        self.row().eq(&other.row())
1275    }
1276}
1277
1278impl Eq for OwnedRow {}
1279
1280impl PartialOrd for OwnedRow {
1281    #[inline]
1282    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1283        Some(self.cmp(other))
1284    }
1285}
1286
1287impl Ord for OwnedRow {
1288    #[inline]
1289    fn cmp(&self, other: &Self) -> Ordering {
1290        self.row().cmp(&other.row())
1291    }
1292}
1293
1294impl Hash for OwnedRow {
1295    #[inline]
1296    fn hash<H: Hasher>(&self, state: &mut H) {
1297        self.row().hash(state)
1298    }
1299}
1300
1301impl AsRef<[u8]> for OwnedRow {
1302    #[inline]
1303    fn as_ref(&self) -> &[u8] {
1304        &self.data
1305    }
1306}
1307
1308/// Returns the null sentinel, negated if `invert` is true
1309#[inline]
1310fn null_sentinel(options: SortOptions) -> u8 {
1311    match options.nulls_first {
1312        true => 0,
1313        false => 0xFF,
1314    }
1315}
1316
/// Stores the lengths of the rows. Lazily materializes lengths for columns with fixed-size types.
enum LengthTracker {
    /// Fixed state: All rows have length `length`
    Fixed { length: usize, num_rows: usize },
    /// Variable state: The length of row `i` is `lengths[i] + fixed_length`
    Variable {
        /// Length contributed by fixed-size columns, shared by every row
        fixed_length: usize,
        /// Per-row length contributed by variable-size columns
        lengths: Vec<usize>,
    },
}
1327
impl LengthTracker {
    /// Creates a tracker for `num_rows` rows, all initially of length 0
    fn new(num_rows: usize) -> Self {
        Self::Fixed {
            length: 0,
            num_rows,
        }
    }

    /// Adds a column of fixed-length elements, each of size `new_length` to the LengthTracker
    fn push_fixed(&mut self, new_length: usize) {
        match self {
            LengthTracker::Fixed { length, .. } => *length += new_length,
            LengthTracker::Variable { fixed_length, .. } => *fixed_length += new_length,
        }
    }

    /// Adds a column of possibly variable-length elements, element `i` has length `new_lengths.nth(i)`
    ///
    /// # Panics
    ///
    /// Panics if the tracker is already in the variable state and `new_lengths`
    /// yields a different number of elements than previously tracked
    fn push_variable(&mut self, new_lengths: impl ExactSizeIterator<Item = usize>) {
        match self {
            // First variable-length column: switch state, carrying over the
            // fixed length accumulated so far
            LengthTracker::Fixed { length, .. } => {
                *self = LengthTracker::Variable {
                    fixed_length: *length,
                    lengths: new_lengths.collect(),
                }
            }
            LengthTracker::Variable { lengths, .. } => {
                assert_eq!(lengths.len(), new_lengths.len());
                lengths
                    .iter_mut()
                    .zip(new_lengths)
                    .for_each(|(length, new_length)| *length += new_length);
            }
        }
    }

    /// Returns the tracked row lengths as a mutable slice, first materializing
    /// the per-row vector if the tracker is still in the fixed state
    fn materialized(&mut self) -> &mut [usize] {
        if let LengthTracker::Fixed { length, num_rows } = *self {
            *self = LengthTracker::Variable {
                fixed_length: length,
                lengths: vec![0; num_rows],
            };
        }

        match self {
            LengthTracker::Variable { lengths, .. } => lengths,
            // Any Fixed state was converted to Variable just above
            LengthTracker::Fixed { .. } => unreachable!(),
        }
    }

    /// Initializes the offsets using the tracked lengths. Returns the sum of the
    /// lengths of the rows added.
    ///
    /// We initialize the offsets shifted down by one row index.
    ///
    /// As the rows are appended to the offsets will be incremented to match
    ///
    /// For example, consider the case of 3 rows of length 3, 4, and 6 respectively.
    /// The offsets would be initialized to `0, 0, 3, 7`
    ///
    /// Writing the first row entirely would yield `0, 3, 3, 7`
    /// The second, `0, 3, 7, 7`
    /// The third, `0, 3, 7, 13`
    ///
    /// This would be the final offsets for reading
    ///
    /// In this way offsets tracks the position during writing whilst eventually serving
    /// as the final offsets for reading
    fn extend_offsets(&self, initial_offset: usize, offsets: &mut Vec<usize>) -> usize {
        match self {
            LengthTracker::Fixed { length, num_rows } => {
                offsets.extend((0..*num_rows).map(|i| initial_offset + i * length));

                initial_offset + num_rows * length
            }
            LengthTracker::Variable {
                fixed_length,
                lengths,
            } => {
                // Running prefix sum: each row contributes its variable length
                // plus the fixed length shared by all rows
                let mut acc = initial_offset;

                offsets.extend(lengths.iter().map(|length| {
                    let current = acc;
                    acc += length + fixed_length;
                    current
                }));

                acc
            }
        }
    }
}
1419
/// Computes the encoded length of each row for the provided columns,
/// returning the result as a [`LengthTracker`]
fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> LengthTracker {
    use fixed::FixedLengthEncoding;

    // All columns are assumed to share the first column's length;
    // an empty column set yields 0 rows
    let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
    let mut tracker = LengthTracker::new(num_rows);

    for (array, encoder) in cols.iter().zip(encoders) {
        match encoder {
            Encoder::Stateless => {
                downcast_primitive_array! {
                    array => tracker.push_fixed(fixed::encoded_len(array)),
                    DataType::Null => {},
                    DataType::Boolean => tracker.push_fixed(bool::ENCODED_LEN),
                    DataType::Binary => tracker.push_variable(
                        as_generic_binary_array::<i32>(array)
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::LargeBinary => tracker.push_variable(
                        as_generic_binary_array::<i64>(array)
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::BinaryView => tracker.push_variable(
                        array.as_binary_view()
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::Utf8 => tracker.push_variable(
                        array.as_string::<i32>()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::LargeUtf8 => tracker.push_variable(
                        array.as_string::<i64>()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::Utf8View => tracker.push_variable(
                        array.as_string_view()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::FixedSizeBinary(len) => {
                        let len = len.to_usize().unwrap();
                        tracker.push_fixed(1 + len)
                    }
                    _ => unimplemented!("unsupported data type: {}", array.data_type()),
                }
            }
            // Dictionary values are pre-encoded rows; each key contributes the
            // byte length of the row it references (or of the null row)
            Encoder::Dictionary(values, null) => {
                downcast_dictionary_array! {
                    array => {
                        tracker.push_variable(
                            array.keys().iter().map(|v| match v {
                                Some(k) => values.row(k.as_usize()).data.len(),
                                None => null.data.len(),
                            })
                        )
                    }
                    _ => unreachable!(),
                }
            }
            // Struct rows carry a 1-byte validity sentinel ahead of the child row bytes
            Encoder::Struct(rows, null) => {
                let array = as_struct_array(array);
                tracker.push_variable((0..array.len()).map(|idx| match array.is_valid(idx) {
                    true => 1 + rows.row(idx).as_ref().len(),
                    false => 1 + null.data.len(),
                }));
            }
            Encoder::List(rows) => match array.data_type() {
                DataType::List(_) => {
                    list::compute_lengths(tracker.materialized(), rows, as_list_array(array))
                }
                DataType::LargeList(_) => {
                    list::compute_lengths(tracker.materialized(), rows, as_large_list_array(array))
                }
                DataType::FixedSizeList(_, _) => compute_lengths_fixed_size_list(
                    &mut tracker,
                    rows,
                    as_fixed_size_list_array(array),
                ),
                _ => unreachable!(),
            },
            Encoder::RunEndEncoded(rows) => match array.data_type() {
                DataType::RunEndEncoded(r, _) => match r.data_type() {
                    DataType::Int16 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int16Type>(),
                    ),
                    DataType::Int32 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int32Type>(),
                    ),
                    DataType::Int64 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int64Type>(),
                    ),
                    _ => unreachable!("Unsupported run end index type: {r:?}"),
                },
                _ => unreachable!(),
            },
        }
    }

    tracker
}
1531
1532/// Encodes a column to the provided [`Rows`] incrementing the offsets as it progresses
1533fn encode_column(
1534    data: &mut [u8],
1535    offsets: &mut [usize],
1536    column: &dyn Array,
1537    opts: SortOptions,
1538    encoder: &Encoder<'_>,
1539) {
1540    match encoder {
1541        Encoder::Stateless => {
1542            downcast_primitive_array! {
1543                column => {
1544                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
1545                        fixed::encode(data, offsets, column.values(), nulls, opts)
1546                    } else {
1547                        fixed::encode_not_null(data, offsets, column.values(), opts)
1548                    }
1549                }
1550                DataType::Null => {}
1551                DataType::Boolean => {
1552                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
1553                        fixed::encode_boolean(data, offsets, column.as_boolean().values(), nulls, opts)
1554                    } else {
1555                        fixed::encode_boolean_not_null(data, offsets, column.as_boolean().values(), opts)
1556                    }
1557                }
1558                DataType::Binary => {
1559                    variable::encode(data, offsets, as_generic_binary_array::<i32>(column).iter(), opts)
1560                }
1561                DataType::BinaryView => {
1562                    variable::encode(data, offsets, column.as_binary_view().iter(), opts)
1563                }
1564                DataType::LargeBinary => {
1565                    variable::encode(data, offsets, as_generic_binary_array::<i64>(column).iter(), opts)
1566                }
1567                DataType::Utf8 => variable::encode(
1568                    data, offsets,
1569                    column.as_string::<i32>().iter().map(|x| x.map(|x| x.as_bytes())),
1570                    opts,
1571                ),
1572                DataType::LargeUtf8 => variable::encode(
1573                    data, offsets,
1574                    column.as_string::<i64>()
1575                        .iter()
1576                        .map(|x| x.map(|x| x.as_bytes())),
1577                    opts,
1578                ),
1579                DataType::Utf8View => variable::encode(
1580                    data, offsets,
1581                    column.as_string_view().iter().map(|x| x.map(|x| x.as_bytes())),
1582                    opts,
1583                ),
1584                DataType::FixedSizeBinary(_) => {
1585                    let array = column.as_any().downcast_ref().unwrap();
1586                    fixed::encode_fixed_size_binary(data, offsets, array, opts)
1587                }
1588                _ => unimplemented!("unsupported data type: {}", column.data_type()),
1589            }
1590        }
1591        Encoder::Dictionary(values, nulls) => {
1592            downcast_dictionary_array! {
1593                column => encode_dictionary_values(data, offsets, column, values, nulls),
1594                _ => unreachable!()
1595            }
1596        }
1597        Encoder::Struct(rows, null) => {
1598            let array = as_struct_array(column);
1599            let null_sentinel = null_sentinel(opts);
1600            offsets
1601                .iter_mut()
1602                .skip(1)
1603                .enumerate()
1604                .for_each(|(idx, offset)| {
1605                    let (row, sentinel) = match array.is_valid(idx) {
1606                        true => (rows.row(idx), 0x01),
1607                        false => (*null, null_sentinel),
1608                    };
1609                    let end_offset = *offset + 1 + row.as_ref().len();
1610                    data[*offset] = sentinel;
1611                    data[*offset + 1..end_offset].copy_from_slice(row.as_ref());
1612                    *offset = end_offset;
1613                })
1614        }
1615        Encoder::List(rows) => match column.data_type() {
1616            DataType::List(_) => list::encode(data, offsets, rows, opts, as_list_array(column)),
1617            DataType::LargeList(_) => {
1618                list::encode(data, offsets, rows, opts, as_large_list_array(column))
1619            }
1620            DataType::FixedSizeList(_, _) => {
1621                encode_fixed_size_list(data, offsets, rows, opts, as_fixed_size_list_array(column))
1622            }
1623            _ => unreachable!(),
1624        },
1625        Encoder::RunEndEncoded(rows) => match column.data_type() {
1626            DataType::RunEndEncoded(r, _) => match r.data_type() {
1627                DataType::Int16 => {
1628                    run::encode(data, offsets, rows, opts, column.as_run::<Int16Type>())
1629                }
1630                DataType::Int32 => {
1631                    run::encode(data, offsets, rows, opts, column.as_run::<Int32Type>())
1632                }
1633                DataType::Int64 => {
1634                    run::encode(data, offsets, rows, opts, column.as_run::<Int64Type>())
1635                }
1636                _ => unreachable!("Unsupported run end index type: {r:?}"),
1637            },
1638            _ => unreachable!(),
1639        },
1640    }
1641}
1642
1643/// Encode dictionary values not preserving the dictionary encoding
1644pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
1645    data: &mut [u8],
1646    offsets: &mut [usize],
1647    column: &DictionaryArray<K>,
1648    values: &Rows,
1649    null: &Row<'_>,
1650) {
1651    for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
1652        let row = match k {
1653            Some(k) => values.row(k.as_usize()).data,
1654            None => null.data,
1655        };
1656        let end_offset = *offset + row.len();
1657        data[*offset..end_offset].copy_from_slice(row);
1658        *offset = end_offset;
1659    }
1660}
1661
/// Helper invoked by `downcast_primitive!` in [`decode_column`] to decode a
/// primitive column of type `$t` and wrap the result in an [`Arc`]
macro_rules! decode_primitive_helper {
    ($t:ty, $rows:ident, $data_type:ident, $options:ident) => {
        Arc::new(decode_primitive::<$t>($rows, $data_type, $options))
    };
}
1667
/// Decodes the provided `field` from `rows`
///
/// Each slice in `rows` is advanced past the bytes consumed for this column
/// as it is decoded.
///
/// # Safety
///
/// Rows must contain valid data for the provided field
unsafe fn decode_column(
    field: &SortField,
    rows: &mut [&[u8]],
    codec: &Codec,
    validate_utf8: bool,
) -> Result<ArrayRef, ArrowError> {
    let options = field.options;

    let array: ArrayRef = match codec {
        // Primitive, boolean, binary and string columns decode directly from
        // the row bytes with no nested converter state
        Codec::Stateless => {
            let data_type = field.data_type.clone();
            downcast_primitive! {
                data_type => (decode_primitive_helper, rows, data_type, options),
                DataType::Null => Arc::new(NullArray::new(rows.len())),
                DataType::Boolean => Arc::new(decode_bool(rows, options)),
                DataType::Binary => Arc::new(decode_binary::<i32>(rows, options)),
                DataType::LargeBinary => Arc::new(decode_binary::<i64>(rows, options)),
                DataType::BinaryView => Arc::new(decode_binary_view(rows, options)),
                DataType::FixedSizeBinary(size) => Arc::new(decode_fixed_size_binary(rows, size, options)),
                DataType::Utf8 => Arc::new(unsafe{ decode_string::<i32>(rows, options, validate_utf8) }),
                DataType::LargeUtf8 => Arc::new(unsafe { decode_string::<i64>(rows, options, validate_utf8) }),
                DataType::Utf8View => Arc::new(unsafe { decode_string_view(rows, options, validate_utf8) }),
                _ => return Err(ArrowError::NotYetImplemented(format!("unsupported data type: {data_type}" )))
            }
        }
        // Dictionaries are encoded flattened to their value type, so decoding
        // delegates to the nested converter, which yields exactly one column
        Codec::Dictionary(converter, _) => {
            let cols = unsafe { converter.convert_raw(rows, validate_utf8) }?;
            cols.into_iter().next().unwrap()
        }
        Codec::Struct(converter, _) => {
            // Each row begins with a one-byte null sentinel for the struct
            // itself; read the sentinels then strip them before decoding the
            // child columns
            let (null_count, nulls) = fixed::decode_nulls(rows);
            rows.iter_mut().for_each(|row| *row = &row[1..]);
            let children = unsafe { converter.convert_raw(rows, validate_utf8) }?;

            let child_data: Vec<ArrayData> = children.iter().map(|c| c.to_data()).collect();
            // Since RowConverter flattens certain data types (i.e. Dictionary),
            // we need to use updated data type instead of original field
            let corrected_fields: Vec<Field> = match &field.data_type {
                DataType::Struct(struct_fields) => struct_fields
                    .iter()
                    .zip(child_data.iter())
                    .map(|(orig_field, child_array)| {
                        orig_field
                            .as_ref()
                            .clone()
                            .with_data_type(child_array.data_type().clone())
                    })
                    .collect(),
                _ => unreachable!("Only Struct types should be corrected here"),
            };
            let corrected_struct_type = DataType::Struct(corrected_fields.into());
            let builder = ArrayDataBuilder::new(corrected_struct_type)
                .len(rows.len())
                .null_count(null_count)
                .null_bit_buffer(Some(nulls))
                .child_data(child_data);

            // SAFETY: caller guarantees `rows` contains valid data for this
            // field, so the child data and null buffer are consistent
            Arc::new(StructArray::from(unsafe { builder.build_unchecked() }))
        }
        // Dispatch on the concrete list offset width / fixed size
        Codec::List(converter) => match &field.data_type {
            DataType::List(_) => {
                Arc::new(unsafe { list::decode::<i32>(converter, rows, field, validate_utf8) }?)
            }
            DataType::LargeList(_) => {
                Arc::new(unsafe { list::decode::<i64>(converter, rows, field, validate_utf8) }?)
            }
            DataType::FixedSizeList(_, value_length) => Arc::new(unsafe {
                list::decode_fixed_size_list(
                    converter,
                    rows,
                    field,
                    validate_utf8,
                    value_length.as_usize(),
                )
            }?),
            _ => unreachable!(),
        },
        // Dispatch on the run-end index type declared by the field
        Codec::RunEndEncoded(converter) => match &field.data_type {
            DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() {
                DataType::Int16 => Arc::new(unsafe {
                    run::decode::<Int16Type>(converter, rows, field, validate_utf8)
                }?),
                DataType::Int32 => Arc::new(unsafe {
                    run::decode::<Int32Type>(converter, rows, field, validate_utf8)
                }?),
                DataType::Int64 => Arc::new(unsafe {
                    run::decode::<Int64Type>(converter, rows, field, validate_utf8)
                }?),
                _ => unreachable!(),
            },
            _ => unreachable!(),
        },
    };
    Ok(array)
}
1768
1769#[cfg(test)]
1770mod tests {
1771    use rand::distr::uniform::SampleUniform;
1772    use rand::distr::{Distribution, StandardUniform};
1773    use rand::{Rng, rng};
1774
1775    use arrow_array::builder::*;
1776    use arrow_array::types::*;
1777    use arrow_array::*;
1778    use arrow_buffer::{Buffer, OffsetBuffer};
1779    use arrow_buffer::{NullBuffer, i256};
1780    use arrow_cast::display::{ArrayFormatter, FormatOptions};
1781    use arrow_ord::sort::{LexicographicalComparator, SortColumn};
1782
1783    use super::*;
1784
    #[test]
    fn test_fixed_width() {
        // Two fixed-width columns of seven rows; row 2 is null in both
        let cols = [
            Arc::new(Int16Array::from_iter([
                Some(1),
                Some(2),
                None,
                Some(-5),
                Some(2),
                Some(2),
                Some(0),
            ])) as ArrayRef,
            Arc::new(Float32Array::from_iter([
                Some(1.3),
                Some(2.5),
                None,
                Some(4.),
                Some(0.1),
                Some(-4.),
                Some(-0.),
            ])) as ArrayRef,
        ];

        let converter = RowConverter::new(vec![
            SortField::new(DataType::Int16),
            SortField::new(DataType::Float32),
        ])
        .unwrap();
        let rows = converter.convert_columns(&cols).unwrap();

        // Each row is 8 bytes: (1 sentinel + 2 value bytes) for Int16
        // followed by (1 sentinel + 4 value bytes) for Float32
        assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
        assert_eq!(
            rows.buffer,
            &[
                1, 128, 1, // row 0: Int16(1)
                1, 191, 166, 102, 102, // row 0: Float32(1.3)
                1, 128, 2, // row 1: Int16(2)
                1, 192, 32, 0, 0, // row 1: Float32(2.5)
                0, 0, 0, // row 2: null Int16 encodes as zeroes
                0, 0, 0, 0, 0, // row 2: null Float32 encodes as zeroes
                1, 127, 251, // row 3: Int16(-5)
                1, 192, 128, 0, 0, // row 3: Float32(4.)
                1, 128, 2, // row 4: Int16(2)
                1, 189, 204, 204, 205, // row 4: Float32(0.1)
                1, 128, 2, // row 5: Int16(2)
                1, 63, 127, 255, 255, // row 5: Float32(-4.)
                1, 128, 0, // row 6: Int16(0)
                1, 127, 255, 255, 255 // row 6: Float32(-0.)
            ]
        );

        // Row comparisons must match lexicographic order of (Int16, Float32)
        assert!(rows.row(3) < rows.row(6)); // (-5, 4.) < (0, -0.)
        assert!(rows.row(0) < rows.row(1)); // (1, 1.3) < (2, 2.5)
        assert!(rows.row(3) < rows.row(0)); // (-5, 4.) < (1, 1.3)
        assert!(rows.row(4) < rows.row(1)); // ties on Int16, 0.1 < 2.5
        assert!(rows.row(5) < rows.row(4)); // ties on Int16, -4. < 0.1

        // Round-tripping must reproduce the input columns exactly
        let back = converter.convert_rows(&rows).unwrap();
        for (expected, actual) in cols.iter().zip(&back) {
            assert_eq!(expected, actual);
        }
    }
1847
1848    #[test]
1849    fn test_decimal32() {
1850        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal32(
1851            DECIMAL32_MAX_PRECISION,
1852            7,
1853        ))])
1854        .unwrap();
1855        let col = Arc::new(
1856            Decimal32Array::from_iter([
1857                None,
1858                Some(i32::MIN),
1859                Some(-13),
1860                Some(46_i32),
1861                Some(5456_i32),
1862                Some(i32::MAX),
1863            ])
1864            .with_precision_and_scale(9, 7)
1865            .unwrap(),
1866        ) as ArrayRef;
1867
1868        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1869        for i in 0..rows.num_rows() - 1 {
1870            assert!(rows.row(i) < rows.row(i + 1));
1871        }
1872
1873        let back = converter.convert_rows(&rows).unwrap();
1874        assert_eq!(back.len(), 1);
1875        assert_eq!(col.as_ref(), back[0].as_ref())
1876    }
1877
1878    #[test]
1879    fn test_decimal64() {
1880        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal64(
1881            DECIMAL64_MAX_PRECISION,
1882            7,
1883        ))])
1884        .unwrap();
1885        let col = Arc::new(
1886            Decimal64Array::from_iter([
1887                None,
1888                Some(i64::MIN),
1889                Some(-13),
1890                Some(46_i64),
1891                Some(5456_i64),
1892                Some(i64::MAX),
1893            ])
1894            .with_precision_and_scale(18, 7)
1895            .unwrap(),
1896        ) as ArrayRef;
1897
1898        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1899        for i in 0..rows.num_rows() - 1 {
1900            assert!(rows.row(i) < rows.row(i + 1));
1901        }
1902
1903        let back = converter.convert_rows(&rows).unwrap();
1904        assert_eq!(back.len(), 1);
1905        assert_eq!(col.as_ref(), back[0].as_ref())
1906    }
1907
1908    #[test]
1909    fn test_decimal128() {
1910        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal128(
1911            DECIMAL128_MAX_PRECISION,
1912            7,
1913        ))])
1914        .unwrap();
1915        let col = Arc::new(
1916            Decimal128Array::from_iter([
1917                None,
1918                Some(i128::MIN),
1919                Some(-13),
1920                Some(46_i128),
1921                Some(5456_i128),
1922                Some(i128::MAX),
1923            ])
1924            .with_precision_and_scale(38, 7)
1925            .unwrap(),
1926        ) as ArrayRef;
1927
1928        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1929        for i in 0..rows.num_rows() - 1 {
1930            assert!(rows.row(i) < rows.row(i + 1));
1931        }
1932
1933        let back = converter.convert_rows(&rows).unwrap();
1934        assert_eq!(back.len(), 1);
1935        assert_eq!(col.as_ref(), back[0].as_ref())
1936    }
1937
1938    #[test]
1939    fn test_decimal256() {
1940        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal256(
1941            DECIMAL256_MAX_PRECISION,
1942            7,
1943        ))])
1944        .unwrap();
1945        let col = Arc::new(
1946            Decimal256Array::from_iter([
1947                None,
1948                Some(i256::MIN),
1949                Some(i256::from_parts(0, -1)),
1950                Some(i256::from_parts(u128::MAX, -1)),
1951                Some(i256::from_parts(u128::MAX, 0)),
1952                Some(i256::from_parts(0, 46_i128)),
1953                Some(i256::from_parts(5, 46_i128)),
1954                Some(i256::MAX),
1955            ])
1956            .with_precision_and_scale(DECIMAL256_MAX_PRECISION, 7)
1957            .unwrap(),
1958        ) as ArrayRef;
1959
1960        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1961        for i in 0..rows.num_rows() - 1 {
1962            assert!(rows.row(i) < rows.row(i + 1));
1963        }
1964
1965        let back = converter.convert_rows(&rows).unwrap();
1966        assert_eq!(back.len(), 1);
1967        assert_eq!(col.as_ref(), back[0].as_ref())
1968    }
1969
1970    #[test]
1971    fn test_bool() {
1972        let converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]).unwrap();
1973
1974        let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])) as ArrayRef;
1975
1976        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1977        assert!(rows.row(2) > rows.row(1));
1978        assert!(rows.row(2) > rows.row(0));
1979        assert!(rows.row(1) > rows.row(0));
1980
1981        let cols = converter.convert_rows(&rows).unwrap();
1982        assert_eq!(&cols[0], &col);
1983
1984        let converter = RowConverter::new(vec![SortField::new_with_options(
1985            DataType::Boolean,
1986            SortOptions::default().desc().with_nulls_first(false),
1987        )])
1988        .unwrap();
1989
1990        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1991        assert!(rows.row(2) < rows.row(1));
1992        assert!(rows.row(2) < rows.row(0));
1993        assert!(rows.row(1) < rows.row(0));
1994        let cols = converter.convert_rows(&rows).unwrap();
1995        assert_eq!(&cols[0], &col);
1996    }
1997
    #[test]
    fn test_timezone() {
        // The timezone is part of the Timestamp data type and must survive a
        // round trip through the row format
        let a =
            TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone("+01:00".to_string());
        let d = a.data_type().clone();

        let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
        let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(back[0].data_type(), &d);

        // Test dictionary
        let mut a = PrimitiveDictionaryBuilder::<Int32Type, TimestampNanosecondType>::new();
        a.append(34).unwrap();
        a.append_null();
        a.append(345).unwrap();

        // Construct dictionary with a timezone
        let dict = a.finish();
        let values = TimestampNanosecondArray::from(dict.values().to_data());
        let dict_with_tz = dict.with_values(Arc::new(values.with_timezone("+02:00")));
        let v = DataType::Timestamp(TimeUnit::Nanosecond, Some("+02:00".into()));
        let d = DataType::Dictionary(Box::new(DataType::Int32), Box::new(v.clone()));

        assert_eq!(dict_with_tz.data_type(), &d);
        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
        let rows = converter
            .convert_columns(&[Arc::new(dict_with_tz) as _])
            .unwrap();
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        // The dictionary is flattened on decode, leaving the
        // timezone-annotated value type
        assert_eq!(back[0].data_type(), &v);
    }
2032
2033    #[test]
2034    fn test_null_encoding() {
2035        let col = Arc::new(NullArray::new(10));
2036        let converter = RowConverter::new(vec![SortField::new(DataType::Null)]).unwrap();
2037        let rows = converter.convert_columns(&[col]).unwrap();
2038        assert_eq!(rows.num_rows(), 10);
2039        assert_eq!(rows.row(1).data.len(), 0);
2040    }
2041
    #[test]
    fn test_variable_width() {
        let col = Arc::new(StringArray::from_iter([
            Some("hello"),
            Some("he"),
            None,
            Some("foo"),
            Some(""),
        ])) as ArrayRef;

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        assert!(rows.row(1) < rows.row(0)); // "he" < "hello" (prefix sorts first)
        assert!(rows.row(2) < rows.row(4)); // null < ""
        assert!(rows.row(3) < rows.row(0)); // "foo" < "hello"
        assert!(rows.row(3) < rows.row(1)); // "foo" < "he"

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        // Lengths straddle the encoder's mini-block and block boundaries to
        // exercise the continuation handling of the variable-length encoding;
        // values are constructed in strictly ascending order
        let col = Arc::new(BinaryArray::from_iter([
            None,
            Some(vec![0_u8; 0]),
            Some(vec![0_u8; 6]),
            Some(vec![0_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![0_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![0_u8; variable::BLOCK_SIZE]),
            Some(vec![0_u8; variable::BLOCK_SIZE + 1]),
            Some(vec![1_u8; 6]),
            Some(vec![1_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![1_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![1_u8; variable::BLOCK_SIZE]),
            Some(vec![1_u8; variable::BLOCK_SIZE + 1]),
            Some(vec![0xFF_u8; 6]),
            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![0xFF_u8; variable::BLOCK_SIZE]),
            Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]),
        ])) as ArrayRef;

        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        // Every pair of rows must compare in the same (ascending) order
        for i in 0..rows.num_rows() {
            for j in i + 1..rows.num_rows() {
                assert!(
                    rows.row(i) < rows.row(j),
                    "{} < {} - {:?} < {:?}",
                    i,
                    j,
                    rows.row(i),
                    rows.row(j)
                );
            }
        }

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        // Descending with nulls last reverses every pairwise comparison
        let converter = RowConverter::new(vec![SortField::new_with_options(
            DataType::Binary,
            SortOptions::default().desc().with_nulls_first(false),
        )])
        .unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        for i in 0..rows.num_rows() {
            for j in i + 1..rows.num_rows() {
                assert!(
                    rows.row(i) > rows.row(j),
                    "{} > {} - {:?} > {:?}",
                    i,
                    j,
                    rows.row(i),
                    rows.row(j)
                );
            }
        }

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);
    }
2125
2126    /// If `exact` is false performs a logical comparison between a and dictionary-encoded b
2127    fn dictionary_eq(a: &dyn Array, b: &dyn Array) {
2128        match b.data_type() {
2129            DataType::Dictionary(_, v) => {
2130                assert_eq!(a.data_type(), v.as_ref());
2131                let b = arrow_cast::cast(b, v).unwrap();
2132                assert_eq!(a, b.as_ref())
2133            }
2134            _ => assert_eq!(a, b),
2135        }
2136    }
2137
    #[test]
    fn test_string_dictionary() {
        let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
            Some("foo"),
            Some("hello"),
            Some("he"),
            None,
            Some("hello"),
            Some(""),
            Some("hello"),
            Some("hello"),
        ])) as ArrayRef;

        let field = SortField::new(a.data_type().clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();

        // Ordering follows the dictionary values, with null first by default
        assert!(rows_a.row(3) < rows_a.row(5)); // null < ""
        assert!(rows_a.row(2) < rows_a.row(1)); // "he" < "hello"
        assert!(rows_a.row(0) < rows_a.row(1)); // "foo" < "hello"
        assert!(rows_a.row(3) < rows_a.row(0)); // null < "foo"

        // Equal values encode to identical rows regardless of key
        assert_eq!(rows_a.row(1), rows_a.row(4));
        assert_eq!(rows_a.row(1), rows_a.row(6));
        assert_eq!(rows_a.row(1), rows_a.row(7));

        let cols = converter.convert_rows(&rows_a).unwrap();
        dictionary_eq(&cols[0], &a);

        // Rows from a different dictionary produced by the same converter are
        // directly comparable with rows from `a`
        let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
            Some("hello"),
            None,
            Some("cupcakes"),
        ])) as ArrayRef;

        let rows_b = converter.convert_columns(&[Arc::clone(&b)]).unwrap();
        assert_eq!(rows_a.row(1), rows_b.row(0)); // "hello" == "hello"
        assert_eq!(rows_a.row(3), rows_b.row(1)); // null == null
        assert!(rows_b.row(2) < rows_a.row(0)); // "cupcakes" < "foo"

        let cols = converter.convert_rows(&rows_b).unwrap();
        dictionary_eq(&cols[0], &b);

        // Descending with nulls last inverts all four comparisons
        let converter = RowConverter::new(vec![SortField::new_with_options(
            a.data_type().clone(),
            SortOptions::default().desc().with_nulls_first(false),
        )])
        .unwrap();

        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
        assert!(rows_c.row(3) > rows_c.row(5));
        assert!(rows_c.row(2) > rows_c.row(1));
        assert!(rows_c.row(0) > rows_c.row(1));
        assert!(rows_c.row(3) > rows_c.row(0));

        let cols = converter.convert_rows(&rows_c).unwrap();
        dictionary_eq(&cols[0], &a);

        // Descending with nulls first only inverts the non-null comparisons
        let converter = RowConverter::new(vec![SortField::new_with_options(
            a.data_type().clone(),
            SortOptions::default().desc().with_nulls_first(true),
        )])
        .unwrap();

        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
        assert!(rows_c.row(3) < rows_c.row(5)); // null still sorts first
        assert!(rows_c.row(2) > rows_c.row(1));
        assert!(rows_c.row(0) > rows_c.row(1));
        assert!(rows_c.row(3) < rows_c.row(0)); // null still sorts first

        let cols = converter.convert_rows(&rows_c).unwrap();
        dictionary_eq(&cols[0], &a);
    }
2211
    #[test]
    fn test_struct() {
        // Test basic: rows are constructed in ascending order of (int, s)
        let a = Arc::new(Int32Array::from(vec![1, 1, 2, 2])) as ArrayRef;
        let a_f = Arc::new(Field::new("int", DataType::Int32, false));
        let u = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as ArrayRef;
        let u_f = Arc::new(Field::new("s", DataType::Utf8, false));
        let s1 = Arc::new(StructArray::from(vec![(a_f, a), (u_f, u)])) as ArrayRef;

        let sort_fields = vec![SortField::new(s1.data_type().clone())];
        let converter = RowConverter::new(sort_fields).unwrap();
        let r1 = converter.convert_columns(&[Arc::clone(&s1)]).unwrap();

        // Consecutive rows must be strictly increasing
        for (a, b) in r1.iter().zip(r1.iter().skip(1)) {
            assert!(a < b);
        }

        let back = converter.convert_rows(&r1).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(&back[0], &s1);

        // Test struct nullability: 0b00001010 marks only rows 1 and 3 valid,
        // so rows 0 and 2 become struct-level nulls
        let data = s1
            .to_data()
            .into_builder()
            .null_bit_buffer(Some(Buffer::from_slice_ref([0b00001010])))
            .null_count(2)
            .build()
            .unwrap();

        let s2 = Arc::new(StructArray::from(data)) as ArrayRef;
        let r2 = converter.convert_columns(&[Arc::clone(&s2)]).unwrap();
        assert_eq!(r2.row(0), r2.row(2)); // Nulls equal
        assert!(r2.row(0) < r2.row(1)); // Nulls first
        assert_ne!(r1.row(0), r2.row(0)); // Value does not equal null
        assert_eq!(r1.row(1), r2.row(1)); // Values equal

        let back = converter.convert_rows(&r2).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(&back[0], &s2);

        // The decoded array must also pass full validation
        back[0].to_data().validate_full().unwrap();
    }
2255
    #[test]
    fn test_dictionary_in_struct() {
        // Round-trips a struct containing a dictionary field; the converter
        // flattens the dictionary, so the data is compared logically below
        let builder = StringDictionaryBuilder::<Int32Type>::new();
        let mut struct_builder = StructBuilder::new(
            vec![Field::new_dictionary(
                "foo",
                DataType::Int32,
                DataType::Utf8,
                true,
            )],
            vec![Box::new(builder)],
        );

        let dict_builder = struct_builder
            .field_builder::<StringDictionaryBuilder<Int32Type>>(0)
            .unwrap();

        // Flattened: ["a", null, "a", "b"]
        dict_builder.append_value("a");
        dict_builder.append_null();
        dict_builder.append_value("a");
        dict_builder.append_value("b");

        // All four struct slots themselves are valid
        for _ in 0..4 {
            struct_builder.append(true);
        }

        let s = Arc::new(struct_builder.finish()) as ArrayRef;
        let sort_fields = vec![SortField::new(s.data_type().clone())];
        let converter = RowConverter::new(sort_fields).unwrap();
        let r = converter.convert_columns(&[Arc::clone(&s)]).unwrap();

        let back = converter.convert_rows(&r).unwrap();
        let [s2] = back.try_into().unwrap();

        // RowConverter flattens Dictionary
        // s.ty = Struct("foo": Dictionary(Int32, Utf8)), s2.ty = Struct("foo": Utf8)
        assert_ne!(&s.data_type(), &s2.data_type());
        s2.to_data().validate_full().unwrap();

        // Check if the logical data remains the same
        // Keys: [0, null, 0, 1]
        // Values: ["a", "b"]
        let s1_struct = s.as_struct();
        let s1_0 = s1_struct.column(0);
        let s1_idx_0 = s1_0.as_dictionary::<Int32Type>();
        let keys = s1_idx_0.keys();
        let values = s1_idx_0.values().as_string::<i32>();
        // Flattened: ["a", null, "a", "b"]
        let s2_struct = s2.as_struct();
        let s2_0 = s2_struct.column(0);
        let s2_idx_0 = s2_0.as_string::<i32>();

        // Element-wise: each dictionary lookup must match the flat string
        for i in 0..keys.len() {
            if keys.is_null(i) {
                assert!(s2_idx_0.is_null(i));
            } else {
                let dict_index = keys.value(i) as usize;
                assert_eq!(values.value(dict_index), s2_idx_0.value(i));
            }
        }
    }
2318
2319    #[test]
2320    fn test_dictionary_in_struct_empty() {
2321        let ty = DataType::Struct(
2322            vec![Field::new_dictionary(
2323                "foo",
2324                DataType::Int32,
2325                DataType::Int32,
2326                false,
2327            )]
2328            .into(),
2329        );
2330        let s = arrow_array::new_empty_array(&ty);
2331
2332        let sort_fields = vec![SortField::new(s.data_type().clone())];
2333        let converter = RowConverter::new(sort_fields).unwrap();
2334        let r = converter.convert_columns(&[Arc::clone(&s)]).unwrap();
2335
2336        let back = converter.convert_rows(&r).unwrap();
2337        let [s2] = back.try_into().unwrap();
2338
2339        // RowConverter flattens Dictionary
2340        // s.ty = Struct("foo": Dictionary(Int32, Int32)), s2.ty = Struct("foo": Int32)
2341        assert_ne!(&s.data_type(), &s2.data_type());
2342        s2.to_data().validate_full().unwrap();
2343        assert_eq!(s.len(), 0);
2344        assert_eq!(s2.len(), 0);
2345    }
2346
    #[test]
    fn test_list_of_string_dictionary() {
        // Round-trips a list of dictionary-encoded strings; the converter
        // flattens the dictionary, so element values are compared logically
        let mut builder = ListBuilder::<StringDictionaryBuilder<Int32Type>>::default();
        // List[0] = ["a", "b", "zero", null, "c", "b", "d" (dict)]
        builder.values().append("a").unwrap();
        builder.values().append("b").unwrap();
        builder.values().append("zero").unwrap();
        builder.values().append_null();
        builder.values().append("c").unwrap();
        builder.values().append("b").unwrap();
        builder.values().append("d").unwrap();
        builder.append(true);
        // List[1] = null
        builder.append(false);
        // List[2] = ["e", "zero", "a" (dict)]
        builder.values().append("e").unwrap();
        builder.values().append("zero").unwrap();
        builder.values().append("a").unwrap();
        builder.append(true);

        let a = Arc::new(builder.finish()) as ArrayRef;
        let data_type = a.data_type().clone();

        let field = SortField::new(data_type.clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&a)]).unwrap();

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        let [a2] = back.try_into().unwrap();

        // RowConverter flattens Dictionary
        // a.ty: List(Dictionary(Int32, Utf8)), a2.ty: List(Utf8)
        assert_ne!(&a.data_type(), &a2.data_type());

        a2.to_data().validate_full().unwrap();

        let a2_list = a2.as_list::<i32>();
        let a1_list = a.as_list::<i32>();

        // Check if the logical data remains the same
        // List[0] = ["a", "b", "zero", null, "c", "b", "d" (dict)]
        let a1_0 = a1_list.value(0);
        let a1_idx_0 = a1_0.as_dictionary::<Int32Type>();
        let keys = a1_idx_0.keys();
        let values = a1_idx_0.values().as_string::<i32>();
        let a2_0 = a2_list.value(0);
        let a2_idx_0 = a2_0.as_string::<i32>();

        // Element-wise: each dictionary lookup must match the flat string
        for i in 0..keys.len() {
            if keys.is_null(i) {
                assert!(a2_idx_0.is_null(i));
            } else {
                let dict_index = keys.value(i) as usize;
                assert_eq!(values.value(dict_index), a2_idx_0.value(i));
            }
        }

        // List[1] = null
        assert!(a1_list.is_null(1));
        assert!(a2_list.is_null(1));

        // List[2] = ["e", "zero", "a" (dict)]
        let a1_2 = a1_list.value(2);
        let a1_idx_2 = a1_2.as_dictionary::<Int32Type>();
        let keys = a1_idx_2.keys();
        let values = a1_idx_2.values().as_string::<i32>();
        let a2_2 = a2_list.value(2);
        let a2_idx_2 = a2_2.as_string::<i32>();

        for i in 0..keys.len() {
            if keys.is_null(i) {
                assert!(a2_idx_2.is_null(i));
            } else {
                let dict_index = keys.value(i) as usize;
                assert_eq!(values.value(dict_index), a2_idx_2.value(i));
            }
        }
    }
2426
2427    #[test]
2428    fn test_primitive_dictionary() {
2429        let mut builder = PrimitiveDictionaryBuilder::<Int32Type, Int32Type>::new();
2430        builder.append(2).unwrap();
2431        builder.append(3).unwrap();
2432        builder.append(0).unwrap();
2433        builder.append_null();
2434        builder.append(5).unwrap();
2435        builder.append(3).unwrap();
2436        builder.append(-1).unwrap();
2437
2438        let a = builder.finish();
2439        let data_type = a.data_type().clone();
2440        let columns = [Arc::new(a) as ArrayRef];
2441
2442        let field = SortField::new(data_type.clone());
2443        let converter = RowConverter::new(vec![field]).unwrap();
2444        let rows = converter.convert_columns(&columns).unwrap();
2445        assert!(rows.row(0) < rows.row(1));
2446        assert!(rows.row(2) < rows.row(0));
2447        assert!(rows.row(3) < rows.row(2));
2448        assert!(rows.row(6) < rows.row(2));
2449        assert!(rows.row(3) < rows.row(6));
2450
2451        let back = converter.convert_rows(&rows).unwrap();
2452        assert_eq!(back.len(), 1);
2453        back[0].to_data().validate_full().unwrap();
2454    }
2455
2456    #[test]
2457    fn test_dictionary_nulls() {
2458        let values = Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data();
2459        let keys =
2460            Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]).into_data();
2461
2462        let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
2463        let data = keys
2464            .into_builder()
2465            .data_type(data_type.clone())
2466            .child_data(vec![values])
2467            .build()
2468            .unwrap();
2469
2470        let columns = [Arc::new(DictionaryArray::<Int32Type>::from(data)) as ArrayRef];
2471        let field = SortField::new(data_type.clone());
2472        let converter = RowConverter::new(vec![field]).unwrap();
2473        let rows = converter.convert_columns(&columns).unwrap();
2474
2475        assert_eq!(rows.row(0), rows.row(1));
2476        assert_eq!(rows.row(3), rows.row(4));
2477        assert_eq!(rows.row(4), rows.row(5));
2478        assert!(rows.row(3) < rows.row(0));
2479    }
2480
2481    #[test]
2482    fn test_from_binary_shared_buffer() {
2483        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2484        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2485        let rows = converter.convert_columns(&[array]).unwrap();
2486        let binary_rows = rows.try_into_binary().expect("known-small rows");
2487        let _binary_rows_shared_buffer = binary_rows.clone();
2488
2489        let parsed = converter.from_binary(binary_rows);
2490
2491        converter.convert_rows(parsed.iter()).unwrap();
2492    }
2493
2494    #[test]
2495    #[should_panic(expected = "Encountered non UTF-8 data")]
2496    fn test_invalid_utf8() {
2497        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2498        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2499        let rows = converter.convert_columns(&[array]).unwrap();
2500        let binary_row = rows.row(0);
2501
2502        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2503        let parser = converter.parser();
2504        let utf8_row = parser.parse(binary_row.as_ref());
2505
2506        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2507    }
2508
2509    #[test]
2510    #[should_panic(expected = "Encountered non UTF-8 data")]
2511    fn test_invalid_utf8_array() {
2512        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2513        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2514        let rows = converter.convert_columns(&[array]).unwrap();
2515        let binary_rows = rows.try_into_binary().expect("known-small rows");
2516
2517        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2518        let parsed = converter.from_binary(binary_rows);
2519
2520        converter.convert_rows(parsed.iter()).unwrap();
2521    }
2522
2523    #[test]
2524    #[should_panic(expected = "index out of bounds")]
2525    fn test_invalid_empty() {
2526        let binary_row: &[u8] = &[];
2527
2528        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2529        let parser = converter.parser();
2530        let utf8_row = parser.parse(binary_row.as_ref());
2531
2532        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2533    }
2534
2535    #[test]
2536    #[should_panic(expected = "index out of bounds")]
2537    fn test_invalid_empty_array() {
2538        let row: &[u8] = &[];
2539        let binary_rows = BinaryArray::from(vec![row]);
2540
2541        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2542        let parsed = converter.from_binary(binary_rows);
2543
2544        converter.convert_rows(parsed.iter()).unwrap();
2545    }
2546
2547    #[test]
2548    #[should_panic(expected = "index out of bounds")]
2549    fn test_invalid_truncated() {
2550        let binary_row: &[u8] = &[0x02];
2551
2552        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2553        let parser = converter.parser();
2554        let utf8_row = parser.parse(binary_row.as_ref());
2555
2556        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2557    }
2558
2559    #[test]
2560    #[should_panic(expected = "index out of bounds")]
2561    fn test_invalid_truncated_array() {
2562        let row: &[u8] = &[0x02];
2563        let binary_rows = BinaryArray::from(vec![row]);
2564
2565        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2566        let parsed = converter.from_binary(binary_rows);
2567
2568        converter.convert_rows(parsed.iter()).unwrap();
2569    }
2570
2571    #[test]
2572    #[should_panic(expected = "rows were not produced by this RowConverter")]
2573    fn test_different_converter() {
2574        let values = Arc::new(Int32Array::from_iter([Some(1), Some(-1)]));
2575        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2576        let rows = converter.convert_columns(&[values]).unwrap();
2577
2578        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2579        let _ = converter.convert_rows(&rows);
2580    }
2581
    /// Round-trips a `GenericList<O, Int32>` column through the row format and
    /// checks row ordering under all four combinations of sort direction and
    /// null placement, plus a sliced input.
    ///
    /// Logical rows built below:
    ///   0: [32, 52, 32]   1: [32, 52, 12]   2: [32, 52]
    ///   3: null           4: [32, null]     5: []          6: null
    fn test_single_list<O: OffsetSizeTrait>() {
        let mut builder = GenericListBuilder::<O, _>::new(Int32Builder::new());
        // 0: [32, 52, 32]
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(32);
        builder.append(true);
        // 1: [32, 52, 12]
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(12);
        builder.append(true);
        // 2: [32, 52]
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.append(true);
        // 3: null — the child values are hidden by the validity bit
        builder.values().append_value(32); // MASKED
        builder.values().append_value(52); // MASKED
        builder.append(false);
        // 4: [32, null]
        builder.values().append_value(32);
        builder.values().append_null();
        builder.append(true);
        // 5: [] (empty, but valid)
        builder.append(true);
        // 6: null — different masked values must encode identically to row 3
        builder.values().append_value(17); // MASKED
        builder.values().append_null(); // MASKED
        builder.append(false);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // Default options: ascending, nulls first
        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();

        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
        assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
        assert!(rows.row(3) < rows.row(5)); // null < []
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Ascending, nulls last
        let options = SortOptions::default().asc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
        assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
        assert!(rows.row(3) > rows.row(5)); // null > []
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls last
        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
        assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
        assert!(rows.row(3) > rows.row(5)); // null > []
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls first
        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
        assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
        assert!(rows.row(3) < rows.row(5)); // null < []
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Slicing must be respected by the encoder:
        // sliced rows are 0: [32, 52, 12], 1: [32, 52], 2: null, 3: [32, null], 4: []
        let sliced_list = list.slice(1, 5);
        let rows_on_sliced_list = converter
            .convert_columns(&[Arc::clone(&sliced_list)])
            .unwrap();

        assert!(rows_on_sliced_list.row(1) > rows_on_sliced_list.row(0)); // [32, 52] > [32, 52, 12]
        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1)); // null < [32, 52]
        assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1)); // [32, null] < [32, 52]
        assert!(rows_on_sliced_list.row(4) > rows_on_sliced_list.row(1)); // [] > [32, 52]
        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4)); // null < []

        let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &sliced_list);
    }
2695
    /// Round-trips a `GenericList<O, GenericList<O, Int32>>` column through the
    /// row format, checking ordering under several sort options and a sliced
    /// input. The five logical rows are listed in the comment block below.
    fn test_nested_list<O: OffsetSizeTrait>() {
        let mut builder =
            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(Int32Builder::new()));

        // 0: [[1, 2], [1, null]]
        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        // 1: [[1, null], [1, null]]
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        // 2: [[1, null], null], then 3: null
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().append(false);
        builder.append(true);
        builder.append(false);

        // 4: [[1, 2]]
        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.append(true);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // [
        //   [[1, 2], [1, null]],
        //   [[1, null], [1, null]],
        //   [[1, null], null]
        //   null
        //   [[1, 2]]
        // ]
        // Ascending, nulls first
        let options = SortOptions::default().asc().with_nulls_first(true);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(1) > rows.row(2));
        assert!(rows.row(2) > rows.row(3));
        assert!(rows.row(4) < rows.row(0));
        assert!(rows.row(4) > rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls first
        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(1) > rows.row(2));
        assert!(rows.row(2) > rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) > rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls last
        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(1) < rows.row(2));
        assert!(rows.row(2) < rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) < rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Slicing must be respected by the encoder (rows 1..=3 of the above)
        let sliced_list = list.slice(1, 3);
        let rows = converter
            .convert_columns(&[Arc::clone(&sliced_list)])
            .unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(1) < rows.row(2));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &sliced_list);
    }
2799
2800    #[test]
2801    fn test_list() {
2802        test_single_list::<i32>();
2803        test_nested_list::<i32>();
2804    }
2805
2806    #[test]
2807    fn test_large_list() {
2808        test_single_list::<i64>();
2809        test_nested_list::<i64>();
2810    }
2811
    /// Round-trips a `FixedSizeList(Int32, 3)` column through the row format,
    /// checking ordering under all four combinations of sort direction and
    /// null placement, plus a sliced input.
    ///
    /// Logical rows built below:
    ///   0: [32, 52, 32]        1: [32, 52, 12]    2: [32, 52, null]
    ///   3: null                4: [32, null, null]
    ///   5: [null, null, null]  6: null
    #[test]
    fn test_fixed_size_list() {
        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 3);
        // 0: [32, 52, 32]
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(32);
        builder.append(true);
        // 1: [32, 52, 12]
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(12);
        builder.append(true);
        // 2: [32, 52, null]
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_null();
        builder.append(true);
        // 3: null — the child values are hidden by the validity bit
        builder.values().append_value(32); // MASKED
        builder.values().append_value(52); // MASKED
        builder.values().append_value(13); // MASKED
        builder.append(false);
        // 4: [32, null, null]
        builder.values().append_value(32);
        builder.values().append_null();
        builder.values().append_null();
        builder.append(true);
        // 5: [null, null, null] (valid entry, all-null children)
        builder.values().append_null();
        builder.values().append_null();
        builder.values().append_null();
        builder.append(true);
        // 6: null — different masked values must encode identically to row 3
        builder.values().append_value(17); // MASKED
        builder.values().append_null(); // MASKED
        builder.values().append_value(77); // MASKED
        builder.append(false);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // Default sorting (ascending, nulls first)
        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();

        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52, null] < [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52, null]
        assert!(rows.row(4) < rows.row(2)); // [32, null, null] < [32, 52, null]
        assert!(rows.row(5) < rows.row(2)); // [null, null, null] < [32, 52, null]
        assert!(rows.row(3) < rows.row(5)); // null < [null, null, null]
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Ascending, null last
        let options = SortOptions::default().asc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52, null] > [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52, null]
        assert!(rows.row(4) > rows.row(2)); // [32, null, null] > [32, 52, null]
        assert!(rows.row(5) > rows.row(2)); // [null, null, null] > [32, 52, null]
        assert!(rows.row(3) > rows.row(5)); // null > [null, null, null]
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls last
        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52, null] > [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52, null]
        assert!(rows.row(4) > rows.row(2)); // [32, null, null] > [32, 52, null]
        assert!(rows.row(5) > rows.row(2)); // [null, null, null] > [32, 52, null]
        assert!(rows.row(3) > rows.row(5)); // null > [null, null, null]
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls first
        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52, null] < [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52, null]
        assert!(rows.row(4) < rows.row(2)); // [32, null, null] < [32, 52, null]
        assert!(rows.row(5) < rows.row(2)); // [null, null, null] < [32, 52, null]
        assert!(rows.row(3) < rows.row(5)); // null < [null, null, null]
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Slicing must be respected by the encoder: sliced rows are
        // 0: [32, 52, 12], 1: [32, 52, null], 2: null, 3: [32, null, null], 4: [null, null, null]
        let sliced_list = list.slice(1, 5);
        let rows_on_sliced_list = converter
            .convert_columns(&[Arc::clone(&sliced_list)])
            .unwrap();

        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1)); // null < [32, 52, null]
        assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1)); // [32, null, null] < [32, 52, null]
        assert!(rows_on_sliced_list.row(4) < rows_on_sliced_list.row(1)); // [null, null, null] < [32, 52, null]
        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4)); // null < [null, null, null]

        let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &sliced_list);
    }
2934
2935    #[test]
2936    fn test_two_fixed_size_lists() {
2937        let mut first = FixedSizeListBuilder::new(UInt8Builder::new(), 1);
2938        // 0: [100]
2939        first.values().append_value(100);
2940        first.append(true);
2941        // 1: [101]
2942        first.values().append_value(101);
2943        first.append(true);
2944        // 2: [102]
2945        first.values().append_value(102);
2946        first.append(true);
2947        // 3: [null]
2948        first.values().append_null();
2949        first.append(true);
2950        // 4: null
2951        first.values().append_null(); // MASKED
2952        first.append(false);
2953        let first = Arc::new(first.finish()) as ArrayRef;
2954        let first_type = first.data_type().clone();
2955
2956        let mut second = FixedSizeListBuilder::new(UInt8Builder::new(), 1);
2957        // 0: [200]
2958        second.values().append_value(200);
2959        second.append(true);
2960        // 1: [201]
2961        second.values().append_value(201);
2962        second.append(true);
2963        // 2: [202]
2964        second.values().append_value(202);
2965        second.append(true);
2966        // 3: [null]
2967        second.values().append_null();
2968        second.append(true);
2969        // 4: null
2970        second.values().append_null(); // MASKED
2971        second.append(false);
2972        let second = Arc::new(second.finish()) as ArrayRef;
2973        let second_type = second.data_type().clone();
2974
2975        let converter = RowConverter::new(vec![
2976            SortField::new(first_type.clone()),
2977            SortField::new(second_type.clone()),
2978        ])
2979        .unwrap();
2980
2981        let rows = converter
2982            .convert_columns(&[Arc::clone(&first), Arc::clone(&second)])
2983            .unwrap();
2984
2985        let back = converter.convert_rows(&rows).unwrap();
2986        assert_eq!(back.len(), 2);
2987        back[0].to_data().validate_full().unwrap();
2988        assert_eq!(&back[0], &first);
2989        back[1].to_data().validate_full().unwrap();
2990        assert_eq!(&back[1], &second);
2991    }
2992
2993    #[test]
2994    fn test_fixed_size_list_with_variable_width_content() {
2995        let mut first = FixedSizeListBuilder::new(
2996            StructBuilder::from_fields(
2997                vec![
2998                    Field::new(
2999                        "timestamp",
3000                        DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("UTC"))),
3001                        false,
3002                    ),
3003                    Field::new("offset_minutes", DataType::Int16, false),
3004                    Field::new("time_zone", DataType::Utf8, false),
3005                ],
3006                1,
3007            ),
3008            1,
3009        );
3010        // 0: null
3011        first
3012            .values()
3013            .field_builder::<TimestampMicrosecondBuilder>(0)
3014            .unwrap()
3015            .append_null();
3016        first
3017            .values()
3018            .field_builder::<Int16Builder>(1)
3019            .unwrap()
3020            .append_null();
3021        first
3022            .values()
3023            .field_builder::<StringBuilder>(2)
3024            .unwrap()
3025            .append_null();
3026        first.values().append(false);
3027        first.append(false);
3028        // 1: [null]
3029        first
3030            .values()
3031            .field_builder::<TimestampMicrosecondBuilder>(0)
3032            .unwrap()
3033            .append_null();
3034        first
3035            .values()
3036            .field_builder::<Int16Builder>(1)
3037            .unwrap()
3038            .append_null();
3039        first
3040            .values()
3041            .field_builder::<StringBuilder>(2)
3042            .unwrap()
3043            .append_null();
3044        first.values().append(false);
3045        first.append(true);
3046        // 2: [1970-01-01 00:00:00.000000 UTC]
3047        first
3048            .values()
3049            .field_builder::<TimestampMicrosecondBuilder>(0)
3050            .unwrap()
3051            .append_value(0);
3052        first
3053            .values()
3054            .field_builder::<Int16Builder>(1)
3055            .unwrap()
3056            .append_value(0);
3057        first
3058            .values()
3059            .field_builder::<StringBuilder>(2)
3060            .unwrap()
3061            .append_value("UTC");
3062        first.values().append(true);
3063        first.append(true);
3064        // 3: [2005-09-10 13:30:00.123456 Europe/Warsaw]
3065        first
3066            .values()
3067            .field_builder::<TimestampMicrosecondBuilder>(0)
3068            .unwrap()
3069            .append_value(1126351800123456);
3070        first
3071            .values()
3072            .field_builder::<Int16Builder>(1)
3073            .unwrap()
3074            .append_value(120);
3075        first
3076            .values()
3077            .field_builder::<StringBuilder>(2)
3078            .unwrap()
3079            .append_value("Europe/Warsaw");
3080        first.values().append(true);
3081        first.append(true);
3082        let first = Arc::new(first.finish()) as ArrayRef;
3083        let first_type = first.data_type().clone();
3084
3085        let mut second = StringBuilder::new();
3086        second.append_value("somewhere near");
3087        second.append_null();
3088        second.append_value("Greenwich");
3089        second.append_value("Warsaw");
3090        let second = Arc::new(second.finish()) as ArrayRef;
3091        let second_type = second.data_type().clone();
3092
3093        let converter = RowConverter::new(vec![
3094            SortField::new(first_type.clone()),
3095            SortField::new(second_type.clone()),
3096        ])
3097        .unwrap();
3098
3099        let rows = converter
3100            .convert_columns(&[Arc::clone(&first), Arc::clone(&second)])
3101            .unwrap();
3102
3103        let back = converter.convert_rows(&rows).unwrap();
3104        assert_eq!(back.len(), 2);
3105        back[0].to_data().validate_full().unwrap();
3106        assert_eq!(&back[0], &first);
3107        back[1].to_data().validate_full().unwrap();
3108        assert_eq!(&back[1], &second);
3109    }
3110
3111    fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
3112    where
3113        K: ArrowPrimitiveType,
3114        StandardUniform: Distribution<K::Native>,
3115    {
3116        let mut rng = rng();
3117        (0..len)
3118            .map(|_| rng.random_bool(valid_percent).then(|| rng.random()))
3119            .collect()
3120    }
3121
3122    fn generate_strings<O: OffsetSizeTrait>(
3123        len: usize,
3124        valid_percent: f64,
3125    ) -> GenericStringArray<O> {
3126        let mut rng = rng();
3127        (0..len)
3128            .map(|_| {
3129                rng.random_bool(valid_percent).then(|| {
3130                    let len = rng.random_range(0..100);
3131                    let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
3132                    String::from_utf8(bytes).unwrap()
3133                })
3134            })
3135            .collect()
3136    }
3137
3138    fn generate_string_view(len: usize, valid_percent: f64) -> StringViewArray {
3139        let mut rng = rng();
3140        (0..len)
3141            .map(|_| {
3142                rng.random_bool(valid_percent).then(|| {
3143                    let len = rng.random_range(0..100);
3144                    let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
3145                    String::from_utf8(bytes).unwrap()
3146                })
3147            })
3148            .collect()
3149    }
3150
3151    fn generate_byte_view(len: usize, valid_percent: f64) -> BinaryViewArray {
3152        let mut rng = rng();
3153        (0..len)
3154            .map(|_| {
3155                rng.random_bool(valid_percent).then(|| {
3156                    let len = rng.random_range(0..100);
3157                    let bytes: Vec<_> = (0..len).map(|_| rng.random_range(0..128)).collect();
3158                    bytes
3159                })
3160            })
3161            .collect()
3162    }
3163
3164    fn generate_fixed_stringview_column(len: usize) -> StringViewArray {
3165        let edge_cases = vec![
3166            Some("bar".to_string()),
3167            Some("bar\0".to_string()),
3168            Some("LongerThan12Bytes".to_string()),
3169            Some("LongerThan12Bytez".to_string()),
3170            Some("LongerThan12Bytes\0".to_string()),
3171            Some("LongerThan12Byt".to_string()),
3172            Some("backend one".to_string()),
3173            Some("backend two".to_string()),
3174            Some("a".repeat(257)),
3175            Some("a".repeat(300)),
3176        ];
3177
3178        // Fill up to `len` by repeating edge cases and trimming
3179        let mut values = Vec::with_capacity(len);
3180        for i in 0..len {
3181            values.push(
3182                edge_cases
3183                    .get(i % edge_cases.len())
3184                    .cloned()
3185                    .unwrap_or(None),
3186            );
3187        }
3188
3189        StringViewArray::from(values)
3190    }
3191
3192    fn generate_dictionary<K>(
3193        values: ArrayRef,
3194        len: usize,
3195        valid_percent: f64,
3196    ) -> DictionaryArray<K>
3197    where
3198        K: ArrowDictionaryKeyType,
3199        K::Native: SampleUniform,
3200    {
3201        let mut rng = rng();
3202        let min_key = K::Native::from_usize(0).unwrap();
3203        let max_key = K::Native::from_usize(values.len()).unwrap();
3204        let keys: PrimitiveArray<K> = (0..len)
3205            .map(|_| {
3206                rng.random_bool(valid_percent)
3207                    .then(|| rng.random_range(min_key..max_key))
3208            })
3209            .collect();
3210
3211        let data_type =
3212            DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(values.data_type().clone()));
3213
3214        let data = keys
3215            .into_data()
3216            .into_builder()
3217            .data_type(data_type)
3218            .add_child_data(values.to_data())
3219            .build()
3220            .unwrap();
3221
3222        DictionaryArray::from(data)
3223    }
3224
3225    fn generate_fixed_size_binary(len: usize, valid_percent: f64) -> FixedSizeBinaryArray {
3226        let mut rng = rng();
3227        let width = rng.random_range(0..20);
3228        let mut builder = FixedSizeBinaryBuilder::new(width);
3229
3230        let mut b = vec![0; width as usize];
3231        for _ in 0..len {
3232            match rng.random_bool(valid_percent) {
3233                true => {
3234                    b.iter_mut().for_each(|x| *x = rng.random());
3235                    builder.append_value(&b).unwrap();
3236                }
3237                false => builder.append_null(),
3238            }
3239        }
3240
3241        builder.finish()
3242    }
3243
3244    fn generate_struct(len: usize, valid_percent: f64) -> StructArray {
3245        let mut rng = rng();
3246        let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
3247        let a = generate_primitive_array::<Int32Type>(len, valid_percent);
3248        let b = generate_strings::<i32>(len, valid_percent);
3249        let fields = Fields::from(vec![
3250            Field::new("a", DataType::Int32, true),
3251            Field::new("b", DataType::Utf8, true),
3252        ]);
3253        let values = vec![Arc::new(a) as _, Arc::new(b) as _];
3254        StructArray::new(fields, values, Some(nulls))
3255    }
3256
3257    fn generate_list<F>(len: usize, valid_percent: f64, values: F) -> ListArray
3258    where
3259        F: FnOnce(usize) -> ArrayRef,
3260    {
3261        let mut rng = rng();
3262        let offsets = OffsetBuffer::<i32>::from_lengths((0..len).map(|_| rng.random_range(0..10)));
3263        let values_len = offsets.last().unwrap().to_usize().unwrap();
3264        let values = values(values_len);
3265        let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
3266        let field = Arc::new(Field::new_list_field(values.data_type().clone(), true));
3267        ListArray::new(field, offsets, values, Some(nulls))
3268    }
3269
3270    fn generate_column(len: usize) -> ArrayRef {
3271        let mut rng = rng();
3272        match rng.random_range(0..18) {
3273            0 => Arc::new(generate_primitive_array::<Int32Type>(len, 0.8)),
3274            1 => Arc::new(generate_primitive_array::<UInt32Type>(len, 0.8)),
3275            2 => Arc::new(generate_primitive_array::<Int64Type>(len, 0.8)),
3276            3 => Arc::new(generate_primitive_array::<UInt64Type>(len, 0.8)),
3277            4 => Arc::new(generate_primitive_array::<Float32Type>(len, 0.8)),
3278            5 => Arc::new(generate_primitive_array::<Float64Type>(len, 0.8)),
3279            6 => Arc::new(generate_strings::<i32>(len, 0.8)),
3280            7 => Arc::new(generate_dictionary::<Int64Type>(
3281                // Cannot test dictionaries containing null values because of #2687
3282                Arc::new(generate_strings::<i32>(rng.random_range(1..len), 1.0)),
3283                len,
3284                0.8,
3285            )),
3286            8 => Arc::new(generate_dictionary::<Int64Type>(
3287                // Cannot test dictionaries containing null values because of #2687
3288                Arc::new(generate_primitive_array::<Int64Type>(
3289                    rng.random_range(1..len),
3290                    1.0,
3291                )),
3292                len,
3293                0.8,
3294            )),
3295            9 => Arc::new(generate_fixed_size_binary(len, 0.8)),
3296            10 => Arc::new(generate_struct(len, 0.8)),
3297            11 => Arc::new(generate_list(len, 0.8, |values_len| {
3298                Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
3299            })),
3300            12 => Arc::new(generate_list(len, 0.8, |values_len| {
3301                Arc::new(generate_strings::<i32>(values_len, 0.8))
3302            })),
3303            13 => Arc::new(generate_list(len, 0.8, |values_len| {
3304                Arc::new(generate_struct(values_len, 0.8))
3305            })),
3306            14 => Arc::new(generate_string_view(len, 0.8)),
3307            15 => Arc::new(generate_byte_view(len, 0.8)),
3308            16 => Arc::new(generate_fixed_stringview_column(len)),
3309            17 => Arc::new(
3310                generate_list(len + 1000, 0.8, |values_len| {
3311                    Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
3312                })
3313                .slice(500, len),
3314            ),
3315            _ => unreachable!(),
3316        }
3317    }
3318
3319    fn print_row(cols: &[SortColumn], row: usize) -> String {
3320        let t: Vec<_> = cols
3321            .iter()
3322            .map(|x| match x.values.is_valid(row) {
3323                true => {
3324                    let opts = FormatOptions::default().with_null("NULL");
3325                    let formatter = ArrayFormatter::try_new(x.values.as_ref(), &opts).unwrap();
3326                    formatter.value(row).to_string()
3327                }
3328                false => "NULL".to_string(),
3329            })
3330            .collect();
3331        t.join(",")
3332    }
3333
3334    fn print_col_types(cols: &[SortColumn]) -> String {
3335        let t: Vec<_> = cols
3336            .iter()
3337            .map(|x| x.values.data_type().to_string())
3338            .collect();
3339        t.join(",")
3340    }
3341
    /// Fuzz test: for 100 iterations, generates 1-4 random columns with random
    /// sort options, then checks that
    /// 1. byte-wise `Row` comparison agrees with `LexicographicalComparator`
    ///    for every pair of rows, and
    /// 2. rows round-trip back to the original arrays via `convert_rows`,
    ///    including after a `try_into_binary` / `parser` / `from_binary` cycle.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn fuzz_test() {
        for _ in 0..100 {
            let mut rng = rng();
            let num_columns = rng.random_range(1..5);
            let len = rng.random_range(5..100);
            let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect();

            // Random ascending/descending and nulls-first/last per column
            let options: Vec<_> = (0..num_columns)
                .map(|_| SortOptions {
                    descending: rng.random_bool(0.5),
                    nulls_first: rng.random_bool(0.5),
                })
                .collect();

            let sort_columns: Vec<_> = options
                .iter()
                .zip(&arrays)
                .map(|(o, c)| SortColumn {
                    values: Arc::clone(c),
                    options: Some(*o),
                })
                .collect();

            // Reference implementation: element-wise lexicographic comparison
            let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();

            let columns: Vec<SortField> = options
                .into_iter()
                .zip(&arrays)
                .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
                .collect();

            let converter = RowConverter::new(columns).unwrap();
            let rows = converter.convert_columns(&arrays).unwrap();

            // Every pair (i, j): byte comparison of rows must match the
            // lexicographical comparator's verdict
            for i in 0..len {
                for j in 0..len {
                    let row_i = rows.row(i);
                    let row_j = rows.row(j);
                    let row_cmp = row_i.cmp(&row_j);
                    let lex_cmp = comparator.compare(i, j);
                    assert_eq!(
                        row_cmp,
                        lex_cmp,
                        "({:?} vs {:?}) vs ({:?} vs {:?}) for types {}",
                        print_row(&sort_columns, i),
                        print_row(&sort_columns, j),
                        row_i,
                        row_j,
                        print_col_types(&sort_columns)
                    );
                }
            }

            // Convert rows produced from convert_columns().
            // Note: validate_utf8 is set to false since Row is initialized through empty_rows()
            let back = converter.convert_rows(&rows).unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }

            // Check that we can convert rows into ByteArray and then parse, convert it back to array
            // Note: validate_utf8 is set to true since Row is initialized through RowParser
            let rows = rows.try_into_binary().expect("reasonable size");
            let parser = converter.parser();
            let back = converter
                .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
                .unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }

            // Round-trip the binary representation back into Rows and convert
            let rows = converter.from_binary(rows);
            let back = converter.convert_rows(&rows).unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }
        }
    }
3425
3426    #[test]
3427    fn test_clear() {
3428        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
3429        let mut rows = converter.empty_rows(3, 128);
3430
3431        let first = Int32Array::from(vec![None, Some(2), Some(4)]);
3432        let second = Int32Array::from(vec![Some(2), None, Some(4)]);
3433        let arrays = [Arc::new(first) as ArrayRef, Arc::new(second) as ArrayRef];
3434
3435        for array in arrays.iter() {
3436            rows.clear();
3437            converter
3438                .append(&mut rows, std::slice::from_ref(array))
3439                .unwrap();
3440            let back = converter.convert_rows(&rows).unwrap();
3441            assert_eq!(&back[0], array);
3442        }
3443
3444        let mut rows_expected = converter.empty_rows(3, 128);
3445        converter.append(&mut rows_expected, &arrays[1..]).unwrap();
3446
3447        for (i, (actual, expected)) in rows.iter().zip(rows_expected.iter()).enumerate() {
3448            assert_eq!(
3449                actual, expected,
3450                "For row {i}: expected {expected:?}, actual: {actual:?}",
3451            );
3452        }
3453    }
3454
3455    #[test]
3456    fn test_append_codec_dictionary_binary() {
3457        use DataType::*;
3458        // Dictionary RowConverter
3459        let converter = RowConverter::new(vec![SortField::new(Dictionary(
3460            Box::new(Int32),
3461            Box::new(Binary),
3462        ))])
3463        .unwrap();
3464        let mut rows = converter.empty_rows(4, 128);
3465
3466        let keys = Int32Array::from_iter_values([0, 1, 2, 3]);
3467        let values = BinaryArray::from(vec![
3468            Some("a".as_bytes()),
3469            Some(b"b"),
3470            Some(b"c"),
3471            Some(b"d"),
3472        ]);
3473        let dict_array = DictionaryArray::new(keys, Arc::new(values));
3474
3475        rows.clear();
3476        let array = Arc::new(dict_array) as ArrayRef;
3477        converter
3478            .append(&mut rows, std::slice::from_ref(&array))
3479            .unwrap();
3480        let back = converter.convert_rows(&rows).unwrap();
3481
3482        dictionary_eq(&back[0], &array);
3483    }
3484
3485    #[test]
3486    fn test_list_prefix() {
3487        let mut a = ListBuilder::new(Int8Builder::new());
3488        a.append_value([None]);
3489        a.append_value([None, None]);
3490        let a = a.finish();
3491
3492        let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
3493        let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
3494        assert_eq!(rows.row(0).cmp(&rows.row(1)), Ordering::Less);
3495    }
3496
3497    #[test]
3498    fn map_should_be_marked_as_unsupported() {
3499        let map_data_type = Field::new_map(
3500            "map",
3501            "entries",
3502            Field::new("key", DataType::Utf8, false),
3503            Field::new("value", DataType::Utf8, true),
3504            false,
3505            true,
3506        )
3507        .data_type()
3508        .clone();
3509
3510        let is_supported = RowConverter::supports_fields(&[SortField::new(map_data_type)]);
3511
3512        assert!(!is_supported, "Map should not be supported");
3513    }
3514
3515    #[test]
3516    fn should_fail_to_create_row_converter_for_unsupported_map_type() {
3517        let map_data_type = Field::new_map(
3518            "map",
3519            "entries",
3520            Field::new("key", DataType::Utf8, false),
3521            Field::new("value", DataType::Utf8, true),
3522            false,
3523            true,
3524        )
3525        .data_type()
3526        .clone();
3527
3528        let converter = RowConverter::new(vec![SortField::new(map_data_type)]);
3529
3530        match converter {
3531            Err(ArrowError::NotYetImplemented(message)) => {
3532                assert!(
3533                    message.contains("Row format support not yet implemented for"),
3534                    "Expected NotYetImplemented error for map data type, got: {message}",
3535                );
3536            }
3537            Err(e) => panic!("Expected NotYetImplemented error, got: {e}"),
3538            Ok(_) => panic!("Expected NotYetImplemented error for map data type"),
3539        }
3540    }
3541
    /// Verifies that converting rows back into a `StringViewArray` produces a
    /// smaller values buffer when UTF-8 validation is disabled: without
    /// validation, short (<=12 byte) strings stay inline in the views and take
    /// no space in the data buffer, whereas the validating path (rows parsed
    /// via `RowParser`) materializes all string bytes in the buffer.
    #[test]
    fn test_values_buffer_smaller_when_utf8_validation_disabled() {
        /// Round-trips `col` through the row format twice and returns the
        /// length of the first data buffer for each path:
        /// (validation disabled, validation enabled).
        fn get_values_buffer_len(col: ArrayRef) -> (usize, usize) {
            // 1. Convert cols into rows
            let converter = RowConverter::new(vec![SortField::new(DataType::Utf8View)]).unwrap();

            // 2a. Convert rows into cols (validate_utf8 = false since Rows came from convert_columns)
            let rows = converter.convert_columns(&[col]).unwrap();
            let converted = converter.convert_rows(&rows).unwrap();
            let unchecked_values_len = converted[0].as_string_view().data_buffers()[0].len();

            // 2b. Convert rows into cols (validate_utf8 = true since Row is initialized through RowParser)
            let rows = rows.try_into_binary().expect("reasonable size");
            let parser = converter.parser();
            let converted = converter
                .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
                .unwrap();
            let checked_values_len = converted[0].as_string_view().data_buffers()[0].len();
            (unchecked_values_len, checked_values_len)
        }

        // Case1. StringViewArray with inline strings
        let col = Arc::new(StringViewArray::from_iter([
            Some("hello"), // short(5)
            None,          // null
            Some("short"), // short(5)
            Some("tiny"),  // short(4)
        ])) as ArrayRef;

        let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
        // Since there are no long (>12) strings, len of values buffer is 0
        assert_eq!(unchecked_values_len, 0);
        // When utf8 validation enabled, values buffer includes inline strings (5+5+4)
        assert_eq!(checked_values_len, 14);

        // Case2. StringViewArray with long(>12) strings
        let col = Arc::new(StringViewArray::from_iter([
            Some("this is a very long string over 12 bytes"),
            Some("another long string to test the buffer"),
        ])) as ArrayRef;

        let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
        // Since there are no inline strings, expected length of values buffer is the same
        assert!(unchecked_values_len > 0);
        assert_eq!(unchecked_values_len, checked_values_len);

        // Case3. StringViewArray with both short and long strings
        let col = Arc::new(StringViewArray::from_iter([
            Some("tiny"),          // 4 (short)
            Some("thisisexact13"), // 13 (long)
            None,
            Some("short"), // 5 (short)
        ])) as ArrayRef;

        let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
        // Since there is single long string, len of values buffer is 13
        assert_eq!(unchecked_values_len, 13);
        assert!(checked_values_len > unchecked_values_len);
    }
3601}