Skip to main content

arrow_row/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! A comparable row-oriented representation of a collection of [`Array`].
19//!
20//! [`Row`]s are [normalized for sorting], and can therefore be very efficiently [compared],
21//! using [`memcmp`] under the hood, or used in [non-comparison sorts] such as [radix sort].
22//! This makes the row format ideal for implementing efficient multi-column sorting,
23//! grouping, aggregation, windowing and more, as described in more detail
24//! [in this blog post](https://arrow.apache.org/blog/2022/11/07/multi-column-sorts-in-arrow-rust-part-1/).
25//!
26//! For example, given three input [`Array`], [`RowConverter`] creates byte
27//! sequences that [compare] the same as when using [`lexsort`].
28//!
29//! ```text
30//!    ┌─────┐   ┌─────┐   ┌─────┐
31//!    │     │   │     │   │     │
32//!    ├─────┤ ┌ ┼─────┼ ─ ┼─────┼ ┐              ┏━━━━━━━━━━━━━┓
33//!    │     │   │     │   │     │  ─────────────▶┃             ┃
34//!    ├─────┤ └ ┼─────┼ ─ ┼─────┼ ┘              ┗━━━━━━━━━━━━━┛
35//!    │     │   │     │   │     │
36//!    └─────┘   └─────┘   └─────┘
37//!                ...
38//!    ┌─────┐ ┌ ┬─────┬ ─ ┬─────┬ ┐              ┏━━━━━━━━┓
39//!    │     │   │     │   │     │  ─────────────▶┃        ┃
40//!    └─────┘ └ ┴─────┴ ─ ┴─────┴ ┘              ┗━━━━━━━━┛
41//!     UInt64      Utf8     F64
42//!
43//!           Input Arrays                          Row Format
44//!     (Columns)
45//! ```
46//!
47//! _[`Rows`] must be generated by the same [`RowConverter`] for the comparison
48//! to be meaningful._
49//!
50//! # Basic Example
51//! ```
52//! # use std::sync::Arc;
53//! # use arrow_row::{RowConverter, SortField};
54//! # use arrow_array::{ArrayRef, Int32Array, StringArray};
55//! # use arrow_array::cast::{AsArray, as_string_array};
56//! # use arrow_array::types::Int32Type;
57//! # use arrow_schema::DataType;
58//!
59//! let a1 = Arc::new(Int32Array::from_iter_values([-1, -1, 0, 3, 3])) as ArrayRef;
60//! let a2 = Arc::new(StringArray::from_iter_values(["a", "b", "c", "d", "d"])) as ArrayRef;
61//! let arrays = vec![a1, a2];
62//!
63//! // Convert arrays to rows
64//! let converter = RowConverter::new(vec![
65//!     SortField::new(DataType::Int32),
66//!     SortField::new(DataType::Utf8),
67//! ]).unwrap();
68//! let rows = converter.convert_columns(&arrays).unwrap();
69//!
70//! // Compare rows
71//! for i in 0..4 {
72//!     assert!(rows.row(i) <= rows.row(i + 1));
73//! }
74//! assert_eq!(rows.row(3), rows.row(4));
75//!
76//! // Convert rows back to arrays
77//! let converted = converter.convert_rows(&rows).unwrap();
78//! assert_eq!(arrays, converted);
79//!
80//! // Compare rows from different arrays
81//! let a1 = Arc::new(Int32Array::from_iter_values([3, 4])) as ArrayRef;
82//! let a2 = Arc::new(StringArray::from_iter_values(["e", "f"])) as ArrayRef;
83//! let arrays = vec![a1, a2];
84//! let rows2 = converter.convert_columns(&arrays).unwrap();
85//!
86//! assert!(rows.row(4) < rows2.row(0));
87//! assert!(rows.row(4) < rows2.row(1));
88//!
89//! // Convert selection of rows back to arrays
90//! let selection = [rows.row(0), rows2.row(1), rows.row(2), rows2.row(0)];
91//! let converted = converter.convert_rows(selection).unwrap();
92//! let c1 = converted[0].as_primitive::<Int32Type>();
93//! assert_eq!(c1.values(), &[-1, 4, 0, 3]);
94//!
95//! let c2 = converted[1].as_string::<i32>();
96//! let c2_values: Vec<_> = c2.iter().flatten().collect();
97//! assert_eq!(&c2_values, &["a", "f", "c", "e"]);
98//! ```
99//!
100//! # Lexicographic Sorts (lexsort)
101//!
102//! The row format can also be used to implement a fast multi-column / lexicographic sort
103//!
104//! ```
105//! # use arrow_row::{RowConverter, SortField};
106//! # use arrow_array::{ArrayRef, UInt32Array};
107//! fn lexsort_to_indices(arrays: &[ArrayRef]) -> UInt32Array {
108//!     let fields = arrays
109//!         .iter()
110//!         .map(|a| SortField::new(a.data_type().clone()))
111//!         .collect();
112//!     let converter = RowConverter::new(fields).unwrap();
113//!     let rows = converter.convert_columns(arrays).unwrap();
114//!     let mut sort: Vec<_> = rows.iter().enumerate().collect();
115//!     sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
116//!     UInt32Array::from_iter_values(sort.iter().map(|(i, _)| *i as u32))
117//! }
118//! ```
119//!
120//! # Flattening Dictionaries
121//!
122//! For performance reasons, dictionary arrays are flattened ("hydrated") to their
123//! underlying values during row conversion. See [the issue] for more details.
124//!
125//! This means that the arrays that come out of [`RowConverter::convert_rows`]
//! may not have the same data types as the input arrays. For example, encoding
//! a `Dictionary<Int8, Utf8>` array and then converting back will produce a `Utf8` array.
128//!
129//! ```
130//! # use arrow_array::{Array, ArrayRef, DictionaryArray};
131//! # use arrow_array::types::Int8Type;
132//! # use arrow_row::{RowConverter, SortField};
133//! # use arrow_schema::DataType;
134//! # use std::sync::Arc;
135//! // Input is a Dictionary array
136//! let dict: DictionaryArray::<Int8Type> = ["a", "b", "c", "a", "b"].into_iter().collect();
137//! let sort_fields = vec![SortField::new(dict.data_type().clone())];
138//! let arrays = vec![Arc::new(dict) as ArrayRef];
139//! let converter = RowConverter::new(sort_fields).unwrap();
140//! // Convert to rows
141//! let rows = converter.convert_columns(&arrays).unwrap();
142//! let converted = converter.convert_rows(&rows).unwrap();
143//! // result was a Utf8 array, not a Dictionary array
144//! assert_eq!(converted[0].data_type(), &DataType::Utf8);
145//! ```
146//!
147//! [non-comparison sorts]: https://en.wikipedia.org/wiki/Sorting_algorithm#Non-comparison_sorts
148//! [radix sort]: https://en.wikipedia.org/wiki/Radix_sort
149//! [normalized for sorting]: http://wwwlgis.informatik.uni-kl.de/archiv/wwwdvs.informatik.uni-kl.de/courses/DBSREAL/SS2005/Vorlesungsunterlagen/Implementing_Sorting.pdf
150//! [`memcmp`]: https://www.man7.org/linux/man-pages/man3/memcmp.3.html
151//! [`lexsort`]: https://docs.rs/arrow-ord/latest/arrow_ord/sort/fn.lexsort.html
152//! [compared]: PartialOrd
153//! [compare]: PartialOrd
154//! [the issue]: https://github.com/apache/arrow-rs/issues/4811
155
156#![doc(
157    html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
158    html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
159)]
160#![cfg_attr(docsrs, feature(doc_cfg))]
161#![warn(missing_docs)]
162use std::cmp::Ordering;
163use std::hash::{Hash, Hasher};
164use std::iter::Map;
165use std::slice::Windows;
166use std::sync::Arc;
167
168use arrow_array::cast::*;
169use arrow_array::types::{ArrowDictionaryKeyType, ByteArrayType, ByteViewType};
170use arrow_array::*;
171use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer};
172use arrow_data::{ArrayData, ArrayDataBuilder};
173use arrow_schema::*;
174use variable::{decode_binary_view, decode_string_view};
175
176use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
177use crate::list::{compute_lengths_fixed_size_list, encode_fixed_size_list};
178use crate::variable::{decode_binary, decode_string};
179use arrow_array::types::{Int16Type, Int32Type, Int64Type};
180
181mod fixed;
182mod list;
183mod run;
184mod variable;
185
186/// Converts [`ArrayRef`] columns into a [row-oriented](self) format.
187///
188/// *Note: The encoding of the row format may change from release to release.*
189///
190/// ## Overview
191///
192/// The row format is a variable length byte sequence created by
193/// concatenating the encoded form of each column. The encoding for
194/// each column depends on its datatype (and sort options).
195///
196/// The encoding is carefully designed in such a way that escaping is
197/// unnecessary: it is never ambiguous as to whether a byte is part of
198/// a sentinel (e.g. null) or a value.
199///
200/// ## Unsigned Integer Encoding
201///
202/// A null integer is encoded as a `0_u8`, followed by a zero-ed number of bytes corresponding
203/// to the integer's length.
204///
205/// A valid integer is encoded as `1_u8`, followed by the big-endian representation of the
206/// integer.
207///
208/// ```text
209///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
210///    3          │03│00│00│00│      │01│00│00│00│03│
211///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
212///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
213///   258         │02│01│00│00│      │01│00│00│01│02│
214///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
215///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
216///  23423        │7F│5B│00│00│      │01│00│00│5B│7F│
217///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
218///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
219///  NULL         │??│??│??│??│      │00│00│00│00│00│
220///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
221///
222///              32-bit (4 bytes)        Row Format
223///  Value        Little Endian
224/// ```
225///
226/// ## Signed Integer Encoding
227///
228/// Signed integers have their most significant sign bit flipped, and are then encoded in the
229/// same manner as an unsigned integer.
230///
231/// ```text
232///        ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┬──┐
233///     5  │05│00│00│00│       │05│00│00│80│       │01│80│00│00│05│
234///        └──┴──┴──┴──┘       └──┴──┴──┴──┘       └──┴──┴──┴──┴──┘
235///        ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┬──┐
236///    -5  │FB│FF│FF│FF│       │FB│FF│FF│7F│       │01│7F│FF│FF│FB│
237///        └──┴──┴──┴──┘       └──┴──┴──┴──┘       └──┴──┴──┴──┴──┘
238///
239///  Value  32-bit (4 bytes)    High bit flipped      Row Format
240///          Little Endian
241/// ```
242///
243/// ## Float Encoding
244///
245/// Floats are converted from IEEE 754 representation to a signed integer representation
246/// by flipping all bar the sign bit if they are negative.
247///
248/// They are then encoded in the same manner as a signed integer.
249///
250/// ## Fixed Length Bytes Encoding
251///
252/// Fixed length bytes are encoded in the same fashion as primitive types above.
253///
254/// For a fixed length array of length `n`:
255///
256/// A null is encoded as `0_u8` null sentinel followed by `n` `0_u8` bytes
257///
258/// A valid value is encoded as `1_u8` followed by the value bytes
259///
260/// ## Variable Length Bytes (including Strings) Encoding
261///
262/// A null is encoded as a `0_u8`.
263///
264/// An empty byte array is encoded as `1_u8`.
265///
266/// A non-null, non-empty byte array is encoded as `2_u8` followed by the byte array
267/// encoded using a block based scheme described below.
268///
269/// The byte array is broken up into fixed-width blocks, each block is written in turn
270/// to the output, followed by `0xFF_u8`. The final block is padded to 32-bytes
271/// with `0_u8` and written to the output, followed by the un-padded length in bytes
272/// of this final block as a `u8`. The first 4 blocks have a length of 8, with subsequent
273/// blocks using a length of 32, this is to reduce space amplification for small strings.
274///
275/// Note the following example encodings use a block size of 4 bytes for brevity:
276///
277/// ```text
278///                       ┌───┬───┬───┬───┬───┬───┐
279///  "MEEP"               │02 │'M'│'E'│'E'│'P'│04 │
280///                       └───┴───┴───┴───┴───┴───┘
281///
282///                       ┌───┐
283///  ""                   │01 |
284///                       └───┘
285///
286///  NULL                 ┌───┐
287///                       │00 │
288///                       └───┘
289///
290/// "Defenestration"      ┌───┬───┬───┬───┬───┬───┐
291///                       │02 │'D'│'e'│'f'│'e'│FF │
292///                       └───┼───┼───┼───┼───┼───┤
293///                           │'n'│'e'│'s'│'t'│FF │
294///                           ├───┼───┼───┼───┼───┤
///                           │'r'│'a'│'t'│'i'│FF │
///                           ├───┼───┼───┼───┼───┤
///                           │'o'│'n'│00 │00 │02 │
///                           └───┴───┴───┴───┴───┘
301/// ```
302///
303/// This approach is loosely inspired by [COBS] encoding, and chosen over more traditional
304/// [byte stuffing] as it is more amenable to vectorisation, in particular AVX-256.
305///
306/// ## Dictionary Encoding
307///
308/// Dictionary encoded arrays are hydrated to their underlying values
309///
310/// ## REE Encoding
311///
/// REE (Run End Encoded) arrays, a form of Run Length Encoding, are hydrated to their underlying values.
313///
314/// ## Struct Encoding
315///
316/// A null is encoded as a `0_u8`.
317///
318/// A valid value is encoded as `1_u8` followed by the row encoding of each child.
319///
320/// This encoding effectively flattens the schema in a depth-first fashion.
321///
322/// For example
323///
324/// ```text
325/// ┌───────┬────────────────────────┬───────┐
326/// │ Int32 │ Struct[Int32, Float32] │ Int32 │
327/// └───────┴────────────────────────┴───────┘
328/// ```
329///
330/// Is encoded as
331///
332/// ```text
333/// ┌───────┬───────────────┬───────┬─────────┬───────┐
334/// │ Int32 │ Null Sentinel │ Int32 │ Float32 │ Int32 │
335/// └───────┴───────────────┴───────┴─────────┴───────┘
336/// ```
337///
338/// ## List Encoding
339///
340/// Lists are encoded by first encoding all child elements to the row format.
341///
342/// A list value is then encoded as the concatenation of each of the child elements,
343/// separately encoded using the variable length encoding described above, followed
344/// by the variable length encoding of an empty byte array.
345///
346/// For example given:
347///
348/// ```text
349/// [1_u8, 2_u8, 3_u8]
350/// [1_u8, null]
351/// []
352/// null
353/// ```
354///
355/// The elements would be converted to:
356///
357/// ```text
358///     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐        ┌──┬──┐
359///  1  │01│01│  2  │01│02│  3  │01│03│  1  │01│01│  null  │00│00│
360///     └──┴──┘     └──┴──┘     └──┴──┘     └──┴──┘        └──┴──┘
361///```
362///
363/// Which would be encoded as
364///
365/// ```text
366///                         ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
367///  [1_u8, 2_u8, 3_u8]     │02│01│01│00│00│02│02│01│02│00│00│02│02│01│03│00│00│02│01│
368///                         └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
369///                          └──── 1_u8 ────┘   └──── 2_u8 ────┘  └──── 3_u8 ────┘
370///
371///                         ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
372///  [1_u8, null]           │02│01│01│00│00│02│02│00│00│00│00│02│01│
373///                         └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
374///                          └──── 1_u8 ────┘   └──── null ────┘
375///
376///```
377///
378/// With `[]` represented by an empty byte array, and `null` a null byte array.
379///
380/// ## Fixed Size List Encoding
381///
382/// Fixed Size Lists are encoded by first encoding all child elements to the row format.
383///
384/// A non-null list value is then encoded as 0x01 followed by the concatenation of each
385/// of the child elements. A null list value is encoded as a null marker.
386///
387/// For example given:
388///
389/// ```text
390/// [1_u8, 2_u8]
391/// [3_u8, null]
392/// null
393/// ```
394///
395/// The elements would be converted to:
396///
397/// ```text
398///     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐        ┌──┬──┐
399///  1  │01│01│  2  │01│02│  3  │01│03│  null  │00│00│
400///     └──┴──┘     └──┴──┘     └──┴──┘        └──┴──┘
401///```
402///
403/// Which would be encoded as
404///
405/// ```text
406///                 ┌──┬──┬──┬──┬──┐
407///  [1_u8, 2_u8]   │01│01│01│01│02│
408///                 └──┴──┴──┴──┴──┘
409///                     └ 1 ┘ └ 2 ┘
410///                 ┌──┬──┬──┬──┬──┐
411///  [3_u8, null]   │01│01│03│00│00│
412///                 └──┴──┴──┴──┴──┘
413///                     └ 1 ┘ └null┘
414///                 ┌──┐
415///  null           │00│
416///                 └──┘
417///
418///```
419///
420/// ## ListView Encoding
421///
422/// ListView arrays differ from List arrays in their representation: instead of using
423/// consecutive offset pairs to define each list, ListView uses explicit offset and size
424/// pairs for each element. This allows ListView elements to reference arbitrary (potentially
425/// overlapping) regions of the child array.
426///
427/// Despite this structural difference, ListView uses the **same row encoding as List**.
428/// Each list value is encoded as the concatenation of its child elements (each separately
429/// variable-length encoded), followed by a variable-length encoded empty byte array terminator.
430///
431/// **Important**: When a ListView is decoded back from row format, it is still a
432/// ListView, but any child element sharing that may have existed in the original
433/// (where multiple list entries could reference overlapping regions of the child
434/// array) is **not preserved** - each list's children are decoded independently
435/// with sequential offsets.
436///
437/// For example, given a ListView with offset/size pairs:
438///
439/// ```text
440/// offsets: [0, 1, 0]
441/// sizes:   [2, 2, 0]
442/// values:  [1_u8, 2_u8, 3_u8]
443///
444/// Resulting lists:
445/// [1_u8, 2_u8]  (offset=0, size=2 -> values[0..2])
446/// [2_u8, 3_u8]  (offset=1, size=2 -> values[1..3])
447/// []            (offset=0, size=0 -> empty)
448/// ```
449///
450/// The elements would be encoded identically to List encoding:
451///
452/// ```text
453///                         ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
454///  [1_u8, 2_u8]           │02│01│01│00│00│02│02│01│02│00│00│02│01│
455///                         └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
456///                          └──── 1_u8 ────┘   └──── 2_u8 ────┘
457///
458///                         ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
459///  [2_u8, 3_u8]           │02│01│02│00│00│02│02│01│03│00│00│02│01│
460///                         └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
461///                          └──── 2_u8 ────┘   └──── 3_u8 ────┘
462///
463///```
464///
465/// Note that element `2_u8` appears in both encoded rows, even though it was shared
466/// in the original ListView, and `[]` is represented by an empty byte array.
467///
468/// ## Union Encoding
469///
470/// A union value is encoded as a single type-id byte followed by the row encoding of the selected child value.
471/// The type-id byte is always present; union arrays have no top-level null marker, so nulls are represented by the child encoding.
472///
473/// For example, given a union of Int32 (type_id = 0) and Utf8 (type_id = 1):
474///
475/// ```text
476///                           ┌──┬──────────────┐
477///  3                        │00│01│80│00│00│03│
478///                           └──┴──────────────┘
479///                            │  └─ signed integer encoding (non-null)
480///                            └──── type_id
481///
482///                           ┌──┬────────────────────────────────┐
483/// "abc"                     │01│02│'a'│'b'│'c'│00│00│00│00│00│03│
484///                           └──┴────────────────────────────────┘
485///                            │  └─ string encoding (non-null)
486///                            └──── type_id
487///
488///                           ┌──┬──────────────┐
489/// null Int32                │00│00│00│00│00│00│
490///                           └──┴──────────────┘
491///                            │  └─ signed integer encoding (null)
492///                            └──── type_id
493///
494///                           ┌──┬──┐
495/// null Utf8                 │01│00│
496///                           └──┴──┘
497///                            │  └─ string encoding (null)
498///                            └──── type_id
499/// ```
500///
501/// See [`UnionArray`] for more details on union types.
502///
503/// # Ordering
504///
505/// ## Float Ordering
506///
507/// Floats are totally ordered in accordance to the `totalOrder` predicate as defined
508/// in the IEEE 754 (2008 revision) floating point standard.
509///
510/// The ordering established by this does not always agree with the
511/// [`PartialOrd`] and [`PartialEq`] implementations of `f32`. For example,
512/// they consider negative and positive zero equal, while this does not
513///
514/// ## Null Ordering
515///
516/// The encoding described above will order nulls first, this can be inverted by representing
517/// nulls as `0xFF_u8` instead of `0_u8`
518///
519/// ## Union Ordering
520///
521/// Values of the same type are ordered according to the ordering of that type.
522/// Values of different types are ordered by their type id.
523/// The type_id is negated when descending order is specified.
524///
525/// ## Reverse Column Ordering
526///
527/// The order of a given column can be reversed by negating the encoded bytes of non-null values
528///
529/// [COBS]: https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing
530/// [byte stuffing]: https://en.wikipedia.org/wiki/High-Level_Data_Link_Control#Asynchronous_framing
#[derive(Debug)]
pub struct RowConverter {
    /// The sort field (data type and sort options) for each converted column
    fields: Arc<[SortField]>,
    /// State for codecs, e.g. child [`RowConverter`]s for nested types
    codecs: Vec<Codec>,
}
537
/// Per-field encoder state, beyond what the [`SortField`] itself captures
#[derive(Debug)]
enum Codec {
    /// No additional codec state is necessary
    Stateless,
    /// A row converter for the dictionary values
    /// and the encoding of a row containing only nulls
    Dictionary(RowConverter, OwnedRow),
    /// A row converter for the child fields
    /// and the encoding of a row containing only nulls
    Struct(RowConverter, OwnedRow),
    /// A row converter for the child field
    ///
    /// Used for `List`, `LargeList`, `ListView`, `LargeListView`
    /// and `FixedSizeList` (see [`Codec::new`])
    List(RowConverter),
    /// A row converter for the values array of a run-end encoded array
    RunEndEncoded(RowConverter),
    /// Row converters for each union field (indexed by field position)
    /// the type_ids for each field position, and the encoding of null rows for each field
    Union(Vec<RowConverter>, Vec<i8>, Vec<OwnedRow>),
}
556
557/// Computes the minimum offset and maximum end (offset + size) for a ListView array.
558/// Returns (min_offset, max_end) which can be used to slice the values array.
559fn compute_list_view_bounds<O: OffsetSizeTrait>(array: &GenericListViewArray<O>) -> (usize, usize) {
560    if array.is_empty() {
561        return (0, 0);
562    }
563
564    let offsets = array.value_offsets();
565    let sizes = array.value_sizes();
566    let values_len = array.values().len();
567
568    let mut min_offset = usize::MAX;
569    let mut max_end = 0usize;
570
571    for i in 0..array.len() {
572        let offset = offsets[i].as_usize();
573        let size = sizes[i].as_usize();
574        let end = offset + size;
575
576        if size > 0 {
577            min_offset = min_offset.min(offset);
578            max_end = max_end.max(end);
579        }
580
581        // Early exit if we've found the full range of the values array. This is possible with
582        // ListViews since offsets and sizes are arbitrary and the full range can be covered early
583        // in the iteration contrary to regular Lists.
584        if min_offset == 0 && max_end == values_len {
585            break;
586        }
587    }
588
589    if min_offset == usize::MAX {
590        // All lists are empty
591        (0, 0)
592    } else {
593        (min_offset, max_end)
594    }
595}
596
597impl Codec {
598    fn new(sort_field: &SortField) -> Result<Self, ArrowError> {
599        match &sort_field.data_type {
600            DataType::Dictionary(_, values) => {
601                let sort_field =
602                    SortField::new_with_options(values.as_ref().clone(), sort_field.options);
603
604                let converter = RowConverter::new(vec![sort_field])?;
605                let null_array = new_null_array(values.as_ref(), 1);
606                let nulls = converter.convert_columns(&[null_array])?;
607
608                let owned = OwnedRow {
609                    data: nulls.buffer.into(),
610                    config: nulls.config,
611                };
612                Ok(Self::Dictionary(converter, owned))
613            }
614            DataType::RunEndEncoded(_, values) => {
615                // Similar to List implementation
616                let options = SortOptions {
617                    descending: false,
618                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
619                };
620
621                let field = SortField::new_with_options(values.data_type().clone(), options);
622                let converter = RowConverter::new(vec![field])?;
623                Ok(Self::RunEndEncoded(converter))
624            }
625            d if !d.is_nested() => Ok(Self::Stateless),
626            DataType::List(f)
627            | DataType::LargeList(f)
628            | DataType::ListView(f)
629            | DataType::LargeListView(f) => {
630                // The encoded contents will be inverted if descending is set to true
631                // As such we set `descending` to false and negate nulls first if it
632                // it set to true
633                let options = SortOptions {
634                    descending: false,
635                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
636                };
637
638                let field = SortField::new_with_options(f.data_type().clone(), options);
639                let converter = RowConverter::new(vec![field])?;
640                Ok(Self::List(converter))
641            }
642            DataType::FixedSizeList(f, _) => {
643                let field = SortField::new_with_options(f.data_type().clone(), sort_field.options);
644                let converter = RowConverter::new(vec![field])?;
645                Ok(Self::List(converter))
646            }
647            DataType::Struct(f) => {
648                let sort_fields = f
649                    .iter()
650                    .map(|x| SortField::new_with_options(x.data_type().clone(), sort_field.options))
651                    .collect();
652
653                let converter = RowConverter::new(sort_fields)?;
654                let nulls: Vec<_> = f.iter().map(|x| new_null_array(x.data_type(), 1)).collect();
655
656                let nulls = converter.convert_columns(&nulls)?;
657                let owned = OwnedRow {
658                    data: nulls.buffer.into(),
659                    config: nulls.config,
660                };
661
662                Ok(Self::Struct(converter, owned))
663            }
664            DataType::Union(fields, _mode) => {
665                // similar to dictionaries and lists, we set descending to false and negate nulls_first
666                // since the encoded contents will be inverted if descending is set
667                let options = SortOptions {
668                    descending: false,
669                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
670                };
671
672                let mut converters = Vec::with_capacity(fields.len());
673                let mut type_ids = Vec::with_capacity(fields.len());
674                let mut null_rows = Vec::with_capacity(fields.len());
675
676                for (type_id, field) in fields.iter() {
677                    let sort_field =
678                        SortField::new_with_options(field.data_type().clone(), options);
679                    let converter = RowConverter::new(vec![sort_field])?;
680
681                    let null_array = new_null_array(field.data_type(), 1);
682                    let nulls = converter.convert_columns(&[null_array])?;
683                    let owned = OwnedRow {
684                        data: nulls.buffer.into(),
685                        config: nulls.config,
686                    };
687
688                    converters.push(converter);
689                    type_ids.push(type_id);
690                    null_rows.push(owned);
691                }
692
693                Ok(Self::Union(converters, type_ids, null_rows))
694            }
695            _ => Err(ArrowError::NotYetImplemented(format!(
696                "not yet implemented: {:?}",
697                sort_field.data_type
698            ))),
699        }
700    }
701
702    fn encoder(&self, array: &dyn Array) -> Result<Encoder<'_>, ArrowError> {
703        match self {
704            Codec::Stateless => Ok(Encoder::Stateless),
705            Codec::Dictionary(converter, nulls) => {
706                let values = array.as_any_dictionary().values().clone();
707                let rows = converter.convert_columns(&[values])?;
708                Ok(Encoder::Dictionary(rows, nulls.row()))
709            }
710            Codec::Struct(converter, null) => {
711                let v = as_struct_array(array);
712                let rows = converter.convert_columns(v.columns())?;
713                Ok(Encoder::Struct(rows, null.row()))
714            }
715            Codec::List(converter) => {
716                let values = match array.data_type() {
717                    DataType::List(_) => {
718                        let list_array = as_list_array(array);
719                        let first_offset = list_array.offsets()[0] as usize;
720                        let last_offset =
721                            list_array.offsets()[list_array.offsets().len() - 1] as usize;
722
723                        // values can include more data than referenced in the ListArray, only encode
724                        // the referenced values.
725                        list_array
726                            .values()
727                            .slice(first_offset, last_offset - first_offset)
728                    }
729                    DataType::LargeList(_) => {
730                        let list_array = as_large_list_array(array);
731
732                        let first_offset = list_array.offsets()[0] as usize;
733                        let last_offset =
734                            list_array.offsets()[list_array.offsets().len() - 1] as usize;
735
736                        // values can include more data than referenced in the LargeListArray, only encode
737                        // the referenced values.
738                        list_array
739                            .values()
740                            .slice(first_offset, last_offset - first_offset)
741                    }
742                    DataType::ListView(_) => {
743                        let list_view_array = array.as_list_view::<i32>();
744                        let (min_offset, max_end) = compute_list_view_bounds(list_view_array);
745                        list_view_array
746                            .values()
747                            .slice(min_offset, max_end - min_offset)
748                    }
749                    DataType::LargeListView(_) => {
750                        let list_view_array = array.as_list_view::<i64>();
751                        let (min_offset, max_end) = compute_list_view_bounds(list_view_array);
752                        list_view_array
753                            .values()
754                            .slice(min_offset, max_end - min_offset)
755                    }
756                    DataType::FixedSizeList(_, _) => {
757                        as_fixed_size_list_array(array).values().clone()
758                    }
759                    _ => unreachable!(),
760                };
761                let rows = converter.convert_columns(&[values])?;
762                Ok(Encoder::List(rows))
763            }
764            Codec::RunEndEncoded(converter) => {
765                let values = match array.data_type() {
766                    DataType::RunEndEncoded(r, _) => match r.data_type() {
767                        DataType::Int16 => array.as_run::<Int16Type>().values_slice(),
768                        DataType::Int32 => array.as_run::<Int32Type>().values_slice(),
769                        DataType::Int64 => array.as_run::<Int64Type>().values_slice(),
770                        _ => unreachable!("Unsupported run end index type: {r:?}"),
771                    },
772                    _ => unreachable!(),
773                };
774                let rows = converter.convert_columns(std::slice::from_ref(&values))?;
775                Ok(Encoder::RunEndEncoded(rows))
776            }
777            Codec::Union(converters, field_to_type_ids, _) => {
778                let union_array = array
779                    .as_any()
780                    .downcast_ref::<UnionArray>()
781                    .expect("expected Union array");
782
783                let type_ids = union_array.type_ids().clone();
784                let offsets = union_array.offsets().cloned();
785
786                let mut child_rows = Vec::with_capacity(converters.len());
787                for (field_idx, converter) in converters.iter().enumerate() {
788                    let type_id = field_to_type_ids[field_idx];
789                    let child_array = union_array.child(type_id);
790                    let rows = converter.convert_columns(std::slice::from_ref(child_array))?;
791                    child_rows.push(rows);
792                }
793
794                Ok(Encoder::Union {
795                    child_rows,
796                    field_to_type_ids: field_to_type_ids.clone(),
797                    type_ids,
798                    offsets,
799                })
800            }
801        }
802    }
803
804    fn size(&self) -> usize {
805        match self {
806            Codec::Stateless => 0,
807            Codec::Dictionary(converter, nulls) => converter.size() + nulls.data.len(),
808            Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(),
809            Codec::List(converter) => converter.size(),
810            Codec::RunEndEncoded(converter) => converter.size(),
811            Codec::Union(converters, _, null_rows) => {
812                converters.iter().map(|c| c.size()).sum::<usize>()
813                    + null_rows.iter().map(|n| n.data.len()).sum::<usize>()
814            }
815        }
816    }
817}
818
#[derive(Debug)]
enum Encoder<'a> {
    /// No additional encoder state is necessary
    Stateless,
    /// The encoding of the child array and the encoding of a null row
    Dictionary(Rows, Row<'a>),
    /// The row encoding of the child arrays and the encoding of a null row
    ///
    /// It is necessary to encode to a temporary [`Rows`] to avoid serializing
    /// values that are masked by a null in the parent StructArray, otherwise
    /// this would establish an ordering between semantically null values
    Struct(Rows, Row<'a>),
    /// The row encoding of the child array
    List(Rows),
    /// The row encoding of the values array
    RunEndEncoded(Rows),
    /// The row encoding of each union field's child array, the union's
    /// type_ids buffer, and its offsets buffer (present only for Dense unions)
    Union {
        /// Row encoding of each field's child array, in field order
        child_rows: Vec<Rows>,
        /// Maps a field's position in `child_rows` to its type id
        field_to_type_ids: Vec<i8>,
        /// The type id of each row in the union
        type_ids: ScalarBuffer<i8>,
        /// For Dense unions, each row's offset into its child array
        offsets: Option<ScalarBuffer<i32>>,
    },
}
843
/// Configure the data type and sort order for a given column
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
    /// Sort options (ascending/descending, null ordering) used when encoding
    options: SortOptions,
    /// Data type of the column to encode
    data_type: DataType,
}
852
853impl SortField {
854    /// Create a new column with the given data type
855    pub fn new(data_type: DataType) -> Self {
856        Self::new_with_options(data_type, Default::default())
857    }
858
859    /// Create a new column with the given data type and [`SortOptions`]
860    pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
861        Self { options, data_type }
862    }
863
864    /// Return size of this instance in bytes.
865    ///
866    /// Includes the size of `Self`.
867    pub fn size(&self) -> usize {
868        self.data_type.size() + std::mem::size_of::<Self>() - std::mem::size_of::<DataType>()
869    }
870}
871
872impl RowConverter {
873    /// Create a new [`RowConverter`] with the provided schema
874    pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
875        if !Self::supports_fields(&fields) {
876            return Err(ArrowError::NotYetImplemented(format!(
877                "Row format support not yet implemented for: {fields:?}"
878            )));
879        }
880
881        let codecs = fields.iter().map(Codec::new).collect::<Result<_, _>>()?;
882        Ok(Self {
883            fields: fields.into(),
884            codecs,
885        })
886    }
887
888    /// Check if the given fields are supported by the row format.
889    pub fn supports_fields(fields: &[SortField]) -> bool {
890        fields.iter().all(|x| Self::supports_datatype(&x.data_type))
891    }
892
893    fn supports_datatype(d: &DataType) -> bool {
894        match d {
895            _ if !d.is_nested() => true,
896            DataType::List(f)
897            | DataType::LargeList(f)
898            | DataType::ListView(f)
899            | DataType::LargeListView(f)
900            | DataType::FixedSizeList(f, _) => Self::supports_datatype(f.data_type()),
901            DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())),
902            DataType::RunEndEncoded(_, values) => Self::supports_datatype(values.data_type()),
903            DataType::Union(fs, _mode) => fs
904                .iter()
905                .all(|(_, f)| Self::supports_datatype(f.data_type())),
906            _ => false,
907        }
908    }
909
910    /// Convert [`ArrayRef`] columns into [`Rows`]
911    ///
912    /// See [`Row`] for information on when [`Row`] can be compared
913    ///
914    /// See [`Self::convert_rows`] for converting [`Rows`] back into [`ArrayRef`]
915    ///
916    /// # Panics
917    ///
918    /// Panics if the schema of `columns` does not match that provided to [`RowConverter::new`]
919    pub fn convert_columns(&self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
920        let num_rows = columns.first().map(|x| x.len()).unwrap_or(0);
921        let mut rows = self.empty_rows(num_rows, 0);
922        self.append(&mut rows, columns)?;
923        Ok(rows)
924    }
925
926    /// Convert [`ArrayRef`] columns appending to an existing [`Rows`]
927    ///
928    /// See [`Row`] for information on when [`Row`] can be compared
929    ///
930    /// # Panics
931    ///
932    /// Panics if
933    /// * The schema of `columns` does not match that provided to [`RowConverter::new`]
934    /// * The provided [`Rows`] were not created by this [`RowConverter`]
935    ///
936    /// ```
937    /// # use std::sync::Arc;
938    /// # use std::collections::HashSet;
939    /// # use arrow_array::cast::AsArray;
940    /// # use arrow_array::StringArray;
941    /// # use arrow_row::{Row, RowConverter, SortField};
942    /// # use arrow_schema::DataType;
943    /// #
944    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
945    /// let a1 = StringArray::from(vec!["hello", "world"]);
946    /// let a2 = StringArray::from(vec!["a", "a", "hello"]);
947    ///
948    /// let mut rows = converter.empty_rows(5, 128);
949    /// converter.append(&mut rows, &[Arc::new(a1)]).unwrap();
950    /// converter.append(&mut rows, &[Arc::new(a2)]).unwrap();
951    ///
952    /// let back = converter.convert_rows(&rows).unwrap();
953    /// let values: Vec<_> = back[0].as_string::<i32>().iter().map(Option::unwrap).collect();
954    /// assert_eq!(&values, &["hello", "world", "a", "a", "hello"]);
955    /// ```
956    pub fn append(&self, rows: &mut Rows, columns: &[ArrayRef]) -> Result<(), ArrowError> {
957        assert!(
958            Arc::ptr_eq(&rows.config.fields, &self.fields),
959            "rows were not produced by this RowConverter"
960        );
961
962        if columns.len() != self.fields.len() {
963            return Err(ArrowError::InvalidArgumentError(format!(
964                "Incorrect number of arrays provided to RowConverter, expected {} got {}",
965                self.fields.len(),
966                columns.len()
967            )));
968        }
969        for colum in columns.iter().skip(1) {
970            if colum.len() != columns[0].len() {
971                return Err(ArrowError::InvalidArgumentError(format!(
972                    "RowConverter columns must all have the same length, expected {} got {}",
973                    columns[0].len(),
974                    colum.len()
975                )));
976            }
977        }
978
979        let encoders = columns
980            .iter()
981            .zip(&self.codecs)
982            .zip(self.fields.iter())
983            .map(|((column, codec), field)| {
984                if !column.data_type().equals_datatype(&field.data_type) {
985                    return Err(ArrowError::InvalidArgumentError(format!(
986                        "RowConverter column schema mismatch, expected {} got {}",
987                        field.data_type,
988                        column.data_type()
989                    )));
990                }
991                codec.encoder(column.as_ref())
992            })
993            .collect::<Result<Vec<_>, _>>()?;
994
995        let write_offset = rows.num_rows();
996        let lengths = row_lengths(columns, &encoders);
997        let total = lengths.extend_offsets(rows.offsets[write_offset], &mut rows.offsets);
998        rows.buffer.resize(total, 0);
999
1000        for ((column, field), encoder) in columns.iter().zip(self.fields.iter()).zip(encoders) {
1001            // We encode a column at a time to minimise dispatch overheads
1002            encode_column(
1003                &mut rows.buffer,
1004                &mut rows.offsets[write_offset..],
1005                column.as_ref(),
1006                field.options,
1007                &encoder,
1008            )
1009        }
1010
1011        if cfg!(debug_assertions) {
1012            assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len());
1013            rows.offsets
1014                .windows(2)
1015                .for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic"));
1016        }
1017
1018        Ok(())
1019    }
1020
    /// Convert [`Rows`] columns into [`ArrayRef`]
    ///
    /// See [`Self::convert_columns`] for converting [`ArrayRef`] into [`Rows`]
    ///
    /// # Panics
    ///
    /// Panics if the rows were not produced by this [`RowConverter`]
    pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>, ArrowError>
    where
        I: IntoIterator<Item = Row<'a>>,
    {
        // UTF-8 validation is required if any row's config requested it
        // (i.e. the row was parsed from untrusted bytes)
        let mut validate_utf8 = false;
        let mut rows: Vec<_> = rows
            .into_iter()
            .map(|row| {
                assert!(
                    Arc::ptr_eq(&row.config.fields, &self.fields),
                    "rows were not produced by this RowConverter"
                );
                validate_utf8 |= row.config.validate_utf8;
                row.data
            })
            .collect();

        // SAFETY
        // We have validated that the rows came from this [`RowConverter`]
        // and therefore must be valid
        let result = unsafe { self.convert_raw(&mut rows, validate_utf8) }?;

        // In debug builds, verify that decoding consumed every byte of every
        // row; leftover bytes indicate a codec bug
        if cfg!(debug_assertions) {
            for (i, row) in rows.iter().enumerate() {
                if !row.is_empty() {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "Codecs {codecs:?} did not consume all bytes for row {i}, remaining bytes: {row:?}",
                        codecs = &self.codecs
                    )));
                }
            }
        }

        Ok(result)
    }
1063
1064    /// Returns an empty [`Rows`] with capacity for `row_capacity` rows with
1065    /// a total length of `data_capacity`
1066    ///
1067    /// This can be used to buffer a selection of [`Row`]
1068    ///
1069    /// ```
1070    /// # use std::sync::Arc;
1071    /// # use std::collections::HashSet;
1072    /// # use arrow_array::cast::AsArray;
1073    /// # use arrow_array::StringArray;
1074    /// # use arrow_row::{Row, RowConverter, SortField};
1075    /// # use arrow_schema::DataType;
1076    /// #
1077    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
1078    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
1079    ///
1080    /// // Convert to row format and deduplicate
1081    /// let converted = converter.convert_columns(&[Arc::new(array)]).unwrap();
1082    /// let mut distinct_rows = converter.empty_rows(3, 100);
1083    /// let mut dedup: HashSet<Row> = HashSet::with_capacity(3);
1084    /// converted.iter().filter(|row| dedup.insert(*row)).for_each(|row| distinct_rows.push(row));
1085    ///
1086    /// // Note: we could skip buffering and feed the filtered iterator directly
1087    /// // into convert_rows, this is done for demonstration purposes only
1088    /// let distinct = converter.convert_rows(&distinct_rows).unwrap();
1089    /// let values: Vec<_> = distinct[0].as_string::<i32>().iter().map(Option::unwrap).collect();
1090    /// assert_eq!(&values, &["hello", "world", "a"]);
1091    /// ```
1092    pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows {
1093        let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1));
1094        offsets.push(0);
1095
1096        Rows {
1097            offsets,
1098            buffer: Vec::with_capacity(data_capacity),
1099            config: RowConfig {
1100                fields: self.fields.clone(),
1101                validate_utf8: false,
1102            },
1103        }
1104    }
1105
1106    /// Create a new [Rows] instance from the given binary data.
1107    ///
1108    /// ```
1109    /// # use std::sync::Arc;
1110    /// # use std::collections::HashSet;
1111    /// # use arrow_array::cast::AsArray;
1112    /// # use arrow_array::StringArray;
1113    /// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField};
1114    /// # use arrow_schema::DataType;
1115    /// #
1116    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
1117    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
1118    /// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
1119    ///
1120    /// // We can convert rows into binary format and back in batch.
1121    /// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
1122    /// let binary = rows.try_into_binary().expect("known-small array");
1123    /// let converted = converter.from_binary(binary.clone());
1124    /// assert!(converted.iter().eq(values.iter().map(|r| r.row())));
1125    /// ```
1126    ///
1127    /// # Panics
1128    ///
1129    /// This function expects the passed [BinaryArray] to contain valid row data as produced by this
1130    /// [RowConverter]. It will panic if any rows are null. Operations on the returned [Rows] may
1131    /// panic if the data is malformed.
1132    pub fn from_binary(&self, array: BinaryArray) -> Rows {
1133        assert_eq!(
1134            array.null_count(),
1135            0,
1136            "can't construct Rows instance from array with nulls"
1137        );
1138        let (offsets, values, _) = array.into_parts();
1139        let offsets = offsets.iter().map(|&i| i.as_usize()).collect();
1140        // Try zero-copy, if it does not succeed, fall back to copying the values.
1141        let buffer = values.into_vec().unwrap_or_else(|values| values.to_vec());
1142        Rows {
1143            buffer,
1144            offsets,
1145            config: RowConfig {
1146                fields: Arc::clone(&self.fields),
1147                validate_utf8: true,
1148            },
1149        }
1150    }
1151
    /// Convert raw bytes into [`ArrayRef`]
    ///
    /// Decodes one column at a time; each `decode_column` call consumes the
    /// decoded prefix of every slice in `rows`, advancing them in place
    /// (callers can verify all bytes were consumed by checking the slices
    /// are empty afterwards)
    ///
    /// # Safety
    ///
    /// `rows` must contain valid data for this [`RowConverter`]
    unsafe fn convert_raw(
        &self,
        rows: &mut [&[u8]],
        validate_utf8: bool,
    ) -> Result<Vec<ArrayRef>, ArrowError> {
        self.fields
            .iter()
            .zip(&self.codecs)
            .map(|(field, codec)| unsafe { decode_column(field, rows, codec, validate_utf8) })
            .collect()
    }
1168
1169    /// Returns a [`RowParser`] that can be used to parse [`Row`] from bytes
1170    pub fn parser(&self) -> RowParser {
1171        RowParser::new(Arc::clone(&self.fields))
1172    }
1173
1174    /// Returns the size of this instance in bytes
1175    ///
1176    /// Includes the size of `Self`.
1177    pub fn size(&self) -> usize {
1178        std::mem::size_of::<Self>()
1179            + self.fields.iter().map(|x| x.size()).sum::<usize>()
1180            + self.codecs.capacity() * std::mem::size_of::<Codec>()
1181            + self.codecs.iter().map(Codec::size).sum::<usize>()
1182    }
1183}
1184
/// A [`RowParser`] can be created from a [`RowConverter`] and used to parse bytes to [`Row`]
#[derive(Debug)]
pub struct RowParser {
    /// Shares the converter's fields; `validate_utf8` is set to true since
    /// parsed bytes are not known to have come from a trusted converter
    config: RowConfig,
}
1190
1191impl RowParser {
1192    fn new(fields: Arc<[SortField]>) -> Self {
1193        Self {
1194            config: RowConfig {
1195                fields,
1196                validate_utf8: true,
1197            },
1198        }
1199    }
1200
1201    /// Creates a [`Row`] from the provided `bytes`.
1202    ///
1203    /// `bytes` must be a [`Row`] produced by the [`RowConverter`] associated with
1204    /// this [`RowParser`], otherwise subsequent operations with the produced [`Row`] may panic
1205    pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> {
1206        Row {
1207            data: bytes,
1208            config: &self.config,
1209        }
1210    }
1211}
1212
/// The config of a given set of [`Row`]
#[derive(Debug, Clone)]
struct RowConfig {
    /// The schema for these rows
    fields: Arc<[SortField]>,
    /// Whether to run UTF-8 validation when converting to arrow arrays
    validate_utf8: bool,
}

/// A row-oriented representation of arrow data, that is normalized for comparison.
///
/// See the [module level documentation](self) and [`RowConverter`] for more details.
#[derive(Debug)]
pub struct Rows {
    /// Underlying row bytes
    buffer: Vec<u8>,
    /// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]`
    ///
    /// Invariant: always contains `num_rows + 1` monotonically
    /// non-decreasing values, starting with 0
    offsets: Vec<usize>,
    /// The config for these rows
    config: RowConfig,
}

/// The iterator type for [`Rows::lengths`]
pub type RowLengthIter<'a> = Map<Windows<'a, usize>, fn(&'a [usize]) -> usize>;
1237
1238impl Rows {
1239    /// Append a [`Row`] to this [`Rows`]
1240    pub fn push(&mut self, row: Row<'_>) {
1241        assert!(
1242            Arc::ptr_eq(&row.config.fields, &self.config.fields),
1243            "row was not produced by this RowConverter"
1244        );
1245        self.config.validate_utf8 |= row.config.validate_utf8;
1246        self.buffer.extend_from_slice(row.data);
1247        self.offsets.push(self.buffer.len())
1248    }
1249
1250    /// Reserve capacity for `row_capacity` rows with a total length of `data_capacity`
1251    pub fn reserve(&mut self, row_capacity: usize, data_capacity: usize) {
1252        self.buffer.reserve(data_capacity);
1253        self.offsets.reserve(row_capacity);
1254    }
1255
    /// Returns the row at index `row`
    ///
    /// # Panics
    ///
    /// Panics if `row` is out of bounds
    pub fn row(&self, row: usize) -> Row<'_> {
        // Bounds-check first so that the unchecked access below is sound
        self.checked_row_end(row);
        unsafe { self.row_unchecked(row) }
    }

    /// Returns `row + 1`, the index into `offsets` of the end of row `row`,
    /// panicking if `row` is out of bounds (including on `row + 1` overflow)
    fn checked_row_end(&self, row: usize) -> usize {
        row.checked_add(1)
            .filter(|end| *end < self.offsets.len())
            .expect("row index out of bounds")
    }

    /// Returns the row at `index` without bounds checking
    ///
    /// # Safety
    /// Caller must ensure that `index + 1` is less than the number of offsets (#rows + 1)
    pub unsafe fn row_unchecked(&self, index: usize) -> Row<'_> {
        // SAFETY: the caller guarantees `index + 1 < offsets.len()`, and
        // every offset is a valid index into `buffer` by construction
        let end = unsafe { self.offsets.get_unchecked(index + 1) };
        let start = unsafe { self.offsets.get_unchecked(index) };
        let data = unsafe { self.buffer.get_unchecked(*start..*end) };
        Row {
            data,
            config: &self.config,
        }
    }

    /// Returns the number of bytes the row at index `row` is occupying,
    /// that is, what is the length of the returned [`Row::data`] will be.
    ///
    /// # Panics
    ///
    /// Panics if `row` is out of bounds
    pub fn row_len(&self, row: usize) -> usize {
        let end = self.checked_row_end(row);

        self.offsets[end] - self.offsets[row]
    }
1289
    /// Get an iterator over the lengths of each row in this [`Rows`]
    pub fn lengths(&self) -> RowLengthIter<'_> {
        // The non-capturing closure coerces to the `fn` pointer named by
        // the `RowLengthIter` type alias
        self.offsets.windows(2).map(|w| w[1] - w[0])
    }

    /// Sets the length of this [`Rows`] to 0
    pub fn clear(&mut self) {
        // Keep the leading 0 offset so the offsets invariant
        // (num_rows + 1 entries) is preserved
        self.offsets.truncate(1);
        self.buffer.clear();
    }

    /// Returns the number of [`Row`] in this [`Rows`]
    pub fn num_rows(&self) -> usize {
        // `offsets` always has one more entry than there are rows
        self.offsets.len() - 1
    }

    /// Returns an iterator over the [`Row`] in this [`Rows`]
    pub fn iter(&self) -> RowsIter<'_> {
        self.into_iter()
    }
1310
1311    /// Returns the size of this instance in bytes
1312    ///
1313    /// Includes the size of `Self`.
1314    pub fn size(&self) -> usize {
1315        // Size of fields is accounted for as part of RowConverter
1316        std::mem::size_of::<Self>()
1317            + self.buffer.capacity()
1318            + self.offsets.capacity() * std::mem::size_of::<usize>()
1319    }
1320
    /// Create a [BinaryArray] from the [Rows] data without reallocating the
    /// underlying bytes.
    ///
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use std::collections::HashSet;
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::StringArray;
    /// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField};
    /// # use arrow_schema::DataType;
    /// #
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
    /// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
    ///
    /// // We can convert rows into binary format and back.
    /// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
    /// let binary = rows.try_into_binary().expect("known-small array");
    /// let parser = converter.parser();
    /// let parsed: Vec<OwnedRow> =
    ///   binary.iter().flatten().map(|b| parser.parse(b).owned()).collect();
    /// assert_eq!(values, parsed);
    /// ```
    ///
    /// # Errors
    ///
    /// This function will return an error if there is more data than can be stored in
    /// a [BinaryArray] -- i.e. if the total data size is more than 2GiB.
    pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> {
        if self.buffer.len() > i32::MAX as usize {
            return Err(ArrowError::InvalidArgumentError(format!(
                "{}-byte rows buffer too long to convert into a i32-indexed BinaryArray",
                self.buffer.len()
            )));
        }
        // We've checked that the buffer length fits in an i32; so all offsets into that buffer should fit as well.
        let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as));
        // SAFETY: offsets buffer is nonempty, monotonically increasing, and all represent valid indexes into buffer.
        let array = unsafe {
            BinaryArray::new_unchecked(
                OffsetBuffer::new_unchecked(offsets_scalar),
                Buffer::from_vec(self.buffer),
                // No null buffer: every row is a valid byte string
                None,
            )
        };
        Ok(array)
    }
1369}
1370
1371impl<'a> IntoIterator for &'a Rows {
1372    type Item = Row<'a>;
1373    type IntoIter = RowsIter<'a>;
1374
1375    fn into_iter(self) -> Self::IntoIter {
1376        RowsIter {
1377            rows: self,
1378            start: 0,
1379            end: self.num_rows(),
1380        }
1381    }
1382}
1383
/// An iterator over [`Rows`]
#[derive(Debug)]
pub struct RowsIter<'a> {
    /// The rows being iterated
    rows: &'a Rows,
    /// Index of the next row yielded from the front; invariant `start <= end`
    start: usize,
    /// One past the index of the next row yielded from the back
    end: usize,
}
1391
1392impl<'a> Iterator for RowsIter<'a> {
1393    type Item = Row<'a>;
1394
1395    fn next(&mut self) -> Option<Self::Item> {
1396        if self.end == self.start {
1397            return None;
1398        }
1399
1400        // SAFETY: We have checked that `start` is less than `end`
1401        let row = unsafe { self.rows.row_unchecked(self.start) };
1402        self.start += 1;
1403        Some(row)
1404    }
1405
1406    fn size_hint(&self) -> (usize, Option<usize>) {
1407        let len = self.len();
1408        (len, Some(len))
1409    }
1410}
1411
impl ExactSizeIterator for RowsIter<'_> {
    fn len(&self) -> usize {
        // Invariant `start <= end` means this cannot underflow
        self.end - self.start
    }
}
1417
1418impl DoubleEndedIterator for RowsIter<'_> {
1419    fn next_back(&mut self) -> Option<Self::Item> {
1420        if self.end == self.start {
1421            return None;
1422        }
1423
1424        self.end -= 1;
1425
1426        // Safety: By construction we create `end >= start`, so if `end` is not equal to `start` it cannot be less than `start`
1427        //          therefore `end - 1` is within range
1428        let row = unsafe { self.rows.row_unchecked(self.end) };
1429        Some(row)
1430    }
1431}
1432
/// A comparable representation of a row.
///
/// See the [module level documentation](self) for more details.
///
/// Two [`Row`] can only be compared if they both belong to [`Rows`]
/// returned by calls to [`RowConverter::convert_columns`] on the same
/// [`RowConverter`]. If different [`RowConverter`]s are used, any
/// ordering established by comparing the [`Row`] is arbitrary.
#[derive(Debug, Copy, Clone)]
pub struct Row<'a> {
    /// The encoded bytes of this row
    data: &'a [u8],
    /// The config of the [`Rows`] this row belongs to
    config: &'a RowConfig,
}
1446
1447impl<'a> Row<'a> {
1448    /// Create owned version of the row to detach it from the shared [`Rows`].
1449    pub fn owned(&self) -> OwnedRow {
1450        OwnedRow {
1451            data: self.data.into(),
1452            config: self.config.clone(),
1453        }
1454    }
1455
1456    /// The row's bytes, with the lifetime of the underlying data.
1457    pub fn data(&self) -> &'a [u8] {
1458        self.data
1459    }
1460}
1461
// Manually derive these as don't wish to include `fields`
//
// All comparisons operate on the raw encoded bytes, which is what makes
// [`Row`] cheap to compare (a memcmp under the hood)

impl PartialEq for Row<'_> {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        self.data.eq(other.data)
    }
}

impl Eq for Row<'_> {}

impl PartialOrd for Row<'_> {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Row<'_> {
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        self.data.cmp(other.data)
    }
}

impl Hash for Row<'_> {
    #[inline]
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.data.hash(state)
    }
}

impl AsRef<[u8]> for Row<'_> {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        self.data
    }
}
1500
/// Owned version of a [`Row`] that can be moved/cloned freely.
///
/// This contains the data for the one specific row (not the entire buffer of all rows).
#[derive(Debug, Clone)]
pub struct OwnedRow {
    /// The encoded bytes of this single row, copied out of the originating [`Rows`]
    data: Box<[u8]>,
    /// An owned copy of the originating [`Rows`] config
    config: RowConfig,
}
1509
1510impl OwnedRow {
1511    /// Get borrowed [`Row`] from owned version.
1512    ///
1513    /// This is helpful if you want to compare an [`OwnedRow`] with a [`Row`].
1514    pub fn row(&self) -> Row<'_> {
1515        Row {
1516            data: &self.data,
1517            config: &self.config,
1518        }
1519    }
1520}
1521
// Manually derive these as don't wish to include `fields`. Also we just want to use the same `Row` implementations here.
// Each impl borrows via `row()` and delegates, so owned and borrowed rows
// compare and hash identically.

impl PartialEq for OwnedRow {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        self.row().eq(&other.row())
    }
}

impl Eq for OwnedRow {}

impl PartialOrd for OwnedRow {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for OwnedRow {
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        self.row().cmp(&other.row())
    }
}

impl Hash for OwnedRow {
    #[inline]
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.row().hash(state)
    }
}

impl AsRef<[u8]> for OwnedRow {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        &self.data
    }
}
1560
/// Returns the sentinel byte used to encode a null value:
/// `0` when nulls sort first, `0xFF` when nulls sort last
//
// (The previous doc referenced an `invert` parameter that no longer exists)
#[inline]
fn null_sentinel(options: SortOptions) -> u8 {
    match options.nulls_first {
        true => 0,
        false => 0xFF,
    }
}
1569
/// Stores the lengths of the rows. Lazily materializes lengths for columns with fixed-size types.
enum LengthTracker {
    /// Fixed state: All rows have length `length`
    Fixed { length: usize, num_rows: usize },
    /// Variable state: The length of row `i` is `lengths[i] + fixed_length`
    Variable {
        /// Byte length contributed by all fixed-size columns seen so far
        fixed_length: usize,
        /// Per-row byte length contributed by variable-size columns
        lengths: Vec<usize>,
    },
}
1580
impl LengthTracker {
    /// Creates a tracker for `num_rows` rows, each initially of length 0
    fn new(num_rows: usize) -> Self {
        Self::Fixed {
            length: 0,
            num_rows,
        }
    }

    /// Adds a column of fixed-length elements, each of size `new_length` to the LengthTracker
    fn push_fixed(&mut self, new_length: usize) {
        match self {
            LengthTracker::Fixed { length, .. } => *length += new_length,
            LengthTracker::Variable { fixed_length, .. } => *fixed_length += new_length,
        }
    }

    /// Adds a column of possibly variable-length elements, element `i` has length `new_lengths.nth(i)`
    ///
    /// # Panics
    ///
    /// Panics if already in the `Variable` state and `new_lengths` does not have
    /// one entry per tracked row
    fn push_variable(&mut self, new_lengths: impl ExactSizeIterator<Item = usize>) {
        match self {
            // First variable-length column: transition state, preserving the
            // accumulated fixed component
            LengthTracker::Fixed { length, .. } => {
                *self = LengthTracker::Variable {
                    fixed_length: *length,
                    lengths: new_lengths.collect(),
                }
            }
            LengthTracker::Variable { lengths, .. } => {
                assert_eq!(lengths.len(), new_lengths.len());
                lengths
                    .iter_mut()
                    .zip(new_lengths)
                    .for_each(|(length, new_length)| *length += new_length);
            }
        }
    }

    /// Returns the tracked row lengths as a slice, materializing the per-row
    /// vector if still in the `Fixed` state
    fn materialized(&mut self) -> &mut [usize] {
        if let LengthTracker::Fixed { length, num_rows } = *self {
            // Materialize zeroed per-row lengths; the fixed component moves to `fixed_length`
            *self = LengthTracker::Variable {
                fixed_length: length,
                lengths: vec![0; num_rows],
            };
        }

        match self {
            LengthTracker::Variable { lengths, .. } => lengths,
            // The `Fixed` case was converted to `Variable` above
            LengthTracker::Fixed { .. } => unreachable!(),
        }
    }

    /// Initializes the offsets using the tracked lengths. Returns the sum of the
    /// lengths of the rows added.
    ///
    /// We initialize the offsets shifted down by one row index.
    ///
    /// As the rows are appended to the offsets will be incremented to match
    ///
    /// For example, consider the case of 3 rows of length 3, 4, and 6 respectively.
    /// The offsets would be initialized to `0, 0, 3, 7`
    ///
    /// Writing the first row entirely would yield `0, 3, 3, 7`
    /// The second, `0, 3, 7, 7`
    /// The third, `0, 3, 7, 13`
    ///
    /// This would be the final offsets for reading
    ///
    /// In this way offsets tracks the position during writing whilst eventually
    /// serving as the offsets used for reading
    fn extend_offsets(&self, initial_offset: usize, offsets: &mut Vec<usize>) -> usize {
        match self {
            LengthTracker::Fixed { length, num_rows } => {
                // All rows share one length: offsets form an arithmetic progression
                offsets.extend((0..*num_rows).map(|i| initial_offset + i * length));

                initial_offset + num_rows * length
            }
            LengthTracker::Variable {
                fixed_length,
                lengths,
            } => {
                // Prefix-sum of `fixed_length + lengths[i]`, pushing each offset
                // *before* adding the row's length — producing the one-row shift
                // described above
                let mut acc = initial_offset;

                offsets.extend(lengths.iter().map(|length| {
                    let current = acc;
                    acc += length + fixed_length;
                    current
                }));

                acc
            }
        }
    }
}
1672
/// Computes the encoded length of each row, returning a [`LengthTracker`]
///
/// `cols` and `encoders` are zipped pairwise; all columns are assumed to have
/// the same number of rows
fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> LengthTracker {
    use fixed::FixedLengthEncoding;

    // An empty column list yields a tracker over 0 rows
    let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
    let mut tracker = LengthTracker::new(num_rows);

    for (array, encoder) in cols.iter().zip(encoders) {
        match encoder {
            Encoder::Stateless => {
                downcast_primitive_array! {
                    array => tracker.push_fixed(fixed::encoded_len(array)),
                    DataType::Null => tracker.push_fixed(2)
                    DataType::Boolean => tracker.push_fixed(bool::ENCODED_LEN),
                    DataType::Binary => push_generic_byte_array_lengths(&mut tracker, as_generic_binary_array::<i32>(array)),
                    DataType::LargeBinary => push_generic_byte_array_lengths(&mut tracker, as_generic_binary_array::<i64>(array)),
                    DataType::BinaryView => push_byte_view_array_lengths(&mut tracker, array.as_binary_view()),
                    DataType::Utf8 => push_generic_byte_array_lengths(&mut tracker, array.as_string::<i32>()),
                    DataType::LargeUtf8 => push_generic_byte_array_lengths(&mut tracker, array.as_string::<i64>()),
                    DataType::Utf8View => push_byte_view_array_lengths(&mut tracker, array.as_string_view()),
                    DataType::FixedSizeBinary(len) => {
                        // 1 validity byte followed by `len` value bytes
                        let len = len.to_usize().unwrap();
                        tracker.push_fixed(1 + len)
                    }
                    _ => unimplemented!("unsupported data type: {}", array.data_type()),
                }
            }
            Encoder::Dictionary(values, null) => {
                // Each key contributes the length of its pre-encoded value row,
                // or of the pre-encoded null row
                downcast_dictionary_array! {
                    array => {
                        tracker.push_variable(
                            array.keys().iter().map(|v| match v {
                                Some(k) => values.row_len(k.as_usize()),
                                None => null.data.len(),
                            })
                        )
                    }
                    _ => unreachable!(),
                }
            }
            Encoder::Struct(rows, null) => {
                // 1 validity byte, then either the child row or the null row
                let array = as_struct_array(array);
                if rows.num_rows() > 0 {
                    // Only calculate row length if there are rows
                    tracker.push_variable((0..array.len()).map(|idx| match array.is_valid(idx) {
                        true => 1 + rows.row_len(idx),
                        false => 1 + null.data.len(),
                    }));
                } else {
                    // Edge case for Struct([]) arrays (no child fields)
                    tracker.push_variable((0..array.len()).map(|idx| match array.is_valid(idx) {
                        true => 1,
                        false => 1 + null.data.len(),
                    }));
                }
            }
            Encoder::List(rows) => match array.data_type() {
                DataType::List(_) => {
                    list::compute_lengths(tracker.materialized(), rows, as_list_array(array))
                }
                DataType::LargeList(_) => {
                    list::compute_lengths(tracker.materialized(), rows, as_large_list_array(array))
                }
                DataType::ListView(_) => {
                    // List views index child rows relative to the minimum offset
                    let list_view = array.as_list_view::<i32>();
                    let (min_offset, _) = compute_list_view_bounds(list_view);
                    list::compute_lengths_list_view(
                        tracker.materialized(),
                        rows,
                        list_view,
                        min_offset,
                    )
                }
                DataType::LargeListView(_) => {
                    let list_view = array.as_list_view::<i64>();
                    let (min_offset, _) = compute_list_view_bounds(list_view);
                    list::compute_lengths_list_view(
                        tracker.materialized(),
                        rows,
                        list_view,
                        min_offset,
                    )
                }
                DataType::FixedSizeList(_, _) => compute_lengths_fixed_size_list(
                    &mut tracker,
                    rows,
                    as_fixed_size_list_array(array),
                ),
                _ => unreachable!(),
            },
            Encoder::RunEndEncoded(rows) => match array.data_type() {
                // Dispatch on the run-end index type
                DataType::RunEndEncoded(r, _) => match r.data_type() {
                    DataType::Int16 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int16Type>(),
                    ),
                    DataType::Int32 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int32Type>(),
                    ),
                    DataType::Int64 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int64Type>(),
                    ),
                    _ => unreachable!("Unsupported run end index type: {r:?}"),
                },
                _ => unreachable!(),
            },
            Encoder::Union {
                child_rows,
                field_to_type_ids,
                type_ids,
                offsets,
            } => {
                let union_array = array
                    .as_any()
                    .downcast_ref::<UnionArray>()
                    .expect("expected UnionArray");

                // Build a reverse lookup from type id to field index
                // (type ids are at most 127 per the Arrow spec)
                let mut type_id_to_field_idx = [0usize; 128];
                for (field_idx, &type_id) in field_to_type_ids.iter().enumerate() {
                    type_id_to_field_idx[type_id as usize] = field_idx;
                }

                let lengths = (0..union_array.len()).map(|i| {
                    let type_id = type_ids[i];
                    let field_idx = type_id_to_field_idx[type_id as usize];
                    // Dense unions indirect through `offsets`; sparse unions use `i` directly
                    let child_row_i = offsets.as_ref().map(|o| o[i] as usize).unwrap_or(i);
                    let child_row_len = child_rows[field_idx].row_len(child_row_i);

                    // length: 1 byte type_id + child row bytes
                    1 + child_row_len
                });

                tracker.push_variable(lengths);
            }
        }
    }

    tracker
}
1817
1818/// Add to [`LengthTracker`] the encoded length of each item in the [`GenericByteArray`]
1819fn push_generic_byte_array_lengths<T: ByteArrayType>(
1820    tracker: &mut LengthTracker,
1821    array: &GenericByteArray<T>,
1822) {
1823    if let Some(nulls) = array.nulls().filter(|n| n.null_count() > 0) {
1824        tracker.push_variable(
1825            array
1826                .offsets()
1827                .lengths()
1828                .zip(nulls.iter())
1829                .map(|(length, is_valid)| if is_valid { Some(length) } else { None })
1830                .map(variable::padded_length),
1831        )
1832    } else {
1833        tracker.push_variable(
1834            array
1835                .offsets()
1836                .lengths()
1837                .map(variable::non_null_padded_length),
1838        )
1839    }
1840}
1841
1842/// Add to [`LengthTracker`] the encoded length of each item in the [`GenericByteViewArray`]
1843fn push_byte_view_array_lengths<T: ByteViewType>(
1844    tracker: &mut LengthTracker,
1845    array: &GenericByteViewArray<T>,
1846) {
1847    if let Some(nulls) = array.nulls().filter(|n| n.null_count() > 0) {
1848        tracker.push_variable(
1849            array
1850                .lengths()
1851                .zip(nulls.iter())
1852                .map(|(length, is_valid)| {
1853                    if is_valid {
1854                        Some(length as usize)
1855                    } else {
1856                        None
1857                    }
1858                })
1859                .map(variable::padded_length),
1860        )
1861    } else {
1862        tracker.push_variable(
1863            array
1864                .lengths()
1865                .map(|len| variable::padded_length(Some(len as usize))),
1866        )
1867    }
1868}
1869
/// Encodes a column to the provided [`Rows`] incrementing the offsets as it progresses
///
/// `offsets` is shifted down by one row (see [`LengthTracker::extend_offsets`]):
/// `offsets[i + 1]` is the write cursor for row `i`, and is advanced past the
/// bytes written for this column
fn encode_column(
    data: &mut [u8],
    offsets: &mut [usize],
    column: &dyn Array,
    opts: SortOptions,
    encoder: &Encoder<'_>,
) {
    match encoder {
        Encoder::Stateless => {
            downcast_primitive_array! {
                column => {
                    // Use the cheaper non-null path when there are no nulls
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode(data, offsets, column.values(), nulls, opts)
                    } else {
                        fixed::encode_not_null(data, offsets, column.values(), opts)
                    }
                }
                DataType::Null => {
                    // Every row of a Null array encodes as a 2-byte null value
                    for offset in offsets.iter_mut().skip(1) {
                        variable::encode_null_value(&mut data[*offset..], opts);
                        *offset += 2;
                    }
                }
                DataType::Boolean => {
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode_boolean(data, offsets, column.as_boolean().values(), nulls, opts)
                    } else {
                        fixed::encode_boolean_not_null(data, offsets, column.as_boolean().values(), opts)
                    }
                }
                DataType::Binary => {
                    variable::encode_generic_byte_array(data, offsets, as_generic_binary_array::<i32>(column), opts)
                }
                DataType::BinaryView => {
                    variable::encode(data, offsets, column.as_binary_view().iter(), opts)
                }
                DataType::LargeBinary => {
                    variable::encode_generic_byte_array(data, offsets, as_generic_binary_array::<i64>(column), opts)
                }
                DataType::Utf8 => variable::encode_generic_byte_array(
                    data, offsets,
                    column.as_string::<i32>(),
                    opts,
                ),
                DataType::LargeUtf8 => variable::encode_generic_byte_array(
                    data, offsets,
                    column.as_string::<i64>(),
                    opts,
                ),
                DataType::Utf8View => variable::encode(
                    data, offsets,
                    column.as_string_view().iter().map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::FixedSizeBinary(_) => {
                    let array = column.as_any().downcast_ref().unwrap();
                    fixed::encode_fixed_size_binary(data, offsets, array, opts)
                }
                _ => unimplemented!("unsupported data type: {}", column.data_type()),
            }
        }
        Encoder::Dictionary(values, nulls) => {
            downcast_dictionary_array! {
                column => encode_dictionary_values(data, offsets, column, values, nulls),
                _ => unreachable!()
            }
        }
        Encoder::Struct(rows, null) => {
            // Writes a 1-byte validity sentinel followed by the pre-encoded child
            // row (or the pre-encoded null row) for each struct value.
            //
            // `NO_CHILD_FIELDS` handles the Struct([]) edge case: valid values
            // have no child bytes, so an empty row is written instead of `rows.row(idx)`
            fn struct_encode_helper<const NO_CHILD_FIELDS: bool>(
                array: &StructArray,
                offsets: &mut [usize],
                null_sentinel: u8,
                rows: &Rows,
                null: &Row<'_>,
                data: &mut [u8],
            ) {
                let empty_row = Row {
                    data: &[],
                    config: &rows.config,
                };

                offsets
                    .iter_mut()
                    .skip(1)
                    .enumerate()
                    .for_each(|(idx, offset)| {
                        let (row, sentinel) = match array.is_valid(idx) {
                            true => (
                                if NO_CHILD_FIELDS {
                                    empty_row
                                } else {
                                    rows.row(idx)
                                },
                                0x01,
                            ),
                            false => (*null, null_sentinel),
                        };
                        // Sentinel byte first, then the row bytes, then advance the cursor
                        let end_offset = *offset + 1 + row.as_ref().len();
                        data[*offset] = sentinel;
                        data[*offset + 1..end_offset].copy_from_slice(row.as_ref());
                        *offset = end_offset;
                    })
            }

            let array = as_struct_array(column);
            let null_sentinel = null_sentinel(opts);
            if rows.num_rows() == 0 {
                // Edge case for Struct([]) arrays (no child fields)
                struct_encode_helper::<true>(array, offsets, null_sentinel, rows, null, data);
            } else {
                struct_encode_helper::<false>(array, offsets, null_sentinel, rows, null, data);
            }
        }
        Encoder::List(rows) => match column.data_type() {
            DataType::List(_) => list::encode(data, offsets, rows, opts, as_list_array(column)),
            DataType::LargeList(_) => {
                list::encode(data, offsets, rows, opts, as_large_list_array(column))
            }
            DataType::ListView(_) => {
                // List views index child rows relative to the minimum offset
                let list_view = column.as_list_view::<i32>();
                let (min_offset, _) = compute_list_view_bounds(list_view);
                list::encode_list_view(data, offsets, rows, opts, list_view, min_offset)
            }
            DataType::LargeListView(_) => {
                let list_view = column.as_list_view::<i64>();
                let (min_offset, _) = compute_list_view_bounds(list_view);
                list::encode_list_view(data, offsets, rows, opts, list_view, min_offset)
            }
            DataType::FixedSizeList(_, _) => {
                encode_fixed_size_list(data, offsets, rows, opts, as_fixed_size_list_array(column))
            }
            _ => unreachable!(),
        },
        Encoder::RunEndEncoded(rows) => match column.data_type() {
            // Dispatch on the run-end index type
            DataType::RunEndEncoded(r, _) => match r.data_type() {
                DataType::Int16 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int16Type>())
                }
                DataType::Int32 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int32Type>())
                }
                DataType::Int64 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int64Type>())
                }
                _ => unreachable!("Unsupported run end index type: {r:?}"),
            },
            _ => unreachable!(),
        },
        Encoder::Union {
            child_rows,
            field_to_type_ids,
            type_ids,
            offsets: offsets_buf,
        } => {
            // Build a reverse lookup from type id to field index
            // (type ids are at most 127 per the Arrow spec)
            let mut type_id_to_field_idx = [0usize; 128];
            for (field_idx, &type_id) in field_to_type_ids.iter().enumerate() {
                type_id_to_field_idx[type_id as usize] = field_idx;
            }

            offsets
                .iter_mut()
                .skip(1)
                .enumerate()
                .for_each(|(i, offset)| {
                    let type_id = type_ids[i];
                    let field_idx = type_id_to_field_idx[type_id as usize];

                    // Dense unions indirect through `offsets_buf`; sparse unions use `i` directly
                    let child_row_idx = offsets_buf.as_ref().map(|o| o[i] as usize).unwrap_or(i);
                    let child_row = child_rows[field_idx].row(child_row_idx);
                    let child_bytes = child_row.as_ref();

                    // Invert the type id byte for descending order so byte comparison flips
                    let type_id_byte = if opts.descending {
                        !(type_id as u8)
                    } else {
                        type_id as u8
                    };
                    data[*offset] = type_id_byte;

                    let child_start = *offset + 1;
                    let child_end = child_start + child_bytes.len();
                    data[child_start..child_end].copy_from_slice(child_bytes);

                    *offset = child_end;
                });
        }
    }
}
2058
2059/// Encode dictionary values not preserving the dictionary encoding
2060pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
2061    data: &mut [u8],
2062    offsets: &mut [usize],
2063    column: &DictionaryArray<K>,
2064    values: &Rows,
2065    null: &Row<'_>,
2066) {
2067    for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
2068        let row = match k {
2069            Some(k) => values.row(k.as_usize()).data,
2070            None => null.data,
2071        };
2072        let end_offset = *offset + row.len();
2073        data[*offset..end_offset].copy_from_slice(row);
2074        *offset = end_offset;
2075    }
2076}
2077
/// Helper macro invoked by `downcast_primitive!` in [`decode_column`] to decode
/// a primitive column of type `$t` from `$rows`
macro_rules! decode_primitive_helper {
    ($t:ty, $rows:ident, $data_type:ident, $options:ident) => {
        Arc::new(decode_primitive::<$t>($rows, $data_type, $options))
    };
}
2083
/// Decodes the provided `field` from `rows`
///
/// Each slice in `rows` is advanced past the bytes consumed for this column
///
/// # Safety
///
/// Rows must contain valid data for the provided field
unsafe fn decode_column(
    field: &SortField,
    rows: &mut [&[u8]],
    codec: &Codec,
    validate_utf8: bool,
) -> Result<ArrayRef, ArrowError> {
    let options = field.options;

    let array: ArrayRef = match codec {
        Codec::Stateless => {
            let data_type = field.data_type.clone();
            downcast_primitive! {
                data_type => (decode_primitive_helper, rows, data_type, options),
                DataType::Null => {
                    variable::decode_null_value(rows, options);
                    Arc::new(NullArray::new(rows.len()))
                }
                DataType::Boolean => Arc::new(decode_bool(rows, options)),
                DataType::Binary => Arc::new(decode_binary::<i32>(rows, options)),
                DataType::LargeBinary => Arc::new(decode_binary::<i64>(rows, options)),
                DataType::BinaryView => Arc::new(decode_binary_view(rows, options)),
                DataType::FixedSizeBinary(size) => Arc::new(decode_fixed_size_binary(rows, size, options)),
                DataType::Utf8 => Arc::new(unsafe{ decode_string::<i32>(rows, options, validate_utf8) }),
                DataType::LargeUtf8 => Arc::new(unsafe { decode_string::<i64>(rows, options, validate_utf8) }),
                DataType::Utf8View => Arc::new(unsafe { decode_string_view(rows, options, validate_utf8) }),
                _ => return Err(ArrowError::NotYetImplemented(format!("unsupported data type: {data_type}" )))
            }
        }
        Codec::Dictionary(converter, _) => {
            // Dictionaries were encoded flattened; decode via the child converter
            let cols = unsafe { converter.convert_raw(rows, validate_utf8) }?;
            cols.into_iter().next().unwrap()
        }
        Codec::Struct(converter, _) => {
            // Decode the leading validity sentinel byte, then strip it before
            // decoding the child columns
            let (null_count, nulls) = fixed::decode_nulls(rows);
            rows.iter_mut().for_each(|row| *row = &row[1..]);
            let children = unsafe { converter.convert_raw(rows, validate_utf8) }?;

            let child_data: Vec<ArrayData> = children.iter().map(|c| c.to_data()).collect();
            // Since RowConverter flattens certain data types (i.e. Dictionary),
            // we need to use updated data type instead of original field
            let corrected_fields: Vec<Field> = match &field.data_type {
                DataType::Struct(struct_fields) => struct_fields
                    .iter()
                    .zip(child_data.iter())
                    .map(|(orig_field, child_array)| {
                        orig_field
                            .as_ref()
                            .clone()
                            .with_data_type(child_array.data_type().clone())
                    })
                    .collect(),
                _ => unreachable!("Only Struct types should be corrected here"),
            };
            let corrected_struct_type = DataType::Struct(corrected_fields.into());
            let builder = ArrayDataBuilder::new(corrected_struct_type)
                .len(rows.len())
                .null_count(null_count)
                .null_bit_buffer(Some(nulls))
                .child_data(child_data);

            // SAFETY: caller guarantees rows contain valid data for this field
            Arc::new(StructArray::from(unsafe { builder.build_unchecked() }))
        }
        Codec::List(converter) => match &field.data_type {
            DataType::List(_) => {
                Arc::new(unsafe { list::decode::<i32>(converter, rows, field, validate_utf8) }?)
            }
            DataType::LargeList(_) => {
                Arc::new(unsafe { list::decode::<i64>(converter, rows, field, validate_utf8) }?)
            }
            DataType::ListView(_) => Arc::new(unsafe {
                list::decode_list_view::<i32>(converter, rows, field, validate_utf8)
            }?),
            DataType::LargeListView(_) => Arc::new(unsafe {
                list::decode_list_view::<i64>(converter, rows, field, validate_utf8)
            }?),
            DataType::FixedSizeList(_, value_length) => Arc::new(unsafe {
                list::decode_fixed_size_list(
                    converter,
                    rows,
                    field,
                    validate_utf8,
                    value_length.as_usize(),
                )
            }?),
            _ => unreachable!(),
        },
        Codec::RunEndEncoded(converter) => match &field.data_type {
            // Dispatch on the run-end index type
            DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() {
                DataType::Int16 => Arc::new(unsafe {
                    run::decode::<Int16Type>(converter, rows, field, validate_utf8)
                }?),
                DataType::Int32 => Arc::new(unsafe {
                    run::decode::<Int32Type>(converter, rows, field, validate_utf8)
                }?),
                DataType::Int64 => Arc::new(unsafe {
                    run::decode::<Int64Type>(converter, rows, field, validate_utf8)
                }?),
                _ => unreachable!(),
            },
            _ => unreachable!(),
        },
        Codec::Union(converters, field_to_type_ids, null_rows) => {
            let len = rows.len();

            let DataType::Union(union_fields, mode) = &field.data_type else {
                unreachable!()
            };

            // Build a reverse lookup from type id to field index
            // (type ids are at most 127 per the Arrow spec)
            let mut type_id_to_field_idx = [0usize; 128];
            for (field_idx, &type_id) in field_to_type_ids.iter().enumerate() {
                type_id_to_field_idx[type_id as usize] = field_idx;
            }

            let mut type_ids = Vec::with_capacity(len);
            // Group (row index, child row bytes) by the field each row belongs to
            let mut rows_by_field: Vec<Vec<(usize, &[u8])>> = vec![Vec::new(); converters.len()];

            for (idx, row) in rows.iter_mut().enumerate() {
                // First byte is the type id, inverted when encoded descending
                let type_id_byte = {
                    let id = row[0];
                    if options.descending { !id } else { id }
                };

                let type_id = type_id_byte as i8;
                type_ids.push(type_id);

                let field_idx = type_id_to_field_idx[type_id as usize];

                let child_row = &row[1..];
                rows_by_field[field_idx].push((idx, child_row));
            }

            let mut child_arrays: Vec<ArrayRef> = Vec::with_capacity(converters.len());
            let mut offsets = (*mode == UnionMode::Dense).then(|| Vec::with_capacity(len));

            for (field_idx, converter) in converters.iter().enumerate() {
                let field_rows = &rows_by_field[field_idx];

                match &mode {
                    UnionMode::Dense => {
                        if field_rows.is_empty() {
                            // No rows for this field: emit an empty child array
                            let (_, field) = union_fields.iter().nth(field_idx).unwrap();
                            child_arrays.push(arrow_array::new_empty_array(field.data_type()));
                            continue;
                        }

                        let mut child_data = field_rows
                            .iter()
                            .map(|(_, bytes)| *bytes)
                            .collect::<Vec<_>>();

                        let child_array =
                            unsafe { converter.convert_raw(&mut child_data, validate_utf8) }?;

                        // advance row slices by the bytes consumed
                        // (+1 accounts for the type id byte stripped earlier)
                        for ((row_idx, original_bytes), remaining_bytes) in
                            field_rows.iter().zip(child_data)
                        {
                            let consumed_length = 1 + original_bytes.len() - remaining_bytes.len();
                            rows[*row_idx] = &rows[*row_idx][consumed_length..];
                        }

                        child_arrays.push(child_array.into_iter().next().unwrap());
                    }
                    UnionMode::Sparse => {
                        // Sparse children have `len` slots; fill slots belonging to
                        // other fields with this field's pre-encoded null row
                        let mut sparse_data: Vec<&[u8]> = Vec::with_capacity(len);
                        let mut field_row_iter = field_rows.iter().peekable();
                        let null_row_bytes: &[u8] = &null_rows[field_idx].data;

                        for idx in 0..len {
                            if let Some((next_idx, bytes)) = field_row_iter.peek() {
                                if *next_idx == idx {
                                    sparse_data.push(*bytes);

                                    field_row_iter.next();
                                    continue;
                                }
                            }
                            sparse_data.push(null_row_bytes);
                        }

                        let child_array =
                            unsafe { converter.convert_raw(&mut sparse_data, validate_utf8) }?;

                        // advance row slices by the bytes consumed for rows that belong to this field
                        for (row_idx, child_row) in field_rows.iter() {
                            let remaining_len = sparse_data[*row_idx].len();
                            let consumed_length = 1 + child_row.len() - remaining_len;
                            rows[*row_idx] = &rows[*row_idx][consumed_length..];
                        }

                        child_arrays.push(child_array.into_iter().next().unwrap());
                    }
                }
            }

            // build offsets for dense unions
            if let Some(ref mut offsets_vec) = offsets {
                let mut count = vec![0i32; converters.len()];
                for type_id in &type_ids {
                    // NOTE(review): this indexes `count` by the raw type id rather
                    // than via `type_id_to_field_idx` — if type ids are not the
                    // contiguous range 0..converters.len() this would panic or
                    // miscount; verify against how Union encoders assign type ids
                    let field_idx = *type_id as usize;
                    offsets_vec.push(count[field_idx]);

                    count[field_idx] += 1;
                }
            }

            let type_ids_buffer = ScalarBuffer::from(type_ids);
            let offsets_buffer = offsets.map(ScalarBuffer::from);

            let union_array = UnionArray::try_new(
                union_fields.clone(),
                type_ids_buffer,
                offsets_buffer,
                child_arrays,
            )?;

            // note: union arrays don't support physical null buffers
            // nulls are represented logically though child arrays
            Arc::new(union_array)
        }
    };
    Ok(array)
}
2312
2313#[cfg(test)]
2314mod tests {
2315    use arrow_array::builder::*;
2316    use arrow_array::types::*;
2317    use arrow_array::*;
2318    use arrow_buffer::{Buffer, OffsetBuffer};
2319    use arrow_buffer::{NullBuffer, i256};
2320    use arrow_cast::display::{ArrayFormatter, FormatOptions};
2321    use arrow_ord::sort::{LexicographicalComparator, SortColumn};
2322    use rand::distr::uniform::SampleUniform;
2323    use rand::distr::{Distribution, StandardUniform};
2324    use rand::prelude::StdRng;
2325    use rand::{Rng, RngCore, SeedableRng};
2326
2327    use super::*;
2328
    #[test]
    fn test_fixed_width() {
        // Two fixed-width columns (Int16, Float32) with nulls. Each row has a
        // fixed 8-byte layout: 1 sentinel byte + 2 data bytes for Int16, then
        // 1 sentinel byte + 4 data bytes for Float32 (a null encodes as all
        // zeros, as seen in row 2 below).
        let cols = [
            Arc::new(Int16Array::from_iter([
                Some(1),
                Some(2),
                None,
                Some(-5),
                Some(2),
                Some(2),
                Some(0),
            ])) as ArrayRef,
            Arc::new(Float32Array::from_iter([
                Some(1.3),
                Some(2.5),
                None,
                Some(4.),
                Some(0.1),
                Some(-4.),
                Some(-0.),
            ])) as ArrayRef,
        ];

        let converter = RowConverter::new(vec![
            SortField::new(DataType::Int16),
            SortField::new(DataType::Float32),
        ])
        .unwrap();
        let rows = converter.convert_columns(&cols).unwrap();

        // Seven rows of 8 bytes each
        assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
        // Each pair of comment-separated groups below is one row:
        // the Int16 bytes followed by the Float32 bytes.
        assert_eq!(
            rows.buffer,
            &[
                1, 128, 1, //
                1, 191, 166, 102, 102, //
                1, 128, 2, //
                1, 192, 32, 0, 0, //
                0, 0, 0, //
                0, 0, 0, 0, 0, //
                1, 127, 251, //
                1, 192, 128, 0, 0, //
                1, 128, 2, //
                1, 189, 204, 204, 205, //
                1, 128, 2, //
                1, 63, 127, 255, 255, //
                1, 128, 0, //
                1, 127, 255, 255, 255 //
            ]
        );

        // Rows compare lexicographically over (Int16, Float32)
        assert!(rows.row(3) < rows.row(6)); // -5 < 0
        assert!(rows.row(0) < rows.row(1)); // 1 < 2
        assert!(rows.row(3) < rows.row(0)); // -5 < 1
        assert!(rows.row(4) < rows.row(1)); // (2, 0.1) < (2, 2.5)
        assert!(rows.row(5) < rows.row(4)); // (2, -4.) < (2, 0.1)

        // Round trip: decoding the rows must reproduce the input columns
        let back = converter.convert_rows(&rows).unwrap();
        for (expected, actual) in cols.iter().zip(&back) {
            assert_eq!(expected, actual);
        }
    }
2391
2392    #[test]
2393    fn test_decimal32() {
2394        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal32(
2395            DECIMAL32_MAX_PRECISION,
2396            7,
2397        ))])
2398        .unwrap();
2399        let col = Arc::new(
2400            Decimal32Array::from_iter([
2401                None,
2402                Some(i32::MIN),
2403                Some(-13),
2404                Some(46_i32),
2405                Some(5456_i32),
2406                Some(i32::MAX),
2407            ])
2408            .with_precision_and_scale(9, 7)
2409            .unwrap(),
2410        ) as ArrayRef;
2411
2412        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2413        for i in 0..rows.num_rows() - 1 {
2414            assert!(rows.row(i) < rows.row(i + 1));
2415        }
2416
2417        let back = converter.convert_rows(&rows).unwrap();
2418        assert_eq!(back.len(), 1);
2419        assert_eq!(col.as_ref(), back[0].as_ref())
2420    }
2421
2422    #[test]
2423    fn test_decimal64() {
2424        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal64(
2425            DECIMAL64_MAX_PRECISION,
2426            7,
2427        ))])
2428        .unwrap();
2429        let col = Arc::new(
2430            Decimal64Array::from_iter([
2431                None,
2432                Some(i64::MIN),
2433                Some(-13),
2434                Some(46_i64),
2435                Some(5456_i64),
2436                Some(i64::MAX),
2437            ])
2438            .with_precision_and_scale(18, 7)
2439            .unwrap(),
2440        ) as ArrayRef;
2441
2442        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2443        for i in 0..rows.num_rows() - 1 {
2444            assert!(rows.row(i) < rows.row(i + 1));
2445        }
2446
2447        let back = converter.convert_rows(&rows).unwrap();
2448        assert_eq!(back.len(), 1);
2449        assert_eq!(col.as_ref(), back[0].as_ref())
2450    }
2451
2452    #[test]
2453    fn test_decimal128() {
2454        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal128(
2455            DECIMAL128_MAX_PRECISION,
2456            7,
2457        ))])
2458        .unwrap();
2459        let col = Arc::new(
2460            Decimal128Array::from_iter([
2461                None,
2462                Some(i128::MIN),
2463                Some(-13),
2464                Some(46_i128),
2465                Some(5456_i128),
2466                Some(i128::MAX),
2467            ])
2468            .with_precision_and_scale(38, 7)
2469            .unwrap(),
2470        ) as ArrayRef;
2471
2472        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2473        for i in 0..rows.num_rows() - 1 {
2474            assert!(rows.row(i) < rows.row(i + 1));
2475        }
2476
2477        let back = converter.convert_rows(&rows).unwrap();
2478        assert_eq!(back.len(), 1);
2479        assert_eq!(col.as_ref(), back[0].as_ref())
2480    }
2481
2482    #[test]
2483    fn test_decimal256() {
2484        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal256(
2485            DECIMAL256_MAX_PRECISION,
2486            7,
2487        ))])
2488        .unwrap();
2489        let col = Arc::new(
2490            Decimal256Array::from_iter([
2491                None,
2492                Some(i256::MIN),
2493                Some(i256::from_parts(0, -1)),
2494                Some(i256::from_parts(u128::MAX, -1)),
2495                Some(i256::from_parts(u128::MAX, 0)),
2496                Some(i256::from_parts(0, 46_i128)),
2497                Some(i256::from_parts(5, 46_i128)),
2498                Some(i256::MAX),
2499            ])
2500            .with_precision_and_scale(DECIMAL256_MAX_PRECISION, 7)
2501            .unwrap(),
2502        ) as ArrayRef;
2503
2504        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2505        for i in 0..rows.num_rows() - 1 {
2506            assert!(rows.row(i) < rows.row(i + 1));
2507        }
2508
2509        let back = converter.convert_rows(&rows).unwrap();
2510        assert_eq!(back.len(), 1);
2511        assert_eq!(col.as_ref(), back[0].as_ref())
2512    }
2513
2514    #[test]
2515    fn test_bool() {
2516        let converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]).unwrap();
2517
2518        let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])) as ArrayRef;
2519
2520        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2521        assert!(rows.row(2) > rows.row(1));
2522        assert!(rows.row(2) > rows.row(0));
2523        assert!(rows.row(1) > rows.row(0));
2524
2525        let cols = converter.convert_rows(&rows).unwrap();
2526        assert_eq!(&cols[0], &col);
2527
2528        let converter = RowConverter::new(vec![SortField::new_with_options(
2529            DataType::Boolean,
2530            SortOptions::default().desc().with_nulls_first(false),
2531        )])
2532        .unwrap();
2533
2534        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2535        assert!(rows.row(2) < rows.row(1));
2536        assert!(rows.row(2) < rows.row(0));
2537        assert!(rows.row(1) < rows.row(0));
2538        let cols = converter.convert_rows(&rows).unwrap();
2539        assert_eq!(&cols[0], &col);
2540    }
2541
    #[test]
    fn test_timezone() {
        // Timezone metadata must survive a round trip through the row format,
        // both for a plain timestamp array and for a dictionary of timestamps.
        let a =
            TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone("+01:00".to_string());
        let d = a.data_type().clone();

        let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
        let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        // The decoded array carries the original "+01:00" timezone
        assert_eq!(back[0].data_type(), &d);

        // Test dictionary
        let mut a = PrimitiveDictionaryBuilder::<Int32Type, TimestampNanosecondType>::new();
        a.append(34).unwrap();
        a.append_null();
        a.append(345).unwrap();

        // Construct dictionary with a timezone: rebuild the values array with
        // "+02:00" and swap it into the finished dictionary
        let dict = a.finish();
        let values = TimestampNanosecondArray::from(dict.values().to_data());
        let dict_with_tz = dict.with_values(Arc::new(values.with_timezone("+02:00")));
        let v = DataType::Timestamp(TimeUnit::Nanosecond, Some("+02:00".into()));
        let d = DataType::Dictionary(Box::new(DataType::Int32), Box::new(v.clone()));

        assert_eq!(dict_with_tz.data_type(), &d);
        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
        let rows = converter
            .convert_columns(&[Arc::new(dict_with_tz) as _])
            .unwrap();
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        // The row format flattens dictionaries on decode, so only the value
        // type (with its timezone) comes back — not the dictionary type
        assert_eq!(back[0].data_type(), &v);
    }
2576
2577    #[test]
2578    fn test_null_encoding() {
2579        let col = Arc::new(NullArray::new(10));
2580        let converter = RowConverter::new(vec![SortField::new(DataType::Null)]).unwrap();
2581        let rows = converter.convert_columns(&[col]).unwrap();
2582        assert_eq!(rows.num_rows(), 10);
2583        // NULL element encoded as 2 bytes data
2584        assert_eq!(rows.row(1).data.len(), 2);
2585    }
2586
    #[test]
    fn test_variable_width() {
        // Variable-length values (strings and binary) must compare through
        // the row encoding exactly as the logical values compare.
        let col = Arc::new(StringArray::from_iter([
            Some("hello"),
            Some("he"),
            None,
            Some("foo"),
            Some(""),
        ])) as ArrayRef;

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        assert!(rows.row(1) < rows.row(0)); // "he" < "hello"
        assert!(rows.row(2) < rows.row(4)); // null < ""
        assert!(rows.row(3) < rows.row(0)); // "foo" < "hello"
        assert!(rows.row(3) < rows.row(1)); // "foo" < "he"

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        // Binary values sized exactly at and just past the encoder's
        // MINI_BLOCK_SIZE / BLOCK_SIZE boundaries, to exercise the chunked
        // variable-length encoding's edge cases; listed in ascending order.
        let col = Arc::new(BinaryArray::from_iter([
            None,
            Some(vec![0_u8; 0]),
            Some(vec![0_u8; 6]),
            Some(vec![0_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![0_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![0_u8; variable::BLOCK_SIZE]),
            Some(vec![0_u8; variable::BLOCK_SIZE + 1]),
            Some(vec![1_u8; 6]),
            Some(vec![1_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![1_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![1_u8; variable::BLOCK_SIZE]),
            Some(vec![1_u8; variable::BLOCK_SIZE + 1]),
            Some(vec![0xFF_u8; 6]),
            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![0xFF_u8; variable::BLOCK_SIZE]),
            Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]),
        ])) as ArrayRef;

        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        // Every pair of rows must be strictly ascending
        for i in 0..rows.num_rows() {
            for j in i + 1..rows.num_rows() {
                assert!(
                    rows.row(i) < rows.row(j),
                    "{} < {} - {:?} < {:?}",
                    i,
                    j,
                    rows.row(i),
                    rows.row(j)
                );
            }
        }

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        // Descending with nulls last: the same inputs must now be strictly
        // descending pairwise
        let converter = RowConverter::new(vec![SortField::new_with_options(
            DataType::Binary,
            SortOptions::default().desc().with_nulls_first(false),
        )])
        .unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        for i in 0..rows.num_rows() {
            for j in i + 1..rows.num_rows() {
                assert!(
                    rows.row(i) > rows.row(j),
                    "{} > {} - {:?} > {:?}",
                    i,
                    j,
                    rows.row(i),
                    rows.row(j)
                );
            }
        }

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);
    }
2670
2671    /// If `exact` is false performs a logical comparison between a and dictionary-encoded b
2672    fn dictionary_eq(a: &dyn Array, b: &dyn Array) {
2673        match b.data_type() {
2674            DataType::Dictionary(_, v) => {
2675                assert_eq!(a.data_type(), v.as_ref());
2676                let b = arrow_cast::cast(b, v).unwrap();
2677                assert_eq!(a, b.as_ref())
2678            }
2679            _ => assert_eq!(a, b),
2680        }
2681    }
2682
    #[test]
    fn test_string_dictionary() {
        // Dictionary-encoded strings must encode by logical value: equal
        // values yield identical rows even across different dictionaries,
        // and ordering matches the underlying string ordering.
        let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
            Some("foo"),
            Some("hello"),
            Some("he"),
            None,
            Some("hello"),
            Some(""),
            Some("hello"),
            Some("hello"),
        ])) as ArrayRef;

        let field = SortField::new(a.data_type().clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();

        // Ascending, nulls first (defaults)
        assert!(rows_a.row(3) < rows_a.row(5)); // null < ""
        assert!(rows_a.row(2) < rows_a.row(1)); // "he" < "hello"
        assert!(rows_a.row(0) < rows_a.row(1)); // "foo" < "hello"
        assert!(rows_a.row(3) < rows_a.row(0)); // null < "foo"

        // All "hello" occurrences encode identically
        assert_eq!(rows_a.row(1), rows_a.row(4));
        assert_eq!(rows_a.row(1), rows_a.row(6));
        assert_eq!(rows_a.row(1), rows_a.row(7));

        let cols = converter.convert_rows(&rows_a).unwrap();
        dictionary_eq(&cols[0], &a);

        // A different array (different dictionary keys/values) must still
        // produce rows directly comparable with those of `a`
        let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
            Some("hello"),
            None,
            Some("cupcakes"),
        ])) as ArrayRef;

        let rows_b = converter.convert_columns(&[Arc::clone(&b)]).unwrap();
        assert_eq!(rows_a.row(1), rows_b.row(0)); // "hello" == "hello"
        assert_eq!(rows_a.row(3), rows_b.row(1)); // null == null
        assert!(rows_b.row(2) < rows_a.row(0)); // "cupcakes" < "foo"

        let cols = converter.convert_rows(&rows_b).unwrap();
        dictionary_eq(&cols[0], &b);

        // Descending, nulls last: every earlier comparison inverts
        let converter = RowConverter::new(vec![SortField::new_with_options(
            a.data_type().clone(),
            SortOptions::default().desc().with_nulls_first(false),
        )])
        .unwrap();

        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
        assert!(rows_c.row(3) > rows_c.row(5));
        assert!(rows_c.row(2) > rows_c.row(1));
        assert!(rows_c.row(0) > rows_c.row(1));
        assert!(rows_c.row(3) > rows_c.row(0));

        let cols = converter.convert_rows(&rows_c).unwrap();
        dictionary_eq(&cols[0], &a);

        // Descending but nulls first: values invert, nulls still sort lowest
        let converter = RowConverter::new(vec![SortField::new_with_options(
            a.data_type().clone(),
            SortOptions::default().desc().with_nulls_first(true),
        )])
        .unwrap();

        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
        assert!(rows_c.row(3) < rows_c.row(5));
        assert!(rows_c.row(2) > rows_c.row(1));
        assert!(rows_c.row(0) > rows_c.row(1));
        assert!(rows_c.row(3) < rows_c.row(0));

        let cols = converter.convert_rows(&rows_c).unwrap();
        dictionary_eq(&cols[0], &a);
    }
2756
    #[test]
    fn test_struct() {
        // Structs encode child values in field order, so rows compare like a
        // lexicographic sort over ("int", "s").
        // Test basic
        let a = Arc::new(Int32Array::from(vec![1, 1, 2, 2])) as ArrayRef;
        let a_f = Arc::new(Field::new("int", DataType::Int32, false));
        let u = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as ArrayRef;
        let u_f = Arc::new(Field::new("s", DataType::Utf8, false));
        let s1 = Arc::new(StructArray::from(vec![(a_f, a), (u_f, u)])) as ArrayRef;

        let sort_fields = vec![SortField::new(s1.data_type().clone())];
        let converter = RowConverter::new(sort_fields).unwrap();
        let r1 = converter.convert_columns(&[Arc::clone(&s1)]).unwrap();

        // (1,"a") < (1,"b") < (2,"c") < (2,"d")
        for (a, b) in r1.iter().zip(r1.iter().skip(1)) {
            assert!(a < b);
        }

        let back = converter.convert_rows(&r1).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(&back[0], &s1);

        // Test struct nullability: validity bitmap 0b00001010 keeps rows 1
        // and 3, masking rows 0 and 2 at the struct level
        let data = s1
            .to_data()
            .into_builder()
            .null_bit_buffer(Some(Buffer::from_slice_ref([0b00001010])))
            .null_count(2)
            .build()
            .unwrap();

        let s2 = Arc::new(StructArray::from(data)) as ArrayRef;
        let r2 = converter.convert_columns(&[Arc::clone(&s2)]).unwrap();
        assert_eq!(r2.row(0), r2.row(2)); // Nulls equal
        assert!(r2.row(0) < r2.row(1)); // Nulls first
        assert_ne!(r1.row(0), r2.row(0)); // Value does not equal null
        assert_eq!(r1.row(1), r2.row(1)); // Values equal

        let back = converter.convert_rows(&r2).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(&back[0], &s2);

        // The decoded array must also pass full structural validation
        back[0].to_data().validate_full().unwrap();
    }
2800
    #[test]
    fn test_dictionary_in_struct() {
        // A dictionary nested inside a struct is flattened by the round trip;
        // the logical values must nevertheless be preserved element-for-element.
        let builder = StringDictionaryBuilder::<Int32Type>::new();
        let mut struct_builder = StructBuilder::new(
            vec![Field::new_dictionary(
                "foo",
                DataType::Int32,
                DataType::Utf8,
                true,
            )],
            vec![Box::new(builder)],
        );

        let dict_builder = struct_builder
            .field_builder::<StringDictionaryBuilder<Int32Type>>(0)
            .unwrap();

        // Flattened: ["a", null, "a", "b"]
        dict_builder.append_value("a");
        dict_builder.append_null();
        dict_builder.append_value("a");
        dict_builder.append_value("b");

        // All four struct entries are valid; only the child has a null
        for _ in 0..4 {
            struct_builder.append(true);
        }

        let s = Arc::new(struct_builder.finish()) as ArrayRef;
        let sort_fields = vec![SortField::new(s.data_type().clone())];
        let converter = RowConverter::new(sort_fields).unwrap();
        let r = converter.convert_columns(&[Arc::clone(&s)]).unwrap();

        let back = converter.convert_rows(&r).unwrap();
        let [s2] = back.try_into().unwrap();

        // RowConverter flattens Dictionary
        // s.ty = Struct("foo": Dictionary(Int32, Utf8)), s2.ty = Struct("foo": Utf8)
        assert_ne!(&s.data_type(), &s2.data_type());
        s2.to_data().validate_full().unwrap();

        // Check if the logical data remains the same
        // Keys: [0, null, 0, 1]
        // Values: ["a", "b"]
        let s1_struct = s.as_struct();
        let s1_0 = s1_struct.column(0);
        let s1_idx_0 = s1_0.as_dictionary::<Int32Type>();
        let keys = s1_idx_0.keys();
        let values = s1_idx_0.values().as_string::<i32>();
        // Flattened: ["a", null, "a", "b"]
        let s2_struct = s2.as_struct();
        let s2_0 = s2_struct.column(0);
        let s2_idx_0 = s2_0.as_string::<i32>();

        // Resolve each key against the dictionary values and compare with
        // the flattened output, matching null positions exactly
        for i in 0..keys.len() {
            if keys.is_null(i) {
                assert!(s2_idx_0.is_null(i));
            } else {
                let dict_index = keys.value(i) as usize;
                assert_eq!(values.value(dict_index), s2_idx_0.value(i));
            }
        }
    }
2863
2864    #[test]
2865    fn test_dictionary_in_struct_empty() {
2866        let ty = DataType::Struct(
2867            vec![Field::new_dictionary(
2868                "foo",
2869                DataType::Int32,
2870                DataType::Int32,
2871                false,
2872            )]
2873            .into(),
2874        );
2875        let s = arrow_array::new_empty_array(&ty);
2876
2877        let sort_fields = vec![SortField::new(s.data_type().clone())];
2878        let converter = RowConverter::new(sort_fields).unwrap();
2879        let r = converter.convert_columns(&[Arc::clone(&s)]).unwrap();
2880
2881        let back = converter.convert_rows(&r).unwrap();
2882        let [s2] = back.try_into().unwrap();
2883
2884        // RowConverter flattens Dictionary
2885        // s.ty = Struct("foo": Dictionary(Int32, Int32)), s2.ty = Struct("foo": Int32)
2886        assert_ne!(&s.data_type(), &s2.data_type());
2887        s2.to_data().validate_full().unwrap();
2888        assert_eq!(s.len(), 0);
2889        assert_eq!(s2.len(), 0);
2890    }
2891
    #[test]
    fn test_list_of_string_dictionary() {
        // A dictionary nested inside a list is flattened by the round trip;
        // verify each list element's logical values (and nulls) survive.
        let mut builder = ListBuilder::<StringDictionaryBuilder<Int32Type>>::default();
        // List[0] = ["a", "b", "zero", null, "c", "b", "d" (dict)]
        builder.values().append("a").unwrap();
        builder.values().append("b").unwrap();
        builder.values().append("zero").unwrap();
        builder.values().append_null();
        builder.values().append("c").unwrap();
        builder.values().append("b").unwrap();
        builder.values().append("d").unwrap();
        builder.append(true);
        // List[1] = null
        builder.append(false);
        // List[2] = ["e", "zero", "a" (dict)]
        builder.values().append("e").unwrap();
        builder.values().append("zero").unwrap();
        builder.values().append("a").unwrap();
        builder.append(true);

        let a = Arc::new(builder.finish()) as ArrayRef;
        let data_type = a.data_type().clone();

        let field = SortField::new(data_type.clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&a)]).unwrap();

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        let [a2] = back.try_into().unwrap();

        // RowConverter flattens Dictionary
        // a.ty: List(Dictionary(Int32, Utf8)), a2.ty: List(Utf8)
        assert_ne!(&a.data_type(), &a2.data_type());

        a2.to_data().validate_full().unwrap();

        let a2_list = a2.as_list::<i32>();
        let a1_list = a.as_list::<i32>();

        // Check if the logical data remains the same
        // List[0] = ["a", "b", "zero", null, "c", "b", "d" (dict)]
        let a1_0 = a1_list.value(0);
        let a1_idx_0 = a1_0.as_dictionary::<Int32Type>();
        let keys = a1_idx_0.keys();
        let values = a1_idx_0.values().as_string::<i32>();
        let a2_0 = a2_list.value(0);
        let a2_idx_0 = a2_0.as_string::<i32>();

        // Resolve dictionary keys to strings and compare element-wise,
        // matching null positions exactly
        for i in 0..keys.len() {
            if keys.is_null(i) {
                assert!(a2_idx_0.is_null(i));
            } else {
                let dict_index = keys.value(i) as usize;
                assert_eq!(values.value(dict_index), a2_idx_0.value(i));
            }
        }

        // List[1] = null — list-level null preserved
        assert!(a1_list.is_null(1));
        assert!(a2_list.is_null(1));

        // List[2] = ["e", "zero", "a" (dict)]
        let a1_2 = a1_list.value(2);
        let a1_idx_2 = a1_2.as_dictionary::<Int32Type>();
        let keys = a1_idx_2.keys();
        let values = a1_idx_2.values().as_string::<i32>();
        let a2_2 = a2_list.value(2);
        let a2_idx_2 = a2_2.as_string::<i32>();

        for i in 0..keys.len() {
            if keys.is_null(i) {
                assert!(a2_idx_2.is_null(i));
            } else {
                let dict_index = keys.value(i) as usize;
                assert_eq!(values.value(dict_index), a2_idx_2.value(i));
            }
        }
    }
2971
2972    #[test]
2973    fn test_primitive_dictionary() {
2974        let mut builder = PrimitiveDictionaryBuilder::<Int32Type, Int32Type>::new();
2975        builder.append(2).unwrap();
2976        builder.append(3).unwrap();
2977        builder.append(0).unwrap();
2978        builder.append_null();
2979        builder.append(5).unwrap();
2980        builder.append(3).unwrap();
2981        builder.append(-1).unwrap();
2982
2983        let a = builder.finish();
2984        let data_type = a.data_type().clone();
2985        let columns = [Arc::new(a) as ArrayRef];
2986
2987        let field = SortField::new(data_type.clone());
2988        let converter = RowConverter::new(vec![field]).unwrap();
2989        let rows = converter.convert_columns(&columns).unwrap();
2990        assert!(rows.row(0) < rows.row(1));
2991        assert!(rows.row(2) < rows.row(0));
2992        assert!(rows.row(3) < rows.row(2));
2993        assert!(rows.row(6) < rows.row(2));
2994        assert!(rows.row(3) < rows.row(6));
2995
2996        let back = converter.convert_rows(&rows).unwrap();
2997        assert_eq!(back.len(), 1);
2998        back[0].to_data().validate_full().unwrap();
2999    }
3000
3001    #[test]
3002    fn test_dictionary_nulls() {
3003        let values = Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data();
3004        let keys =
3005            Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]).into_data();
3006
3007        let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
3008        let data = keys
3009            .into_builder()
3010            .data_type(data_type.clone())
3011            .child_data(vec![values])
3012            .build()
3013            .unwrap();
3014
3015        let columns = [Arc::new(DictionaryArray::<Int32Type>::from(data)) as ArrayRef];
3016        let field = SortField::new(data_type.clone());
3017        let converter = RowConverter::new(vec![field]).unwrap();
3018        let rows = converter.convert_columns(&columns).unwrap();
3019
3020        assert_eq!(rows.row(0), rows.row(1));
3021        assert_eq!(rows.row(3), rows.row(4));
3022        assert_eq!(rows.row(4), rows.row(5));
3023        assert!(rows.row(3) < rows.row(0));
3024    }
3025
3026    #[test]
3027    fn test_from_binary_shared_buffer() {
3028        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
3029        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
3030        let rows = converter.convert_columns(&[array]).unwrap();
3031        let binary_rows = rows.try_into_binary().expect("known-small rows");
3032        let _binary_rows_shared_buffer = binary_rows.clone();
3033
3034        let parsed = converter.from_binary(binary_rows);
3035
3036        converter.convert_rows(parsed.iter()).unwrap();
3037    }
3038
3039    #[test]
3040    #[should_panic(expected = "Encountered non UTF-8 data")]
3041    fn test_invalid_utf8() {
3042        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
3043        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
3044        let rows = converter.convert_columns(&[array]).unwrap();
3045        let binary_row = rows.row(0);
3046
3047        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
3048        let parser = converter.parser();
3049        let utf8_row = parser.parse(binary_row.as_ref());
3050
3051        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
3052    }
3053
3054    #[test]
3055    #[should_panic(expected = "Encountered non UTF-8 data")]
3056    fn test_invalid_utf8_array() {
3057        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
3058        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
3059        let rows = converter.convert_columns(&[array]).unwrap();
3060        let binary_rows = rows.try_into_binary().expect("known-small rows");
3061
3062        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
3063        let parsed = converter.from_binary(binary_rows);
3064
3065        converter.convert_rows(parsed.iter()).unwrap();
3066    }
3067
3068    #[test]
3069    #[should_panic(expected = "index out of bounds")]
3070    fn test_invalid_empty() {
3071        let binary_row: &[u8] = &[];
3072
3073        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
3074        let parser = converter.parser();
3075        let utf8_row = parser.parse(binary_row.as_ref());
3076
3077        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
3078    }
3079
3080    #[test]
3081    #[should_panic(expected = "index out of bounds")]
3082    fn test_invalid_empty_array() {
3083        let row: &[u8] = &[];
3084        let binary_rows = BinaryArray::from(vec![row]);
3085
3086        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
3087        let parsed = converter.from_binary(binary_rows);
3088
3089        converter.convert_rows(parsed.iter()).unwrap();
3090    }
3091
3092    #[test]
3093    #[should_panic(expected = "index out of bounds")]
3094    fn test_invalid_truncated() {
3095        let binary_row: &[u8] = &[0x02];
3096
3097        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
3098        let parser = converter.parser();
3099        let utf8_row = parser.parse(binary_row.as_ref());
3100
3101        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
3102    }
3103
3104    #[test]
3105    #[should_panic(expected = "index out of bounds")]
3106    fn test_invalid_truncated_array() {
3107        let row: &[u8] = &[0x02];
3108        let binary_rows = BinaryArray::from(vec![row]);
3109
3110        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
3111        let parsed = converter.from_binary(binary_rows);
3112
3113        converter.convert_rows(parsed.iter()).unwrap();
3114    }
3115
3116    #[test]
3117    #[should_panic(expected = "rows were not produced by this RowConverter")]
3118    fn test_different_converter() {
3119        let values = Arc::new(Int32Array::from_iter([Some(1), Some(-1)]));
3120        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
3121        let rows = converter.convert_columns(&[values]).unwrap();
3122
3123        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
3124        let _ = converter.convert_rows(&rows);
3125    }
3126
    /// Round-trips a single-level `GenericList<Int32>` through the row format
    /// under all four combinations of sort direction and null ordering,
    /// checking both relative row ordering and lossless decoding.
    ///
    /// Logical contents of the array built below (values marked MASKED sit
    /// behind a null list entry and must not influence comparisons):
    ///   row 0: [32, 52, 32]
    ///   row 1: [32, 52, 12]
    ///   row 2: [32, 52]
    ///   row 3: null
    ///   row 4: [32, null]
    ///   row 5: []
    ///   row 6: null
    fn test_single_list<O: OffsetSizeTrait>() {
        let mut builder = GenericListBuilder::<O, _>::new(Int32Builder::new());
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(32);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(12);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.append(true);
        builder.values().append_value(32); // MASKED
        builder.values().append_value(52); // MASKED
        builder.append(false);
        builder.values().append_value(32);
        builder.values().append_null();
        builder.append(true);
        builder.append(true);
        builder.values().append_value(17); // MASKED
        builder.values().append_null(); // MASKED
        builder.append(false);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // Defaults: ascending, nulls first
        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();

        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
        assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
        assert!(rows.row(3) < rows.row(5)); // null < []
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Ascending, nulls last
        let options = SortOptions::default().asc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
        assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
        assert!(rows.row(3) > rows.row(5)); // null > []
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls last
        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
        assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
        assert!(rows.row(3) > rows.row(5)); // null > []
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls first
        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
        assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
        assert!(rows.row(3) < rows.row(5)); // null < []
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Re-encoding a slice must honour the slice offset: slice rows 0..5
        // correspond to rows 1..6 of the original array.
        let sliced_list = list.slice(1, 5);
        let rows_on_sliced_list = converter
            .convert_columns(&[Arc::clone(&sliced_list)])
            .unwrap();

        assert!(rows_on_sliced_list.row(1) > rows_on_sliced_list.row(0)); // [32, 52] > [32, 52, 12]
        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1)); // null < [32, 52]
        assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1)); // [32, null] < [32, 52]
        assert!(rows_on_sliced_list.row(4) > rows_on_sliced_list.row(1)); // [] > [32, 52]
        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4)); // null < []

        let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &sliced_list);
    }
3240
    /// Tests row encoding of a doubly-nested `GenericList<GenericList<Int32>>`
    /// with nulls at every nesting level (inner value, inner list, outer list)
    /// under three sort option combinations, plus re-encoding of a slice.
    fn test_nested_list<O: OffsetSizeTrait>() {
        let mut builder =
            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(Int32Builder::new()));

        // Row 0: [[1, 2], [1, null]]
        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        // Row 1: [[1, null], [1, null]]
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        // Row 2: [[1, null], null], Row 3: null
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().append(false);
        builder.append(true);
        builder.append(false);

        // Row 4: [[1, 2]]
        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.append(true);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // [
        //   [[1, 2], [1, null]],
        //   [[1, null], [1, null]],
        //   [[1, null], null]
        //   null
        //   [[1, 2]]
        // ]
        // Ascending, nulls first
        let options = SortOptions::default().asc().with_nulls_first(true);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(1) > rows.row(2));
        assert!(rows.row(2) > rows.row(3));
        assert!(rows.row(4) < rows.row(0));
        assert!(rows.row(4) > rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls first
        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(1) > rows.row(2));
        assert!(rows.row(2) > rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) > rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls last
        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(1) < rows.row(2));
        assert!(rows.row(2) < rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) < rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Slice rows 0..3 correspond to rows 1..4 of the original array.
        let sliced_list = list.slice(1, 3);
        let rows = converter
            .convert_columns(&[Arc::clone(&sliced_list)])
            .unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(1) < rows.row(2));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &sliced_list);
    }
3344
3345    #[test]
3346    fn test_list() {
3347        test_single_list::<i32>();
3348        test_nested_list::<i32>();
3349    }
3350
3351    #[test]
3352    fn test_large_list() {
3353        test_single_list::<i64>();
3354        test_nested_list::<i64>();
3355    }
3356
    /// Same scenario as `test_single_list` but using the `GenericListView`
    /// layout: verifies row ordering under all four sort option combinations
    /// and that decoding reproduces the logical contents (the physical
    /// offset/size layout of a ListView need not round-trip exactly, hence
    /// the element-wise comparison instead of `assert_eq!` on the arrays).
    fn test_single_list_view<O: OffsetSizeTrait>() {
        let mut builder = GenericListViewBuilder::<O, _>::new(Int32Builder::new());
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(32);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(12);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.append(true);
        builder.values().append_value(32); // MASKED
        builder.values().append_value(52); // MASKED
        builder.append(false);
        builder.values().append_value(32);
        builder.values().append_null();
        builder.append(true);
        builder.append(true);
        builder.values().append_value(17); // MASKED
        builder.values().append_null(); // MASKED
        builder.append(false);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // Defaults: ascending, nulls first
        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();

        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
        assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
        assert!(rows.row(3) < rows.row(5)); // null < []
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();

        // Verify the content matches (ListView may have different physical layout but same logical content)
        let back_list_view = back[0]
            .as_any()
            .downcast_ref::<GenericListViewArray<O>>()
            .unwrap();
        let orig_list_view = list
            .as_any()
            .downcast_ref::<GenericListViewArray<O>>()
            .unwrap();

        assert_eq!(back_list_view.len(), orig_list_view.len());
        for i in 0..back_list_view.len() {
            assert_eq!(back_list_view.is_valid(i), orig_list_view.is_valid(i));
            if back_list_view.is_valid(i) {
                assert_eq!(&back_list_view.value(i), &orig_list_view.value(i));
            }
        }

        // Ascending, nulls last
        let options = SortOptions::default().asc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
        assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
        assert!(rows.row(3) > rows.row(5)); // null > []
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();

        // Descending, nulls last
        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
        assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
        assert!(rows.row(3) > rows.row(5)); // null > []
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();

        // Descending, nulls first
        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
        assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
        assert!(rows.row(3) < rows.row(5)); // null < []
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();

        // Re-encoding a slice must honour the slice offset: slice rows 0..5
        // correspond to rows 1..6 of the original array.
        let sliced_list = list.slice(1, 5);
        let rows_on_sliced_list = converter
            .convert_columns(&[Arc::clone(&sliced_list)])
            .unwrap();

        assert!(rows_on_sliced_list.row(1) > rows_on_sliced_list.row(0)); // [32, 52] > [32, 52, 12]
        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1)); // null < [32, 52]
        assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1)); // [32, null] < [32, 52]
        assert!(rows_on_sliced_list.row(4) > rows_on_sliced_list.row(1)); // [] > [32, 52]
        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4)); // null < []

        let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
    }
3483
    /// `GenericListView` counterpart of `test_nested_list`: doubly-nested
    /// list views with nulls at every level, three sort option combinations,
    /// and element-wise verification of the decoded logical contents (the
    /// physical ListView layout need not round-trip exactly).
    fn test_nested_list_view<O: OffsetSizeTrait>() {
        let mut builder = GenericListViewBuilder::<O, _>::new(GenericListViewBuilder::<O, _>::new(
            Int32Builder::new(),
        ));

        // Row 0: [[1, 2], [1, null]]
        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        // Row 1: [[1, null], [1, null]]
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        // Row 2: [[1, null], null]
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().append(false);
        builder.append(true);

        // Row 3: null
        builder.append(false);

        // Row 4: [[1, 2]]
        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.append(true);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // [
        //   [[1, 2], [1, null]],
        //   [[1, null], [1, null]],
        //   [[1, null], null]
        //   null
        //   [[1, 2]]
        // ]
        // Ascending, nulls first
        let options = SortOptions::default().asc().with_nulls_first(true);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(1) > rows.row(2));
        assert!(rows.row(2) > rows.row(3));
        assert!(rows.row(4) < rows.row(0));
        assert!(rows.row(4) > rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();

        // Verify the content matches (ListView may have different physical layout but same logical content)
        let back_list_view = back[0]
            .as_any()
            .downcast_ref::<GenericListViewArray<O>>()
            .unwrap();
        let orig_list_view = list
            .as_any()
            .downcast_ref::<GenericListViewArray<O>>()
            .unwrap();

        assert_eq!(back_list_view.len(), orig_list_view.len());
        for i in 0..back_list_view.len() {
            assert_eq!(back_list_view.is_valid(i), orig_list_view.is_valid(i));
            if back_list_view.is_valid(i) {
                assert_eq!(&back_list_view.value(i), &orig_list_view.value(i));
            }
        }

        // Descending, nulls first
        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(1) > rows.row(2));
        assert!(rows.row(2) > rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) > rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();

        // Verify the content matches
        let back_list_view = back[0]
            .as_any()
            .downcast_ref::<GenericListViewArray<O>>()
            .unwrap();

        assert_eq!(back_list_view.len(), orig_list_view.len());
        for i in 0..back_list_view.len() {
            assert_eq!(back_list_view.is_valid(i), orig_list_view.is_valid(i));
            if back_list_view.is_valid(i) {
                assert_eq!(&back_list_view.value(i), &orig_list_view.value(i));
            }
        }

        // Descending, nulls last
        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(1) < rows.row(2));
        assert!(rows.row(2) < rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) < rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();

        // Verify the content matches
        let back_list_view = back[0]
            .as_any()
            .downcast_ref::<GenericListViewArray<O>>()
            .unwrap();

        assert_eq!(back_list_view.len(), orig_list_view.len());
        for i in 0..back_list_view.len() {
            assert_eq!(back_list_view.is_valid(i), orig_list_view.is_valid(i));
            if back_list_view.is_valid(i) {
                assert_eq!(&back_list_view.value(i), &orig_list_view.value(i));
            }
        }

        // Slice rows 0..3 correspond to rows 1..4 of the original array.
        let sliced_list = list.slice(1, 3);
        let rows = converter
            .convert_columns(&[Arc::clone(&sliced_list)])
            .unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(1) < rows.row(2));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
    }
3636
3637    #[test]
3638    fn test_list_view() {
3639        test_single_list_view::<i32>();
3640        test_nested_list_view::<i32>();
3641    }
3642
3643    #[test]
3644    fn test_large_list_view() {
3645        test_single_list_view::<i64>();
3646        test_nested_list_view::<i64>();
3647    }
3648
    /// Builds a `GenericListView` whose views overlap and whose offsets are
    /// non-monotonic (layouts a plain `GenericList` cannot express) and
    /// checks the row encoding depends only on each row's logical contents.
    ///
    /// NOTE(review): the comment on the final ascending assertion previously
    /// read `[3] < [2, 3, 4, 5]`, contradicting both the assertion
    /// (`row(5) > row(4)`) and its own parenthetical (`3 > 2`); fixed below.
    fn test_list_view_with_shared_values<O: OffsetSizeTrait>() {
        // Create a values array: [1, 2, 3, 4, 5, 6, 7, 8]
        let values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
        let field = Arc::new(Field::new_list_field(DataType::Int32, true));

        // Create a ListView where:
        // - Row 0: offset=0, size=3 -> [1, 2, 3]
        // - Row 1: offset=0, size=3 -> [1, 2, 3] (same offset+size as row 0)
        // - Row 2: offset=5, size=2 -> [6, 7] (non-monotonic offset)
        // - Row 3: offset=2, size=2 -> [3, 4] (offset goes back)
        // - Row 4: offset=1, size=4 -> [2, 3, 4, 5] (subset of values that contains row 3's range)
        // - Row 5: offset=2, size=1 -> [3] (subset of row 3 and row 4)
        let offsets = ScalarBuffer::<O>::from(vec![
            O::from_usize(0).unwrap(),
            O::from_usize(0).unwrap(),
            O::from_usize(5).unwrap(),
            O::from_usize(2).unwrap(),
            O::from_usize(1).unwrap(),
            O::from_usize(2).unwrap(),
        ]);
        let sizes = ScalarBuffer::<O>::from(vec![
            O::from_usize(3).unwrap(),
            O::from_usize(3).unwrap(),
            O::from_usize(2).unwrap(),
            O::from_usize(2).unwrap(),
            O::from_usize(4).unwrap(),
            O::from_usize(1).unwrap(),
        ]);

        let list_view: GenericListViewArray<O> =
            GenericListViewArray::try_new(field, offsets, sizes, Arc::new(values), None).unwrap();

        let d = list_view.data_type().clone();
        let list = Arc::new(list_view) as ArrayRef;

        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        // Row 0 and Row 1 have the same content [1, 2, 3], so they should be equal
        assert_eq!(rows.row(0), rows.row(1));

        // [1, 2, 3] < [6, 7] (comparing first elements: 1 < 6)
        assert!(rows.row(0) < rows.row(2));

        // [3, 4] > [1, 2, 3] (comparing first elements: 3 > 1)
        assert!(rows.row(3) > rows.row(0));

        // [2, 3, 4, 5] > [1, 2, 3] (comparing first elements: 2 > 1)
        assert!(rows.row(4) > rows.row(0));

        // [3] < [3, 4] (same prefix but shorter)
        assert!(rows.row(5) < rows.row(3));

        // [3] > [2, 3, 4, 5] (comparing first elements: 3 > 2)
        assert!(rows.row(5) > rows.row(4));

        // Round-trip conversion
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();

        // Verify logical content matches
        let back_list_view = back[0]
            .as_any()
            .downcast_ref::<GenericListViewArray<O>>()
            .unwrap();
        let orig_list_view = list
            .as_any()
            .downcast_ref::<GenericListViewArray<O>>()
            .unwrap();

        assert_eq!(back_list_view.len(), orig_list_view.len());
        for i in 0..back_list_view.len() {
            assert_eq!(back_list_view.is_valid(i), orig_list_view.is_valid(i));
            if back_list_view.is_valid(i) {
                assert_eq!(&back_list_view.value(i), &orig_list_view.value(i));
            }
        }

        // Test with descending order
        let options = SortOptions::default().desc();
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        // In descending order, comparisons are reversed
        assert_eq!(rows.row(0), rows.row(1)); // Equal rows stay equal
        assert!(rows.row(0) > rows.row(2)); // [1, 2, 3] > [6, 7] in desc
        assert!(rows.row(3) < rows.row(0)); // [3, 4] < [1, 2, 3] in desc

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
    }
3743
    #[test]
    fn test_list_view_shared_values() {
        // Overlapping / non-monotonic views with 32-bit offsets.
        test_list_view_with_shared_values::<i32>();
    }
3748
    #[test]
    fn test_large_list_view_shared_values() {
        // Overlapping / non-monotonic views with 64-bit offsets.
        test_list_view_with_shared_values::<i64>();
    }
3753
3754    #[test]
3755    fn test_fixed_size_list() {
3756        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 3);
3757        builder.values().append_value(32);
3758        builder.values().append_value(52);
3759        builder.values().append_value(32);
3760        builder.append(true);
3761        builder.values().append_value(32);
3762        builder.values().append_value(52);
3763        builder.values().append_value(12);
3764        builder.append(true);
3765        builder.values().append_value(32);
3766        builder.values().append_value(52);
3767        builder.values().append_null();
3768        builder.append(true);
3769        builder.values().append_value(32); // MASKED
3770        builder.values().append_value(52); // MASKED
3771        builder.values().append_value(13); // MASKED
3772        builder.append(false);
3773        builder.values().append_value(32);
3774        builder.values().append_null();
3775        builder.values().append_null();
3776        builder.append(true);
3777        builder.values().append_null();
3778        builder.values().append_null();
3779        builder.values().append_null();
3780        builder.append(true);
3781        builder.values().append_value(17); // MASKED
3782        builder.values().append_null(); // MASKED
3783        builder.values().append_value(77); // MASKED
3784        builder.append(false);
3785
3786        let list = Arc::new(builder.finish()) as ArrayRef;
3787        let d = list.data_type().clone();
3788
3789        // Default sorting (ascending, nulls first)
3790        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
3791
3792        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3793        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
3794        assert!(rows.row(2) < rows.row(1)); // [32, 52, null] < [32, 52, 12]
3795        assert!(rows.row(3) < rows.row(2)); // null < [32, 52, null]
3796        assert!(rows.row(4) < rows.row(2)); // [32, null, null] < [32, 52, null]
3797        assert!(rows.row(5) < rows.row(2)); // [null, null, null] < [32, 52, null]
3798        assert!(rows.row(3) < rows.row(5)); // null < [null, null, null]
3799        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)
3800
3801        let back = converter.convert_rows(&rows).unwrap();
3802        assert_eq!(back.len(), 1);
3803        back[0].to_data().validate_full().unwrap();
3804        assert_eq!(&back[0], &list);
3805
3806        // Ascending, null last
3807        let options = SortOptions::default().asc().with_nulls_first(false);
3808        let field = SortField::new_with_options(d.clone(), options);
3809        let converter = RowConverter::new(vec![field]).unwrap();
3810        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3811        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
3812        assert!(rows.row(2) > rows.row(1)); // [32, 52, null] > [32, 52, 12]
3813        assert!(rows.row(3) > rows.row(2)); // null > [32, 52, null]
3814        assert!(rows.row(4) > rows.row(2)); // [32, null, null] > [32, 52, null]
3815        assert!(rows.row(5) > rows.row(2)); // [null, null, null] > [32, 52, null]
3816        assert!(rows.row(3) > rows.row(5)); // null > [null, null, null]
3817        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)
3818
3819        let back = converter.convert_rows(&rows).unwrap();
3820        assert_eq!(back.len(), 1);
3821        back[0].to_data().validate_full().unwrap();
3822        assert_eq!(&back[0], &list);
3823
3824        // Descending, nulls last
3825        let options = SortOptions::default().desc().with_nulls_first(false);
3826        let field = SortField::new_with_options(d.clone(), options);
3827        let converter = RowConverter::new(vec![field]).unwrap();
3828        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3829        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
3830        assert!(rows.row(2) > rows.row(1)); // [32, 52, null] > [32, 52, 12]
3831        assert!(rows.row(3) > rows.row(2)); // null > [32, 52, null]
3832        assert!(rows.row(4) > rows.row(2)); // [32, null, null] > [32, 52, null]
3833        assert!(rows.row(5) > rows.row(2)); // [null, null, null] > [32, 52, null]
3834        assert!(rows.row(3) > rows.row(5)); // null > [null, null, null]
3835        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)
3836
3837        let back = converter.convert_rows(&rows).unwrap();
3838        assert_eq!(back.len(), 1);
3839        back[0].to_data().validate_full().unwrap();
3840        assert_eq!(&back[0], &list);
3841
3842        // Descending, nulls first
3843        let options = SortOptions::default().desc().with_nulls_first(true);
3844        let field = SortField::new_with_options(d, options);
3845        let converter = RowConverter::new(vec![field]).unwrap();
3846        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3847
3848        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
3849        assert!(rows.row(2) < rows.row(1)); // [32, 52, null] > [32, 52, 12]
3850        assert!(rows.row(3) < rows.row(2)); // null < [32, 52, null]
3851        assert!(rows.row(4) < rows.row(2)); // [32, null, null] < [32, 52, null]
3852        assert!(rows.row(5) < rows.row(2)); // [null, null, null] > [32, 52, null]
3853        assert!(rows.row(3) < rows.row(5)); // null < [null, null, null]
3854        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)
3855
3856        let back = converter.convert_rows(&rows).unwrap();
3857        assert_eq!(back.len(), 1);
3858        back[0].to_data().validate_full().unwrap();
3859        assert_eq!(&back[0], &list);
3860
3861        let sliced_list = list.slice(1, 5);
3862        let rows_on_sliced_list = converter
3863            .convert_columns(&[Arc::clone(&sliced_list)])
3864            .unwrap();
3865
3866        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1)); // null < [32, 52, null]
3867        assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1)); // [32, null, null] < [32, 52, null]
3868        assert!(rows_on_sliced_list.row(4) < rows_on_sliced_list.row(1)); // [null, null, null] > [32, 52, null]
3869        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4)); // null < [null, null, null]
3870
3871        let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
3872        assert_eq!(back.len(), 1);
3873        back[0].to_data().validate_full().unwrap();
3874        assert_eq!(&back[0], &sliced_list);
3875    }
3876
3877    #[test]
3878    fn test_two_fixed_size_lists() {
3879        let mut first = FixedSizeListBuilder::new(UInt8Builder::new(), 1);
3880        // 0: [100]
3881        first.values().append_value(100);
3882        first.append(true);
3883        // 1: [101]
3884        first.values().append_value(101);
3885        first.append(true);
3886        // 2: [102]
3887        first.values().append_value(102);
3888        first.append(true);
3889        // 3: [null]
3890        first.values().append_null();
3891        first.append(true);
3892        // 4: null
3893        first.values().append_null(); // MASKED
3894        first.append(false);
3895        let first = Arc::new(first.finish()) as ArrayRef;
3896        let first_type = first.data_type().clone();
3897
3898        let mut second = FixedSizeListBuilder::new(UInt8Builder::new(), 1);
3899        // 0: [200]
3900        second.values().append_value(200);
3901        second.append(true);
3902        // 1: [201]
3903        second.values().append_value(201);
3904        second.append(true);
3905        // 2: [202]
3906        second.values().append_value(202);
3907        second.append(true);
3908        // 3: [null]
3909        second.values().append_null();
3910        second.append(true);
3911        // 4: null
3912        second.values().append_null(); // MASKED
3913        second.append(false);
3914        let second = Arc::new(second.finish()) as ArrayRef;
3915        let second_type = second.data_type().clone();
3916
3917        let converter = RowConverter::new(vec![
3918            SortField::new(first_type.clone()),
3919            SortField::new(second_type.clone()),
3920        ])
3921        .unwrap();
3922
3923        let rows = converter
3924            .convert_columns(&[Arc::clone(&first), Arc::clone(&second)])
3925            .unwrap();
3926
3927        let back = converter.convert_rows(&rows).unwrap();
3928        assert_eq!(back.len(), 2);
3929        back[0].to_data().validate_full().unwrap();
3930        assert_eq!(&back[0], &first);
3931        back[1].to_data().validate_full().unwrap();
3932        assert_eq!(&back[1], &second);
3933    }
3934
3935    #[test]
3936    fn test_fixed_size_list_with_variable_width_content() {
3937        let mut first = FixedSizeListBuilder::new(
3938            StructBuilder::from_fields(
3939                vec![
3940                    Field::new(
3941                        "timestamp",
3942                        DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("UTC"))),
3943                        false,
3944                    ),
3945                    Field::new("offset_minutes", DataType::Int16, false),
3946                    Field::new("time_zone", DataType::Utf8, false),
3947                ],
3948                1,
3949            ),
3950            1,
3951        );
3952        // 0: null
3953        first
3954            .values()
3955            .field_builder::<TimestampMicrosecondBuilder>(0)
3956            .unwrap()
3957            .append_null();
3958        first
3959            .values()
3960            .field_builder::<Int16Builder>(1)
3961            .unwrap()
3962            .append_null();
3963        first
3964            .values()
3965            .field_builder::<StringBuilder>(2)
3966            .unwrap()
3967            .append_null();
3968        first.values().append(false);
3969        first.append(false);
3970        // 1: [null]
3971        first
3972            .values()
3973            .field_builder::<TimestampMicrosecondBuilder>(0)
3974            .unwrap()
3975            .append_null();
3976        first
3977            .values()
3978            .field_builder::<Int16Builder>(1)
3979            .unwrap()
3980            .append_null();
3981        first
3982            .values()
3983            .field_builder::<StringBuilder>(2)
3984            .unwrap()
3985            .append_null();
3986        first.values().append(false);
3987        first.append(true);
3988        // 2: [1970-01-01 00:00:00.000000 UTC]
3989        first
3990            .values()
3991            .field_builder::<TimestampMicrosecondBuilder>(0)
3992            .unwrap()
3993            .append_value(0);
3994        first
3995            .values()
3996            .field_builder::<Int16Builder>(1)
3997            .unwrap()
3998            .append_value(0);
3999        first
4000            .values()
4001            .field_builder::<StringBuilder>(2)
4002            .unwrap()
4003            .append_value("UTC");
4004        first.values().append(true);
4005        first.append(true);
4006        // 3: [2005-09-10 13:30:00.123456 Europe/Warsaw]
4007        first
4008            .values()
4009            .field_builder::<TimestampMicrosecondBuilder>(0)
4010            .unwrap()
4011            .append_value(1126351800123456);
4012        first
4013            .values()
4014            .field_builder::<Int16Builder>(1)
4015            .unwrap()
4016            .append_value(120);
4017        first
4018            .values()
4019            .field_builder::<StringBuilder>(2)
4020            .unwrap()
4021            .append_value("Europe/Warsaw");
4022        first.values().append(true);
4023        first.append(true);
4024        let first = Arc::new(first.finish()) as ArrayRef;
4025        let first_type = first.data_type().clone();
4026
4027        let mut second = StringBuilder::new();
4028        second.append_value("somewhere near");
4029        second.append_null();
4030        second.append_value("Greenwich");
4031        second.append_value("Warsaw");
4032        let second = Arc::new(second.finish()) as ArrayRef;
4033        let second_type = second.data_type().clone();
4034
4035        let converter = RowConverter::new(vec![
4036            SortField::new(first_type.clone()),
4037            SortField::new(second_type.clone()),
4038        ])
4039        .unwrap();
4040
4041        let rows = converter
4042            .convert_columns(&[Arc::clone(&first), Arc::clone(&second)])
4043            .unwrap();
4044
4045        let back = converter.convert_rows(&rows).unwrap();
4046        assert_eq!(back.len(), 2);
4047        back[0].to_data().validate_full().unwrap();
4048        assert_eq!(&back[0], &first);
4049        back[1].to_data().validate_full().unwrap();
4050        assert_eq!(&back[1], &second);
4051    }
4052
4053    fn generate_primitive_array<K>(
4054        rng: &mut impl RngCore,
4055        len: usize,
4056        valid_percent: f64,
4057    ) -> PrimitiveArray<K>
4058    where
4059        K: ArrowPrimitiveType,
4060        StandardUniform: Distribution<K::Native>,
4061    {
4062        (0..len)
4063            .map(|_| rng.random_bool(valid_percent).then(|| rng.random()))
4064            .collect()
4065    }
4066
4067    fn generate_boolean_array(
4068        rng: &mut impl RngCore,
4069        len: usize,
4070        valid_percent: f64,
4071    ) -> BooleanArray {
4072        (0..len)
4073            .map(|_| rng.random_bool(valid_percent).then(|| rng.random_bool(0.5)))
4074            .collect()
4075    }
4076
4077    fn generate_strings<O: OffsetSizeTrait>(
4078        rng: &mut impl RngCore,
4079        len: usize,
4080        valid_percent: f64,
4081    ) -> GenericStringArray<O> {
4082        (0..len)
4083            .map(|_| {
4084                rng.random_bool(valid_percent).then(|| {
4085                    let len = rng.random_range(0..100);
4086                    let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
4087                    String::from_utf8(bytes).unwrap()
4088                })
4089            })
4090            .collect()
4091    }
4092
4093    fn generate_string_view(
4094        rng: &mut impl RngCore,
4095        len: usize,
4096        valid_percent: f64,
4097    ) -> StringViewArray {
4098        (0..len)
4099            .map(|_| {
4100                rng.random_bool(valid_percent).then(|| {
4101                    let len = rng.random_range(0..100);
4102                    let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
4103                    String::from_utf8(bytes).unwrap()
4104                })
4105            })
4106            .collect()
4107    }
4108
4109    fn generate_byte_view(
4110        rng: &mut impl RngCore,
4111        len: usize,
4112        valid_percent: f64,
4113    ) -> BinaryViewArray {
4114        (0..len)
4115            .map(|_| {
4116                rng.random_bool(valid_percent).then(|| {
4117                    let len = rng.random_range(0..100);
4118                    let bytes: Vec<_> = (0..len).map(|_| rng.random_range(0..128)).collect();
4119                    bytes
4120                })
4121            })
4122            .collect()
4123    }
4124
4125    fn generate_fixed_stringview_column(len: usize) -> StringViewArray {
4126        let edge_cases = vec![
4127            Some("bar".to_string()),
4128            Some("bar\0".to_string()),
4129            Some("LongerThan12Bytes".to_string()),
4130            Some("LongerThan12Bytez".to_string()),
4131            Some("LongerThan12Bytes\0".to_string()),
4132            Some("LongerThan12Byt".to_string()),
4133            Some("backend one".to_string()),
4134            Some("backend two".to_string()),
4135            Some("a".repeat(257)),
4136            Some("a".repeat(300)),
4137        ];
4138
4139        // Fill up to `len` by repeating edge cases and trimming
4140        let mut values = Vec::with_capacity(len);
4141        for i in 0..len {
4142            values.push(
4143                edge_cases
4144                    .get(i % edge_cases.len())
4145                    .cloned()
4146                    .unwrap_or(None),
4147            );
4148        }
4149
4150        StringViewArray::from(values)
4151    }
4152
4153    fn generate_dictionary<K>(
4154        rng: &mut impl RngCore,
4155        values: ArrayRef,
4156        len: usize,
4157        valid_percent: f64,
4158    ) -> DictionaryArray<K>
4159    where
4160        K: ArrowDictionaryKeyType,
4161        K::Native: SampleUniform,
4162    {
4163        let min_key = K::Native::from_usize(0).unwrap();
4164        let max_key = K::Native::from_usize(values.len()).unwrap();
4165        let keys: PrimitiveArray<K> = (0..len)
4166            .map(|_| {
4167                rng.random_bool(valid_percent)
4168                    .then(|| rng.random_range(min_key..max_key))
4169            })
4170            .collect();
4171
4172        let data_type =
4173            DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(values.data_type().clone()));
4174
4175        let data = keys
4176            .into_data()
4177            .into_builder()
4178            .data_type(data_type)
4179            .add_child_data(values.to_data())
4180            .build()
4181            .unwrap();
4182
4183        DictionaryArray::from(data)
4184    }
4185
4186    fn generate_fixed_size_binary(
4187        rng: &mut impl RngCore,
4188        len: usize,
4189        valid_percent: f64,
4190    ) -> FixedSizeBinaryArray {
4191        let width = rng.random_range(0..20);
4192        let mut builder = FixedSizeBinaryBuilder::new(width);
4193
4194        let mut b = vec![0; width as usize];
4195        for _ in 0..len {
4196            match rng.random_bool(valid_percent) {
4197                true => {
4198                    b.iter_mut().for_each(|x| *x = rng.random());
4199                    builder.append_value(&b).unwrap();
4200                }
4201                false => builder.append_null(),
4202            }
4203        }
4204
4205        builder.finish()
4206    }
4207
4208    fn generate_struct(rng: &mut impl RngCore, len: usize, valid_percent: f64) -> StructArray {
4209        let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
4210        let a = generate_primitive_array::<Int32Type>(rng, len, valid_percent);
4211        let b = generate_strings::<i32>(rng, len, valid_percent);
4212        let fields = Fields::from(vec![
4213            Field::new("a", DataType::Int32, true),
4214            Field::new("b", DataType::Utf8, true),
4215        ]);
4216        let values = vec![Arc::new(a) as _, Arc::new(b) as _];
4217        StructArray::new(fields, values, Some(nulls))
4218    }
4219
4220    fn generate_list<R: RngCore, F>(
4221        rng: &mut R,
4222        len: usize,
4223        valid_percent: f64,
4224        values: F,
4225    ) -> ListArray
4226    where
4227        F: FnOnce(&mut R, usize) -> ArrayRef,
4228    {
4229        let offsets = OffsetBuffer::<i32>::from_lengths((0..len).map(|_| rng.random_range(0..10)));
4230        let values_len = offsets.last().unwrap().to_usize().unwrap();
4231        let values = values(rng, values_len);
4232        let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
4233        let field = Arc::new(Field::new_list_field(values.data_type().clone(), true));
4234        ListArray::new(field, offsets, values, Some(nulls))
4235    }
4236
4237    fn generate_list_view<F>(
4238        rng: &mut impl RngCore,
4239        len: usize,
4240        valid_percent: f64,
4241        values: F,
4242    ) -> ListViewArray
4243    where
4244        F: FnOnce(usize) -> ArrayRef,
4245    {
4246        // Generate sizes first, then create a values array large enough
4247        let sizes: Vec<i32> = (0..len).map(|_| rng.random_range(0..10)).collect();
4248        let values_len: usize = sizes.iter().map(|s| *s as usize).sum::<usize>().max(1);
4249        let values = values(values_len);
4250
4251        // Generate offsets that can overlap, be non-monotonic, or share ranges
4252        let offsets: Vec<i32> = sizes
4253            .iter()
4254            .map(|&size| {
4255                if size == 0 {
4256                    0
4257                } else {
4258                    rng.random_range(0..=(values_len as i32 - size))
4259                }
4260            })
4261            .collect();
4262
4263        let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
4264        let field = Arc::new(Field::new_list_field(values.data_type().clone(), true));
4265        ListViewArray::new(
4266            field,
4267            ScalarBuffer::from(offsets),
4268            ScalarBuffer::from(sizes),
4269            values,
4270            Some(nulls),
4271        )
4272    }
4273
4274    fn generate_nulls(rng: &mut impl RngCore, len: usize) -> Option<NullBuffer> {
4275        Some(NullBuffer::from_iter(
4276            (0..len).map(|_| rng.random_bool(0.8)),
4277        ))
4278    }
4279
4280    fn change_underlying_null_values_for_primitive<T: ArrowPrimitiveType>(
4281        array: &PrimitiveArray<T>,
4282    ) -> PrimitiveArray<T> {
4283        let (dt, values, nulls) = array.clone().into_parts();
4284
4285        let new_values = ScalarBuffer::<T::Native>::from_iter(
4286            values
4287                .iter()
4288                .zip(nulls.as_ref().unwrap().iter())
4289                .map(|(val, is_valid)| {
4290                    if is_valid {
4291                        *val
4292                    } else {
4293                        val.add_wrapping(T::Native::usize_as(1))
4294                    }
4295                }),
4296        );
4297
4298        PrimitiveArray::new(new_values, nulls).with_data_type(dt)
4299    }
4300
    /// Returns a copy of `array` where the byte region behind every *null*
    /// slot is extended by one extra byte, while valid values and the null
    /// buffer are unchanged; the result should still compare equal under the
    /// row format. Panics if `array` has no null buffer.
    fn change_underline_null_values_for_byte_array<T: ByteArrayType>(
        array: &GenericByteArray<T>,
    ) -> GenericByteArray<T> {
        let (offsets, values, nulls) = array.clone().into_parts();

        // New offsets: null slots grow by one byte, valid slots keep their length
        let new_offsets = OffsetBuffer::<T::Offset>::from_lengths(
            offsets
                .lengths()
                .zip(nulls.as_ref().unwrap().iter())
                .map(|(len, is_valid)| if is_valid { len } else { len + 1 }),
        );

        // The final offset of the grown layout is the total byte length
        let mut new_bytes = Vec::<u8>::with_capacity(new_offsets[new_offsets.len() - 1].as_usize());

        // Copy each slot's original bytes, appending a filler byte after null slots
        offsets
            .windows(2)
            .zip(nulls.as_ref().unwrap().iter())
            .for_each(|(start_and_end, is_valid)| {
                let start = start_and_end[0].as_usize();
                let end = start_and_end[1].as_usize();
                new_bytes.extend_from_slice(&values.as_slice()[start..end]);

                // add an extra byte
                if !is_valid {
                    new_bytes.push(b'c');
                }
            });

        GenericByteArray::<T>::new(new_offsets, Buffer::from_vec(new_bytes), nulls)
    }
4331
    /// Returns a copy of `array` where the child values behind every *null*
    /// list slot are resized by one element (grown, or shrunk when the slot
    /// already ends at the child array's end), while valid slots and the null
    /// buffer are unchanged; the result should still compare equal under the
    /// row format. Panics if `array` has no null buffer.
    fn change_underline_null_values_for_list_array<O: OffsetSizeTrait>(
        array: &GenericListArray<O>,
    ) -> GenericListArray<O> {
        let (field, offsets, values, nulls) = array.clone().into_parts();

        let (new_values, new_offsets) = {
            // (start, length) of every slot in the source child array, with the
            // length of null slots perturbed by one element
            let concat_values = offsets
                .windows(2)
                .zip(nulls.as_ref().unwrap().iter())
                .map(|(start_and_end, is_valid)| {
                    let start = start_and_end[0].as_usize();
                    let end = start_and_end[1].as_usize();
                    if is_valid {
                        return (start, end - start);
                    }

                    // If reached end, we take one less
                    if end == values.len() {
                        (start, (end - start).saturating_sub(1))
                    } else {
                        (start, end - start + 1)
                    }
                })
                .map(|(start, length)| values.slice(start, length))
                .collect::<Vec<_>>();

            let new_offsets =
                OffsetBuffer::<O>::from_lengths(concat_values.iter().map(|s| s.len()));

            // Stitch the per-slot slices back into a single child array
            let new_values = {
                let values = concat_values.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
                arrow_select::concat::concat(&values).expect("should be able to concat")
            };

            (new_values, new_offsets)
        };

        GenericListArray::<O>::new(field, new_offsets, new_values, nulls)
    }
4371
    /// Dispatches to the type-specific helpers above that perturb the data
    /// masked behind null slots. Arrays without nulls, or of types with no
    /// helper, are returned unchanged.
    fn change_underline_null_values(array: &ArrayRef) -> ArrayRef {
        if array.null_count() == 0 {
            return Arc::clone(array);
        }

        downcast_primitive_array!(
            array => {
                let output = change_underlying_null_values_for_primitive(array);

                Arc::new(output)
            }

            DataType::Utf8 => {
                Arc::new(change_underline_null_values_for_byte_array(array.as_string::<i32>()))
            }
            DataType::LargeUtf8 => {
                Arc::new(change_underline_null_values_for_byte_array(array.as_string::<i64>()))
            }
            DataType::Binary => {
                Arc::new(change_underline_null_values_for_byte_array(array.as_binary::<i32>()))
            }
            DataType::LargeBinary => {
                Arc::new(change_underline_null_values_for_byte_array(array.as_binary::<i64>()))
            }
            DataType::List(_) => {
                Arc::new(change_underline_null_values_for_list_array(array.as_list::<i32>()))
            }
            DataType::LargeList(_) => {
                Arc::new(change_underline_null_values_for_list_array(array.as_list::<i64>()))
            }
            // Other types: the masked values are left as-is
            _ => {
                Arc::clone(array)
            }
        )
    }
4407
    /// Generates a random column of `len` values for the fuzz test, choosing
    /// uniformly among the supported data types. The integer -> type mapping
    /// must stay stable so seeded runs remain reproducible.
    fn generate_column(rng: &mut (impl RngCore + Clone), len: usize) -> ArrayRef {
        match rng.random_range(0..23) {
            0 => Arc::new(generate_primitive_array::<Int32Type>(rng, len, 0.8)),
            1 => Arc::new(generate_primitive_array::<UInt32Type>(rng, len, 0.8)),
            2 => Arc::new(generate_primitive_array::<Int64Type>(rng, len, 0.8)),
            3 => Arc::new(generate_primitive_array::<UInt64Type>(rng, len, 0.8)),
            4 => Arc::new(generate_primitive_array::<Float32Type>(rng, len, 0.8)),
            5 => Arc::new(generate_primitive_array::<Float64Type>(rng, len, 0.8)),
            6 => Arc::new(generate_strings::<i32>(rng, len, 0.8)),
            7 => {
                let dict_values_len = rng.random_range(1..len);
                // Cannot test dictionaries containing null values because of #2687
                let strings = Arc::new(generate_strings::<i32>(rng, dict_values_len, 1.0));
                Arc::new(generate_dictionary::<Int64Type>(rng, strings, len, 0.8))
            }
            8 => {
                let dict_values_len = rng.random_range(1..len);
                // Cannot test dictionaries containing null values because of #2687
                let values = Arc::new(generate_primitive_array::<Int64Type>(
                    rng,
                    dict_values_len,
                    1.0,
                ));
                Arc::new(generate_dictionary::<Int64Type>(rng, values, len, 0.8))
            }
            9 => Arc::new(generate_fixed_size_binary(rng, len, 0.8)),
            10 => Arc::new(generate_struct(rng, len, 0.8)),
            11 => Arc::new(generate_list(rng, len, 0.8, |rng, values_len| {
                Arc::new(generate_primitive_array::<Int64Type>(rng, values_len, 0.8))
            })),
            12 => Arc::new(generate_list(rng, len, 0.8, |rng, values_len| {
                Arc::new(generate_strings::<i32>(rng, values_len, 0.8))
            })),
            13 => Arc::new(generate_list(rng, len, 0.8, |rng, values_len| {
                Arc::new(generate_struct(rng, values_len, 0.8))
            })),
            14 => Arc::new(generate_string_view(rng, len, 0.8)),
            15 => Arc::new(generate_byte_view(rng, len, 0.8)),
            16 => Arc::new(generate_fixed_stringview_column(len)),
            // Sliced list: generate over-long then slice to exercise non-zero offsets
            17 => Arc::new(
                generate_list(&mut rng.clone(), len + 1000, 0.8, |rng, values_len| {
                    Arc::new(generate_primitive_array::<Int64Type>(rng, values_len, 0.8))
                })
                .slice(500, len),
            ),
            18 => Arc::new(generate_boolean_array(rng, len, 0.8)),
            19 => Arc::new(generate_list_view(
                &mut rng.clone(),
                len,
                0.8,
                |values_len| Arc::new(generate_primitive_array::<Int64Type>(rng, values_len, 0.8)),
            )),
            20 => Arc::new(generate_list_view(
                &mut rng.clone(),
                len,
                0.8,
                |values_len| Arc::new(generate_strings::<i32>(rng, values_len, 0.8)),
            )),
            21 => Arc::new(generate_list_view(
                &mut rng.clone(),
                len,
                0.8,
                |values_len| Arc::new(generate_struct(rng, values_len, 0.8)),
            )),
            // Sliced list view: generate over-long then slice to exercise non-zero offsets
            22 => Arc::new(
                generate_list_view(&mut rng.clone(), len + 1000, 0.8, |values_len| {
                    Arc::new(generate_primitive_array::<Int64Type>(rng, values_len, 0.8))
                })
                .slice(500, len),
            ),
            _ => unreachable!(),
        }
    }
4481
4482    fn print_row(cols: &[SortColumn], row: usize) -> String {
4483        let t: Vec<_> = cols
4484            .iter()
4485            .map(|x| match x.values.is_valid(row) {
4486                true => {
4487                    let opts = FormatOptions::default().with_null("NULL");
4488                    let formatter = ArrayFormatter::try_new(x.values.as_ref(), &opts).unwrap();
4489                    formatter.value(row).to_string()
4490                }
4491                false => "NULL".to_string(),
4492            })
4493            .collect();
4494        t.join(",")
4495    }
4496
4497    fn print_col_types(cols: &[SortColumn]) -> String {
4498        let t: Vec<_> = cols
4499            .iter()
4500            .map(|x| x.values.data_type().to_string())
4501            .collect();
4502        t.join(",")
4503    }
4504
    /// How the fuzz test post-processes the null buffers of generated arrays
    #[derive(Debug, PartialEq)]
    enum Nulls {
        /// Keep the generated array as is
        AsIs,

        /// Replace the null buffer with a different one, so that the slots
        /// marked null no longer match the originally generated validity
        Different,

        /// Remove all nulls
        None,
    }
4516
4517    #[test]
4518    #[cfg_attr(miri, ignore)]
4519    fn fuzz_test() {
4520        let mut rng = StdRng::seed_from_u64(42);
4521        for _ in 0..100 {
4522            for null_behavior in [Nulls::AsIs, Nulls::Different, Nulls::None] {
4523                let num_columns = rng.random_range(1..5);
4524                let len = rng.random_range(5..100);
4525                let mut arrays: Vec<_> = (0..num_columns)
4526                    .map(|_| generate_column(&mut rng, len))
4527                    .collect();
4528
4529                match null_behavior {
4530                    Nulls::AsIs => {
4531                        // Keep as is
4532                    }
4533                    Nulls::Different => {
4534                        // Replace nulls with different nulls to allow for testing different underlying null values
4535                        arrays = arrays
4536                            .into_iter()
4537                            .map(|a| replace_array_nulls(a, generate_nulls(&mut rng, len)))
4538                            .collect()
4539                    }
4540                    Nulls::None => {
4541                        // Remove nulls
4542                        arrays = arrays
4543                            .into_iter()
4544                            .map(|a| replace_array_nulls(a, None))
4545                            .collect()
4546                    }
4547                }
4548
4549                let options: Vec<_> = (0..num_columns)
4550                    .map(|_| SortOptions {
4551                        descending: rng.random_bool(0.5),
4552                        nulls_first: rng.random_bool(0.5),
4553                    })
4554                    .collect();
4555
4556                let sort_columns: Vec<_> = options
4557                    .iter()
4558                    .zip(&arrays)
4559                    .map(|(o, c)| SortColumn {
4560                        values: Arc::clone(c),
4561                        options: Some(*o),
4562                    })
4563                    .collect();
4564
4565                let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();
4566
4567                let columns: Vec<SortField> = options
4568                    .into_iter()
4569                    .zip(&arrays)
4570                    .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
4571                    .collect();
4572
4573                let converter = RowConverter::new(columns).unwrap();
4574                let rows = converter.convert_columns(&arrays).unwrap();
4575
4576                // Assert that the underlying null values are not taken into account when converting
4577                // even for different inputs
4578                if !matches!(null_behavior, Nulls::None) {
4579                    assert_same_rows_when_changing_input_underlying_null_values(
4580                        &arrays, &converter, &rows,
4581                    );
4582                }
4583
4584                for i in 0..len {
4585                    for j in 0..len {
4586                        let row_i = rows.row(i);
4587                        let row_j = rows.row(j);
4588                        let row_cmp = row_i.cmp(&row_j);
4589                        let lex_cmp = comparator.compare(i, j);
4590                        assert_eq!(
4591                            row_cmp,
4592                            lex_cmp,
4593                            "({:?} vs {:?}) vs ({:?} vs {:?}) for types {}",
4594                            print_row(&sort_columns, i),
4595                            print_row(&sort_columns, j),
4596                            row_i,
4597                            row_j,
4598                            print_col_types(&sort_columns)
4599                        );
4600                    }
4601                }
4602
4603                // Validate rows length iterator
4604                {
4605                    let mut rows_iter = rows.iter();
4606                    let mut rows_lengths_iter = rows.lengths();
4607                    for (index, row) in rows_iter.by_ref().enumerate() {
4608                        let len = rows_lengths_iter
4609                            .next()
4610                            .expect("Reached end of length iterator while still have rows");
4611                        assert_eq!(
4612                            row.data.len(),
4613                            len,
4614                            "Row length mismatch: {} vs {}",
4615                            row.data.len(),
4616                            len
4617                        );
4618                        assert_eq!(
4619                            len,
4620                            rows.row_len(index),
4621                            "Row length mismatch at index {}: {} vs {}",
4622                            index,
4623                            len,
4624                            rows.row_len(index)
4625                        );
4626                    }
4627
4628                    assert_eq!(
4629                        rows_lengths_iter.next(),
4630                        None,
4631                        "Length iterator did not reach end"
4632                    );
4633                }
4634
4635                // Convert rows produced from convert_columns().
4636                // Note: validate_utf8 is set to false since Row is initialized through empty_rows()
4637                let back = converter.convert_rows(&rows).unwrap();
4638                for (actual, expected) in back.iter().zip(&arrays) {
4639                    actual.to_data().validate_full().unwrap();
4640                    dictionary_eq(actual, expected)
4641                }
4642
4643                // Check that we can convert rows into ByteArray and then parse, convert it back to array
4644                // Note: validate_utf8 is set to true since Row is initialized through RowParser
4645                let rows = rows.try_into_binary().expect("reasonable size");
4646                let parser = converter.parser();
4647                let back = converter
4648                    .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
4649                    .unwrap();
4650                for (actual, expected) in back.iter().zip(&arrays) {
4651                    actual.to_data().validate_full().unwrap();
4652                    dictionary_eq(actual, expected)
4653                }
4654
4655                let rows = converter.from_binary(rows);
4656                let back = converter.convert_rows(&rows).unwrap();
4657                for (actual, expected) in back.iter().zip(&arrays) {
4658                    actual.to_data().validate_full().unwrap();
4659                    dictionary_eq(actual, expected)
4660                }
4661            }
4662        }
4663    }
4664
4665    fn replace_array_nulls(array: ArrayRef, new_nulls: Option<NullBuffer>) -> ArrayRef {
4666        make_array(
4667            array
4668                .into_data()
4669                .into_builder()
4670                // Replace the nulls
4671                .nulls(new_nulls)
4672                .build()
4673                .unwrap(),
4674        )
4675    }
4676
4677    fn assert_same_rows_when_changing_input_underlying_null_values(
4678        arrays: &[ArrayRef],
4679        converter: &RowConverter,
4680        rows: &Rows,
4681    ) {
4682        let arrays_with_different_data_behind_nulls = arrays
4683            .iter()
4684            .map(|arr| change_underline_null_values(arr))
4685            .collect::<Vec<_>>();
4686
4687        // Skip assertion if we did not change anything
4688        if arrays
4689            .iter()
4690            .zip(arrays_with_different_data_behind_nulls.iter())
4691            .all(|(a, b)| Arc::ptr_eq(a, b))
4692        {
4693            return;
4694        }
4695
4696        let rows_with_different_nulls = converter
4697            .convert_columns(&arrays_with_different_data_behind_nulls)
4698            .unwrap();
4699
4700        assert_eq!(
4701            rows.iter().collect::<Vec<_>>(),
4702            rows_with_different_nulls.iter().collect::<Vec<_>>(),
4703            "Different underlying nulls should not output different rows"
4704        )
4705    }
4706
4707    #[test]
4708    fn test_clear() {
4709        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
4710        let mut rows = converter.empty_rows(3, 128);
4711
4712        let first = Int32Array::from(vec![None, Some(2), Some(4)]);
4713        let second = Int32Array::from(vec![Some(2), None, Some(4)]);
4714        let arrays = [Arc::new(first) as ArrayRef, Arc::new(second) as ArrayRef];
4715
4716        for array in arrays.iter() {
4717            rows.clear();
4718            converter
4719                .append(&mut rows, std::slice::from_ref(array))
4720                .unwrap();
4721            let back = converter.convert_rows(&rows).unwrap();
4722            assert_eq!(&back[0], array);
4723        }
4724
4725        let mut rows_expected = converter.empty_rows(3, 128);
4726        converter.append(&mut rows_expected, &arrays[1..]).unwrap();
4727
4728        for (i, (actual, expected)) in rows.iter().zip(rows_expected.iter()).enumerate() {
4729            assert_eq!(
4730                actual, expected,
4731                "For row {i}: expected {expected:?}, actual: {actual:?}",
4732            );
4733        }
4734    }
4735
4736    #[test]
4737    fn test_append_codec_dictionary_binary() {
4738        use DataType::*;
4739        // Dictionary RowConverter
4740        let converter = RowConverter::new(vec![SortField::new(Dictionary(
4741            Box::new(Int32),
4742            Box::new(Binary),
4743        ))])
4744        .unwrap();
4745        let mut rows = converter.empty_rows(4, 128);
4746
4747        let keys = Int32Array::from_iter_values([0, 1, 2, 3]);
4748        let values = BinaryArray::from(vec![
4749            Some("a".as_bytes()),
4750            Some(b"b"),
4751            Some(b"c"),
4752            Some(b"d"),
4753        ]);
4754        let dict_array = DictionaryArray::new(keys, Arc::new(values));
4755
4756        rows.clear();
4757        let array = Arc::new(dict_array) as ArrayRef;
4758        converter
4759            .append(&mut rows, std::slice::from_ref(&array))
4760            .unwrap();
4761        let back = converter.convert_rows(&rows).unwrap();
4762
4763        dictionary_eq(&back[0], &array);
4764    }
4765
4766    #[test]
4767    fn test_list_prefix() {
4768        let mut a = ListBuilder::new(Int8Builder::new());
4769        a.append_value([None]);
4770        a.append_value([None, None]);
4771        let a = a.finish();
4772
4773        let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
4774        let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
4775        assert_eq!(rows.row(0).cmp(&rows.row(1)), Ordering::Less);
4776    }
4777
4778    #[test]
4779    fn map_should_be_marked_as_unsupported() {
4780        let map_data_type = Field::new_map(
4781            "map",
4782            "entries",
4783            Field::new("key", DataType::Utf8, false),
4784            Field::new("value", DataType::Utf8, true),
4785            false,
4786            true,
4787        )
4788        .data_type()
4789        .clone();
4790
4791        let is_supported = RowConverter::supports_fields(&[SortField::new(map_data_type)]);
4792
4793        assert!(!is_supported, "Map should not be supported");
4794    }
4795
4796    #[test]
4797    fn should_fail_to_create_row_converter_for_unsupported_map_type() {
4798        let map_data_type = Field::new_map(
4799            "map",
4800            "entries",
4801            Field::new("key", DataType::Utf8, false),
4802            Field::new("value", DataType::Utf8, true),
4803            false,
4804            true,
4805        )
4806        .data_type()
4807        .clone();
4808
4809        let converter = RowConverter::new(vec![SortField::new(map_data_type)]);
4810
4811        match converter {
4812            Err(ArrowError::NotYetImplemented(message)) => {
4813                assert!(
4814                    message.contains("Row format support not yet implemented for"),
4815                    "Expected NotYetImplemented error for map data type, got: {message}",
4816                );
4817            }
4818            Err(e) => panic!("Expected NotYetImplemented error, got: {e}"),
4819            Ok(_) => panic!("Expected NotYetImplemented error for map data type"),
4820        }
4821    }
4822
    #[test]
    fn test_values_buffer_smaller_when_utf8_validation_disabled() {
        // Returns the length of the first data buffer of the round-tripped
        // StringViewArray both without utf8 validation (rows straight from
        // convert_columns) and with it (rows re-parsed through RowParser).
        fn get_values_buffer_len(col: ArrayRef) -> (usize, usize) {
            // 1. Convert cols into rows
            let converter = RowConverter::new(vec![SortField::new(DataType::Utf8View)]).unwrap();

            // 2a. Convert rows into cols (validate_utf8 = false)
            let rows = converter.convert_columns(&[col]).unwrap();
            let converted = converter.convert_rows(&rows).unwrap();
            let unchecked_values_len = converted[0].as_string_view().data_buffers()[0].len();

            // 2b. Convert rows into cols (validate_utf8 = true since Row is initialized through RowParser)
            let rows = rows.try_into_binary().expect("reasonable size");
            let parser = converter.parser();
            let converted = converter
                .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
                .unwrap();
            let checked_values_len = converted[0].as_string_view().data_buffers()[0].len();
            (unchecked_values_len, checked_values_len)
        }

        // Case1. StringViewArray with inline strings
        let col = Arc::new(StringViewArray::from_iter([
            Some("hello"), // short(5)
            None,          // null
            Some("short"), // short(5)
            Some("tiny"),  // short(4)
        ])) as ArrayRef;

        let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
        // Since there are no long (>12) strings, len of values buffer is 0
        assert_eq!(unchecked_values_len, 0);
        // When utf8 validation enabled, values buffer includes inline strings (5+5+4)
        assert_eq!(checked_values_len, 14);

        // Case2. StringViewArray with long(>12) strings
        let col = Arc::new(StringViewArray::from_iter([
            Some("this is a very long string over 12 bytes"),
            Some("another long string to test the buffer"),
        ])) as ArrayRef;

        let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
        // Since there are no inline strings, expected length of values buffer is the same
        assert!(unchecked_values_len > 0);
        assert_eq!(unchecked_values_len, checked_values_len);

        // Case3. StringViewArray with both short and long strings
        let col = Arc::new(StringViewArray::from_iter([
            Some("tiny"),          // 4 (short)
            Some("thisisexact13"), // 13 (long)
            None,
            Some("short"), // 5 (short)
        ])) as ArrayRef;

        let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
        // Since there is single long string, len of values buffer is 13;
        // with validation the buffer additionally holds the inline strings
        assert_eq!(unchecked_values_len, 13);
        assert!(checked_values_len > unchecked_values_len);
    }
4882
4883    #[test]
4884    fn test_sparse_union() {
4885        // create a sparse union with Int32 (type_id = 0) and Utf8 (type_id = 1)
4886        let int_array = Int32Array::from(vec![Some(1), None, Some(3), None, Some(5)]);
4887        let str_array = StringArray::from(vec![None, Some("b"), None, Some("d"), None]);
4888
4889        // [1, "b", 3, "d", 5]
4890        let type_ids = vec![0, 1, 0, 1, 0].into();
4891
4892        let union_fields = [
4893            (0, Arc::new(Field::new("int", DataType::Int32, false))),
4894            (1, Arc::new(Field::new("str", DataType::Utf8, false))),
4895        ]
4896        .into_iter()
4897        .collect();
4898
4899        let union_array = UnionArray::try_new(
4900            union_fields,
4901            type_ids,
4902            None,
4903            vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
4904        )
4905        .unwrap();
4906
4907        let union_type = union_array.data_type().clone();
4908        let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
4909
4910        let rows = converter
4911            .convert_columns(&[Arc::new(union_array.clone())])
4912            .unwrap();
4913
4914        // round trip
4915        let back = converter.convert_rows(&rows).unwrap();
4916        let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();
4917
4918        assert_eq!(union_array.len(), back_union.len());
4919        for i in 0..union_array.len() {
4920            assert_eq!(union_array.type_id(i), back_union.type_id(i));
4921        }
4922    }
4923
    #[test]
    fn test_sparse_union_with_nulls() {
        // create a sparse union with Int32 (type_id = 0) and Utf8 (type_id = 1);
        // the string child is all-null, so every slot with type_id 1 is null
        let int_array = Int32Array::from(vec![Some(1), None, Some(3), None, Some(5)]);
        let str_array = StringArray::from(vec![None::<&str>; 5]);

        // [1, null (both children null), 3, null (both children null), 5]
        let type_ids = vec![0, 1, 0, 1, 0].into();

        let union_fields = [
            (0, Arc::new(Field::new("int", DataType::Int32, true))),
            (1, Arc::new(Field::new("str", DataType::Utf8, true))),
        ]
        .into_iter()
        .collect();

        let union_array = UnionArray::try_new(
            union_fields,
            type_ids,
            None,
            vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
        )
        .unwrap();

        let union_type = union_array.data_type().clone();
        let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();

        let rows = converter
            .convert_columns(&[Arc::new(union_array.clone())])
            .unwrap();

        // round trip
        let back = converter.convert_rows(&rows).unwrap();
        let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();

        // nullity must survive the round trip; type ids are only compared for
        // non-null slots since a null slot's branch need not be preserved
        assert_eq!(union_array.len(), back_union.len());
        for i in 0..union_array.len() {
            let expected_null = union_array.is_null(i);
            let actual_null = back_union.is_null(i);
            assert_eq!(expected_null, actual_null, "Null mismatch at index {i}");
            if !expected_null {
                assert_eq!(union_array.type_id(i), back_union.type_id(i));
            }
        }
    }
4969
4970    #[test]
4971    fn test_dense_union() {
4972        // create a dense union with Int32 (type_id = 0) and use Utf8 (type_id = 1)
4973        let int_array = Int32Array::from(vec![1, 3, 5]);
4974        let str_array = StringArray::from(vec!["a", "b"]);
4975
4976        let type_ids = vec![0, 1, 0, 1, 0].into();
4977
4978        // [1, "a", 3, "b", 5]
4979        let offsets = vec![0, 0, 1, 1, 2].into();
4980
4981        let union_fields = [
4982            (0, Arc::new(Field::new("int", DataType::Int32, false))),
4983            (1, Arc::new(Field::new("str", DataType::Utf8, false))),
4984        ]
4985        .into_iter()
4986        .collect();
4987
4988        let union_array = UnionArray::try_new(
4989            union_fields,
4990            type_ids,
4991            Some(offsets), // Dense mode
4992            vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
4993        )
4994        .unwrap();
4995
4996        let union_type = union_array.data_type().clone();
4997        let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
4998
4999        let rows = converter
5000            .convert_columns(&[Arc::new(union_array.clone())])
5001            .unwrap();
5002
5003        // round trip
5004        let back = converter.convert_rows(&rows).unwrap();
5005        let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();
5006
5007        assert_eq!(union_array.len(), back_union.len());
5008        for i in 0..union_array.len() {
5009            assert_eq!(union_array.type_id(i), back_union.type_id(i));
5010        }
5011    }
5012
    #[test]
    fn test_dense_union_with_nulls() {
        // create a dense union with Int32 (type_id = 0) and Utf8 (type_id = 1);
        // each child contributes one null value
        let int_array = Int32Array::from(vec![Some(1), None, Some(5)]);
        let str_array = StringArray::from(vec![Some("a"), None]);

        // [1, "a", 5, null (str null), null (int null)]
        let type_ids = vec![0, 1, 0, 1, 0].into();
        let offsets = vec![0, 0, 1, 1, 2].into();

        let union_fields = [
            (0, Arc::new(Field::new("int", DataType::Int32, true))),
            (1, Arc::new(Field::new("str", DataType::Utf8, true))),
        ]
        .into_iter()
        .collect();

        let union_array = UnionArray::try_new(
            union_fields,
            type_ids,
            Some(offsets), // offsets present => dense mode
            vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
        )
        .unwrap();

        let union_type = union_array.data_type().clone();
        let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();

        let rows = converter
            .convert_columns(&[Arc::new(union_array.clone())])
            .unwrap();

        // round trip
        let back = converter.convert_rows(&rows).unwrap();
        let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();

        // nullity must survive the round trip; type ids are only compared for
        // non-null slots since a null slot's branch need not be preserved
        assert_eq!(union_array.len(), back_union.len());
        for i in 0..union_array.len() {
            let expected_null = union_array.is_null(i);
            let actual_null = back_union.is_null(i);
            assert_eq!(expected_null, actual_null, "Null mismatch at index {i}");
            if !expected_null {
                assert_eq!(union_array.type_id(i), back_union.type_id(i));
            }
        }
    }
5059
5060    #[test]
5061    fn test_union_ordering() {
5062        let int_array = Int32Array::from(vec![100, 5, 20]);
5063        let str_array = StringArray::from(vec!["z", "a"]);
5064
5065        // [100, "z", 5, "a", 20]
5066        let type_ids = vec![0, 1, 0, 1, 0].into();
5067        let offsets = vec![0, 0, 1, 1, 2].into();
5068
5069        let union_fields = [
5070            (0, Arc::new(Field::new("int", DataType::Int32, false))),
5071            (1, Arc::new(Field::new("str", DataType::Utf8, false))),
5072        ]
5073        .into_iter()
5074        .collect();
5075
5076        let union_array = UnionArray::try_new(
5077            union_fields,
5078            type_ids,
5079            Some(offsets),
5080            vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
5081        )
5082        .unwrap();
5083
5084        let union_type = union_array.data_type().clone();
5085        let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
5086
5087        let rows = converter.convert_columns(&[Arc::new(union_array)]).unwrap();
5088
5089        /*
5090        expected ordering
5091
5092        row 2: 5    - type_id 0
5093        row 4: 20   - type_id 0
5094        row 0: 100  - type id 0
5095        row 3: "a"  - type id 1
5096        row 1: "z"  - type id 1
5097        */
5098
5099        // 5 < "z"
5100        assert!(rows.row(2) < rows.row(1));
5101
5102        // 100 < "a"
5103        assert!(rows.row(0) < rows.row(3));
5104
5105        // among ints
5106        // 5 < 20
5107        assert!(rows.row(2) < rows.row(4));
5108        // 20 < 100
5109        assert!(rows.row(4) < rows.row(0));
5110
5111        // among strigns
5112        // "a" < "z"
5113        assert!(rows.row(3) < rows.row(1));
5114    }
5115
    #[test]
    fn test_row_converter_roundtrip_with_many_union_columns() {
        // Two independent sparse union columns converted together must both
        // round-trip with identical type ids and per-slot values.

        // col 1: Union(Int32, Utf8) [67, "hello"]
        let fields1 = UnionFields::try_new(
            vec![0, 1],
            vec![
                Field::new("int", DataType::Int32, true),
                Field::new("string", DataType::Utf8, true),
            ],
        )
        .unwrap();

        let int_array1 = Int32Array::from(vec![Some(67), None]);
        let string_array1 = StringArray::from(vec![None::<&str>, Some("hello")]);
        let type_ids1 = vec![0i8, 1].into();

        let union_array1 = UnionArray::try_new(
            fields1.clone(),
            type_ids1,
            None,
            vec![
                Arc::new(int_array1) as ArrayRef,
                Arc::new(string_array1) as ArrayRef,
            ],
        )
        .unwrap();

        // col 2: Union(Int32, Utf8) [100, "world"]
        let fields2 = UnionFields::try_new(
            vec![0, 1],
            vec![
                Field::new("int", DataType::Int32, true),
                Field::new("string", DataType::Utf8, true),
            ],
        )
        .unwrap();

        let int_array2 = Int32Array::from(vec![Some(100), None]);
        let string_array2 = StringArray::from(vec![None::<&str>, Some("world")]);
        let type_ids2 = vec![0i8, 1].into();

        let union_array2 = UnionArray::try_new(
            fields2.clone(),
            type_ids2,
            None,
            vec![
                Arc::new(int_array2) as ArrayRef,
                Arc::new(string_array2) as ArrayRef,
            ],
        )
        .unwrap();

        // create a row converter with 2 union columns
        let field1 = Field::new("col1", DataType::Union(fields1, UnionMode::Sparse), true);
        let field2 = Field::new("col2", DataType::Union(fields2, UnionMode::Sparse), true);

        let sort_field1 = SortField::new(field1.data_type().clone());
        let sort_field2 = SortField::new(field2.data_type().clone());

        let converter = RowConverter::new(vec![sort_field1, sort_field2]).unwrap();

        let rows = converter
            .convert_columns(&[
                Arc::new(union_array1.clone()) as ArrayRef,
                Arc::new(union_array2.clone()) as ArrayRef,
            ])
            .unwrap();

        // roundtrip
        let out = converter.convert_rows(&rows).unwrap();

        let [col1, col2] = out.as_slice() else {
            panic!("expected 2 columns")
        };

        let col1 = col1.as_any().downcast_ref::<UnionArray>().unwrap();
        let col2 = col2.as_any().downcast_ref::<UnionArray>().unwrap();

        // compare each output column against its input, slot by slot
        for (expected, got) in [union_array1, union_array2].iter().zip([col1, col2]) {
            assert_eq!(expected.len(), got.len());
            assert_eq!(expected.type_ids(), got.type_ids());

            for i in 0..expected.len() {
                assert_eq!(expected.value(i).as_ref(), got.value(i).as_ref());
            }
        }
    }
5203
5204    #[test]
5205    fn test_row_converter_roundtrip_with_one_union_column() {
5206        let fields = UnionFields::try_new(
5207            vec![0, 1],
5208            vec![
5209                Field::new("int", DataType::Int32, true),
5210                Field::new("string", DataType::Utf8, true),
5211            ],
5212        )
5213        .unwrap();
5214
5215        let int_array = Int32Array::from(vec![Some(67), None]);
5216        let string_array = StringArray::from(vec![None::<&str>, Some("hello")]);
5217        let type_ids = vec![0i8, 1].into();
5218
5219        let union_array = UnionArray::try_new(
5220            fields.clone(),
5221            type_ids,
5222            None,
5223            vec![
5224                Arc::new(int_array) as ArrayRef,
5225                Arc::new(string_array) as ArrayRef,
5226            ],
5227        )
5228        .unwrap();
5229
5230        let field = Field::new("col", DataType::Union(fields, UnionMode::Sparse), true);
5231        let sort_field = SortField::new(field.data_type().clone());
5232        let converter = RowConverter::new(vec![sort_field]).unwrap();
5233
5234        let rows = converter
5235            .convert_columns(&[Arc::new(union_array.clone()) as ArrayRef])
5236            .unwrap();
5237
5238        // roundtrip
5239        let out = converter.convert_rows(&rows).unwrap();
5240
5241        let [col1] = out.as_slice() else {
5242            panic!("expected 1 column")
5243        };
5244
5245        let col = col1.as_any().downcast_ref::<UnionArray>().unwrap();
5246        assert_eq!(col.len(), union_array.len());
5247        assert_eq!(col.type_ids(), union_array.type_ids());
5248
5249        for i in 0..col.len() {
5250            assert_eq!(col.value(i).as_ref(), union_array.value(i).as_ref());
5251        }
5252    }
5253
    #[test]
    fn test_row_converter_roundtrip_with_non_default_union_type_ids() {
        // test with non-sequential type IDs (70, 85) instead of (0, 1) to
        // ensure the row format does not assume dense zero-based ids
        let fields = UnionFields::try_new(
            vec![70, 85],
            vec![
                Field::new("int", DataType::Int32, true),
                Field::new("string", DataType::Utf8, true),
            ],
        )
        .unwrap();

        let int_array = Int32Array::from(vec![Some(67), None]);
        let string_array = StringArray::from(vec![None::<&str>, Some("hello")]);
        let type_ids = vec![70i8, 85].into();

        let union_array = UnionArray::try_new(
            fields.clone(),
            type_ids,
            None,
            vec![
                Arc::new(int_array) as ArrayRef,
                Arc::new(string_array) as ArrayRef,
            ],
        )
        .unwrap();

        let field = Field::new("col", DataType::Union(fields, UnionMode::Sparse), true);
        let sort_field = SortField::new(field.data_type().clone());
        let converter = RowConverter::new(vec![sort_field]).unwrap();

        let rows = converter
            .convert_columns(&[Arc::new(union_array.clone()) as ArrayRef])
            .unwrap();

        // roundtrip
        let out = converter.convert_rows(&rows).unwrap();

        let [col1] = out.as_slice() else {
            panic!("expected 1 column")
        };

        // the original (non-default) type ids must be preserved exactly
        let col = col1.as_any().downcast_ref::<UnionArray>().unwrap();
        assert_eq!(col.len(), union_array.len());
        assert_eq!(col.type_ids(), union_array.type_ids());

        for i in 0..col.len() {
            assert_eq!(col.value(i).as_ref(), union_array.value(i).as_ref());
        }
    }
5304
5305    #[test]
5306    fn rows_size_should_count_for_capacity() {
5307        let row_converter = RowConverter::new(vec![SortField::new(DataType::UInt8)]).unwrap();
5308
5309        let empty_rows_size_with_preallocate_rows_and_data = {
5310            let rows = row_converter.empty_rows(1000, 1000);
5311
5312            rows.size()
5313        };
5314        let empty_rows_size_with_preallocate_rows = {
5315            let rows = row_converter.empty_rows(1000, 0);
5316
5317            rows.size()
5318        };
5319        let empty_rows_size_with_preallocate_data = {
5320            let rows = row_converter.empty_rows(0, 1000);
5321
5322            rows.size()
5323        };
5324        let empty_rows_size_without_preallocate = {
5325            let rows = row_converter.empty_rows(0, 0);
5326
5327            rows.size()
5328        };
5329
5330        assert!(
5331            empty_rows_size_with_preallocate_rows_and_data > empty_rows_size_with_preallocate_rows,
5332            "{empty_rows_size_with_preallocate_rows_and_data} should be larger than {empty_rows_size_with_preallocate_rows}"
5333        );
5334        assert!(
5335            empty_rows_size_with_preallocate_rows_and_data > empty_rows_size_with_preallocate_data,
5336            "{empty_rows_size_with_preallocate_rows_and_data} should be larger than {empty_rows_size_with_preallocate_data}"
5337        );
5338        assert!(
5339            empty_rows_size_with_preallocate_rows > empty_rows_size_without_preallocate,
5340            "{empty_rows_size_with_preallocate_rows} should be larger than {empty_rows_size_without_preallocate}"
5341        );
5342        assert!(
5343            empty_rows_size_with_preallocate_data > empty_rows_size_without_preallocate,
5344            "{empty_rows_size_with_preallocate_data} should be larger than {empty_rows_size_without_preallocate}"
5345        );
5346    }
5347
5348    #[test]
5349    fn test_struct_no_child_fields() {
5350        fn run_test(array: ArrayRef) {
5351            let sort_fields = vec![SortField::new(array.data_type().clone())];
5352            let converter = RowConverter::new(sort_fields).unwrap();
5353            let r = converter.convert_columns(&[Arc::clone(&array)]).unwrap();
5354
5355            let back = converter.convert_rows(&r).unwrap();
5356            assert_eq!(back.len(), 1);
5357            assert_eq!(&back[0], &array);
5358        }
5359
5360        let s = Arc::new(StructArray::new_empty_fields(5, None)) as ArrayRef;
5361        run_test(s);
5362
5363        let s = Arc::new(StructArray::new_empty_fields(
5364            5,
5365            Some(vec![true, false, true, false, false].into()),
5366        )) as ArrayRef;
5367        run_test(s);
5368    }
5369
    #[test]
    fn reserve_should_increase_capacity_to_the_requested_size() {
        // `reserve` requests are targets: asking for capacity already covered
        // must be a no-op, while a larger request must grow the reported size.
        // Statement order matters below — each assertion compares against the
        // size left by the preceding reserve call.
        let row_converter = RowConverter::new(vec![SortField::new(DataType::UInt8)]).unwrap();
        let mut empty_rows = row_converter.empty_rows(0, 0);
        empty_rows.reserve(50, 50);
        let before_size = empty_rows.size();
        // Identical request: capacity already sufficient
        empty_rows.reserve(50, 50);
        assert_eq!(
            empty_rows.size(),
            before_size,
            "Size should not change when reserving already reserved space"
        );
        // Smaller request: also covered by existing capacity
        empty_rows.reserve(10, 20);
        assert_eq!(
            empty_rows.size(),
            before_size,
            "Size should not change when already have space for the expected reserved data"
        );

        // More rows than previously reserved: must grow
        empty_rows.reserve(100, 20);
        assert!(
            empty_rows.size() > before_size,
            "Size should increase when reserving more space than previously reserved"
        );

        let before_size = empty_rows.size();

        // More data bytes than previously reserved: must also grow
        empty_rows.reserve(20, 100);
        assert!(
            empty_rows.size() > before_size,
            "Size should increase when reserving more space than previously reserved"
        );
    }
5403
5404    #[test]
5405    fn empty_rows_should_return_empty_lengths_iterator() {
5406        let rows = RowConverter::new(vec![SortField::new(DataType::UInt8)])
5407            .unwrap()
5408            .empty_rows(0, 0);
5409        let mut lengths_iter = rows.lengths();
5410        assert_eq!(lengths_iter.next(), None);
5411    }
5412
5413    #[test]
5414    #[should_panic(expected = "row index out of bounds")]
5415    fn row_should_panic_on_overflowing_index() {
5416        let rows = RowConverter::new(vec![SortField::new(DataType::Int32)])
5417            .unwrap()
5418            .empty_rows(0, 0);
5419        rows.row(usize::MAX);
5420    }
5421
5422    #[test]
5423    #[should_panic(expected = "row index out of bounds")]
5424    fn row_len_should_panic_on_overflowing_index() {
5425        let rows = RowConverter::new(vec![SortField::new(DataType::Int32)])
5426            .unwrap()
5427            .empty_rows(0, 0);
5428        rows.row_len(usize::MAX);
5429    }
5430
5431    #[test]
5432    fn test_nested_null_list() {
5433        let null_array = Arc::new(NullArray::new(3));
5434        // [[NULL], [], [NULL, NULL]]
5435        let list: ArrayRef = Arc::new(ListArray::new(
5436            Field::new_list_field(DataType::Null, true).into(),
5437            OffsetBuffer::from_lengths(vec![1, 0, 2]),
5438            null_array,
5439            None,
5440        ));
5441
5442        let converter = RowConverter::new(vec![SortField::new(list.data_type().clone())]).unwrap();
5443        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
5444        let back = converter.convert_rows(&rows).unwrap();
5445
5446        assert_eq!(&list, &back[0]);
5447    }
5448
5449    // Test case from original issue #9227: [[NULL]] - double nested List with Null
5450    #[test]
5451    fn test_double_nested_null_list() {
5452        let null_array = Arc::new(NullArray::new(1));
5453        // [NULL]
5454        let nested_field = Arc::new(Field::new_list_field(DataType::Null, true));
5455        let nested_list = Arc::new(ListArray::new(
5456            nested_field.clone(),
5457            OffsetBuffer::from_lengths(vec![1]),
5458            null_array,
5459            None,
5460        ));
5461        // [[NULL]]
5462        let list = Arc::new(ListArray::new(
5463            Field::new_list_field(DataType::List(nested_field), true).into(),
5464            OffsetBuffer::from_lengths(vec![1]),
5465            nested_list,
5466            None,
5467        )) as ArrayRef;
5468
5469        let converter = RowConverter::new(vec![SortField::new(list.data_type().clone())]).unwrap();
5470        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
5471        let back = converter.convert_rows(&rows).unwrap();
5472
5473        assert_eq!(&list, &back[0]);
5474    }
5475
5476    // Test LargeList<Null>
5477    #[test]
5478    fn test_large_list_null() {
5479        let null_array = Arc::new(NullArray::new(3));
5480        // [[NULL], [], [NULL, NULL]] as LargeList
5481        let list: ArrayRef = Arc::new(LargeListArray::new(
5482            Field::new_list_field(DataType::Null, true).into(),
5483            OffsetBuffer::from_lengths(vec![1, 0, 2]),
5484            null_array,
5485            None,
5486        ));
5487
5488        let converter = RowConverter::new(vec![SortField::new(list.data_type().clone())]).unwrap();
5489        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
5490        let back = converter.convert_rows(&rows).unwrap();
5491
5492        assert_eq!(&list, &back[0]);
5493    }
5494
5495    // Test FixedSizeList<Null>
5496    #[test]
5497    fn test_fixed_size_list_null() {
5498        let null_array = Arc::new(NullArray::new(6));
5499        // [[NULL, NULL], [NULL, NULL], [NULL, NULL]] as FixedSizeList
5500        let list: ArrayRef = Arc::new(FixedSizeListArray::new(
5501            Arc::new(Field::new_list_field(DataType::Null, true)),
5502            2,
5503            null_array,
5504            None,
5505        ));
5506
5507        let converter = RowConverter::new(vec![SortField::new(list.data_type().clone())]).unwrap();
5508        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
5509        let back = converter.convert_rows(&rows).unwrap();
5510
5511        assert_eq!(&list, &back[0]);
5512    }
5513
5514    // Test List<Null> with various combinations of nulls and empty lists
5515    #[test]
5516    fn test_list_null_variations() {
5517        // Test case: [[NULL], [], [NULL, NULL]]
5518        let null_array = Arc::new(NullArray::new(3));
5519        let list: ArrayRef = Arc::new(ListArray::new(
5520            Field::new_list_field(DataType::Null, true).into(),
5521            OffsetBuffer::from_lengths(vec![1, 0, 2]),
5522            null_array,
5523            None,
5524        ));
5525
5526        let converter = RowConverter::new(vec![SortField::new(list.data_type().clone())]).unwrap();
5527        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
5528        let back = converter.convert_rows(&rows).unwrap();
5529        assert_eq!(&list, &back[0]);
5530
5531        // Test case: [[NULL], null, [NULL, NULL]] - middle element is null
5532        let null_array = Arc::new(NullArray::new(3));
5533        let list: ArrayRef = Arc::new(ListArray::new(
5534            Field::new_list_field(DataType::Null, true).into(),
5535            OffsetBuffer::from_lengths(vec![1, 0, 2]),
5536            null_array,
5537            Some(vec![true, false, true].into()),
5538        ));
5539
5540        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
5541        let back = converter.convert_rows(&rows).unwrap();
5542        assert_eq!(&list, &back[0]);
5543
5544        // Test case: [] - empty list
5545        let null_array = Arc::new(NullArray::new(0));
5546        let list: ArrayRef = Arc::new(ListArray::new(
5547            Field::new_list_field(DataType::Null, true).into(),
5548            OffsetBuffer::from_lengths(vec![]),
5549            null_array,
5550            None,
5551        ));
5552
5553        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
5554        let back = converter.convert_rows(&rows).unwrap();
5555        assert_eq!(&list, &back[0]);
5556
5557        // Test case: [[], [], []] - all empty sublists
5558        let null_array = Arc::new(NullArray::new(0));
5559        let list: ArrayRef = Arc::new(ListArray::new(
5560            Field::new_list_field(DataType::Null, true).into(),
5561            OffsetBuffer::from_lengths(vec![0, 0, 0]),
5562            null_array,
5563            None,
5564        ));
5565
5566        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
5567        let back = converter.convert_rows(&rows).unwrap();
5568        assert_eq!(&list, &back[0]);
5569    }
5570
5571    // Test List<Null> with descending order
5572    #[test]
5573    fn test_list_null_descending() {
5574        let null_array = Arc::new(NullArray::new(3));
5575        // [[NULL], [], [NULL, NULL]]
5576        let list: ArrayRef = Arc::new(ListArray::new(
5577            Field::new_list_field(DataType::Null, true).into(),
5578            OffsetBuffer::from_lengths(vec![1, 0, 2]),
5579            null_array,
5580            None,
5581        ));
5582
5583        let options = SortOptions::default().with_descending(true);
5584        let field = SortField::new_with_options(list.data_type().clone(), options);
5585        let converter = RowConverter::new(vec![field]).unwrap();
5586        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
5587        let back = converter.convert_rows(&rows).unwrap();
5588
5589        assert_eq!(&list, &back[0]);
5590    }
5591
5592    // Test Struct with Null field
5593    #[test]
5594    fn test_struct_with_null_field() {
5595        // Struct { a: Null, b: Int32 }
5596        let null_array = Arc::new(NullArray::new(3));
5597        let int_array = Arc::new(Int32Array::from(vec![1, 2, 3]));
5598
5599        let struct_array: ArrayRef = Arc::new(StructArray::new(
5600            vec![
5601                Arc::new(Field::new("a", DataType::Null, true)),
5602                Arc::new(Field::new("b", DataType::Int32, true)),
5603            ]
5604            .into(),
5605            vec![null_array, int_array],
5606            Some(vec![true, true, false].into()), // validity bitmap
5607        ));
5608
5609        let converter =
5610            RowConverter::new(vec![SortField::new(struct_array.data_type().clone())]).unwrap();
5611        let rows = converter
5612            .convert_columns(&[Arc::clone(&struct_array)])
5613            .unwrap();
5614        let back = converter.convert_rows(&rows).unwrap();
5615
5616        assert_eq!(&struct_array, &back[0]);
5617    }
5618
5619    // Test nested Struct with Null
5620    #[test]
5621    fn test_nested_struct_with_null() {
5622        // Inner struct: { x: Null }
5623        let inner_null = Arc::new(NullArray::new(2));
5624        let inner_struct = Arc::new(StructArray::new(
5625            vec![Arc::new(Field::new("x", DataType::Null, true))].into(),
5626            vec![inner_null],
5627            None,
5628        ));
5629
5630        // Outer struct: { inner: Struct { x: Null }, y: Int32 }
5631        let y_array = Arc::new(Int32Array::from(vec![10, 20]));
5632        let outer_struct: ArrayRef = Arc::new(StructArray::new(
5633            vec![
5634                Arc::new(Field::new("inner", inner_struct.data_type().clone(), true)),
5635                Arc::new(Field::new("y", DataType::Int32, true)),
5636            ]
5637            .into(),
5638            vec![inner_struct, y_array],
5639            None,
5640        ));
5641
5642        let converter =
5643            RowConverter::new(vec![SortField::new(outer_struct.data_type().clone())]).unwrap();
5644        let rows = converter
5645            .convert_columns(&[Arc::clone(&outer_struct)])
5646            .unwrap();
5647        let back = converter.convert_rows(&rows).unwrap();
5648
5649        assert_eq!(&outer_struct, &back[0]);
5650    }
5651
5652    // Test Map<Null> - verify it's not supported (as per current implementation)
5653    // https://github.com/apache/arrow-rs/issues/7879
5654    #[test]
5655    fn test_map_null_not_supported() {
5656        // Map with Null values
5657        let map_data_type = Field::new_map(
5658            "map",
5659            "entries",
5660            Field::new("key", DataType::Utf8, false),
5661            Field::new("value", DataType::Null, true),
5662            false,
5663            true,
5664        )
5665        .data_type()
5666        .clone();
5667
5668        // Currently Map is not supported by RowConverter
5669        let result = RowConverter::new(vec![SortField::new(map_data_type)]);
5670        assert!(
5671            result.is_err(),
5672            "Map should not be supported by RowConverter"
5673        );
5674        assert!(
5675            result
5676                .unwrap_err()
5677                .to_string()
5678                .contains("not yet implemented")
5679        );
5680    }
5681
5682    #[test]
5683    fn empty_row_iter_next_back() {
5684        let rows = RowConverter::new(vec![SortField::new(DataType::UInt8)])
5685            .unwrap()
5686            .empty_rows(0, 0);
5687        let mut rows_iter = rows.iter();
5688        assert_eq!(rows_iter.next_back(), None);
5689        assert_eq!(rows_iter.next_back(), None);
5690        assert_eq!(rows_iter.next_back(), None);
5691    }
5692
5693    #[test]
5694    fn row_iter_next_back() {
5695        let row_converter = RowConverter::new(vec![SortField::new(DataType::UInt8)]).unwrap();
5696        let mut rng = StdRng::seed_from_u64(42);
5697        let array = generate_primitive_array::<UInt8Type>(&mut rng, 100, 0.8);
5698        let rows = row_converter.convert_columns(&[Arc::new(array)]).unwrap();
5699
5700        let mut rows_iter = rows.iter();
5701        let mut bytes: Vec<u8> = vec![];
5702
5703        while let Some(row) = rows_iter.next_back() {
5704            bytes.extend(row.data.iter().rev());
5705        }
5706
5707        bytes.reverse();
5708
5709        assert_eq!(
5710            bytes,
5711            &rows.buffer.as_slice()[..*rows.offsets.last().unwrap()]
5712        );
5713
5714        assert_eq!(rows_iter.next_back(), None);
5715        assert_eq!(rows_iter.next(), None);
5716    }
5717}