// arrow_row/lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! A comparable row-oriented representation of a collection of [`Array`].
19//!
20//! [`Row`]s are [normalized for sorting], and can therefore be very efficiently [compared],
21//! using [`memcmp`] under the hood, or used in [non-comparison sorts] such as [radix sort].
22//! This makes the row format ideal for implementing efficient multi-column sorting,
23//! grouping, aggregation, windowing and more, as described in more detail
24//! [in this blog post](https://arrow.apache.org/blog/2022/11/07/multi-column-sorts-in-arrow-rust-part-1/).
25//!
26//! For example, given three input [`Array`], [`RowConverter`] creates byte
27//! sequences that [compare] the same as when using [`lexsort`].
28//!
29//! ```text
30//!    ┌─────┐   ┌─────┐   ┌─────┐
31//!    │     │   │     │   │     │
32//!    ├─────┤ ┌ ┼─────┼ ─ ┼─────┼ ┐              ┏━━━━━━━━━━━━━┓
33//!    │     │   │     │   │     │  ─────────────▶┃             ┃
34//!    ├─────┤ └ ┼─────┼ ─ ┼─────┼ ┘              ┗━━━━━━━━━━━━━┛
35//!    │     │   │     │   │     │
36//!    └─────┘   └─────┘   └─────┘
37//!                ...
38//!    ┌─────┐ ┌ ┬─────┬ ─ ┬─────┬ ┐              ┏━━━━━━━━┓
39//!    │     │   │     │   │     │  ─────────────▶┃        ┃
40//!    └─────┘ └ ┴─────┴ ─ ┴─────┴ ┘              ┗━━━━━━━━┛
41//!     UInt64      Utf8     F64
42//!
43//!           Input Arrays                          Row Format
44//!     (Columns)
45//! ```
46//!
47//! _[`Rows`] must be generated by the same [`RowConverter`] for the comparison
48//! to be meaningful._
49//!
50//! # Basic Example
51//! ```
52//! # use std::sync::Arc;
53//! # use arrow_row::{RowConverter, SortField};
54//! # use arrow_array::{ArrayRef, Int32Array, StringArray};
55//! # use arrow_array::cast::{AsArray, as_string_array};
56//! # use arrow_array::types::Int32Type;
57//! # use arrow_schema::DataType;
58//!
59//! let a1 = Arc::new(Int32Array::from_iter_values([-1, -1, 0, 3, 3])) as ArrayRef;
60//! let a2 = Arc::new(StringArray::from_iter_values(["a", "b", "c", "d", "d"])) as ArrayRef;
61//! let arrays = vec![a1, a2];
62//!
63//! // Convert arrays to rows
64//! let converter = RowConverter::new(vec![
65//!     SortField::new(DataType::Int32),
66//!     SortField::new(DataType::Utf8),
67//! ]).unwrap();
68//! let rows = converter.convert_columns(&arrays).unwrap();
69//!
70//! // Compare rows
71//! for i in 0..4 {
72//!     assert!(rows.row(i) <= rows.row(i + 1));
73//! }
74//! assert_eq!(rows.row(3), rows.row(4));
75//!
76//! // Convert rows back to arrays
77//! let converted = converter.convert_rows(&rows).unwrap();
78//! assert_eq!(arrays, converted);
79//!
80//! // Compare rows from different arrays
81//! let a1 = Arc::new(Int32Array::from_iter_values([3, 4])) as ArrayRef;
82//! let a2 = Arc::new(StringArray::from_iter_values(["e", "f"])) as ArrayRef;
83//! let arrays = vec![a1, a2];
84//! let rows2 = converter.convert_columns(&arrays).unwrap();
85//!
86//! assert!(rows.row(4) < rows2.row(0));
87//! assert!(rows.row(4) < rows2.row(1));
88//!
89//! // Convert selection of rows back to arrays
90//! let selection = [rows.row(0), rows2.row(1), rows.row(2), rows2.row(0)];
91//! let converted = converter.convert_rows(selection).unwrap();
92//! let c1 = converted[0].as_primitive::<Int32Type>();
93//! assert_eq!(c1.values(), &[-1, 4, 0, 3]);
94//!
95//! let c2 = converted[1].as_string::<i32>();
96//! let c2_values: Vec<_> = c2.iter().flatten().collect();
97//! assert_eq!(&c2_values, &["a", "f", "c", "e"]);
98//! ```
99//!
100//! # Lexicographic Sorts (lexsort)
101//!
102//! The row format can also be used to implement a fast multi-column / lexicographic sort
103//!
104//! ```
105//! # use arrow_row::{RowConverter, SortField};
106//! # use arrow_array::{ArrayRef, UInt32Array};
107//! fn lexsort_to_indices(arrays: &[ArrayRef]) -> UInt32Array {
108//!     let fields = arrays
109//!         .iter()
110//!         .map(|a| SortField::new(a.data_type().clone()))
111//!         .collect();
112//!     let converter = RowConverter::new(fields).unwrap();
113//!     let rows = converter.convert_columns(arrays).unwrap();
114//!     let mut sort: Vec<_> = rows.iter().enumerate().collect();
115//!     sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
116//!     UInt32Array::from_iter_values(sort.iter().map(|(i, _)| *i as u32))
117//! }
118//! ```
119//!
120//! # Flattening Dictionaries
121//!
122//! For performance reasons, dictionary arrays are flattened ("hydrated") to their
123//! underlying values during row conversion. See [the issue] for more details.
124//!
125//! This means that the arrays that come out of [`RowConverter::convert_rows`]
//! may not have the same data types as the input arrays. For example, converting
//! a `Dictionary<Int8, Utf8>` to rows and back will produce a `Utf8` array.
128//!
129//! ```
130//! # use arrow_array::{Array, ArrayRef, DictionaryArray};
131//! # use arrow_array::types::Int8Type;
132//! # use arrow_row::{RowConverter, SortField};
133//! # use arrow_schema::DataType;
134//! # use std::sync::Arc;
135//! // Input is a Dictionary array
136//! let dict: DictionaryArray::<Int8Type> = ["a", "b", "c", "a", "b"].into_iter().collect();
137//! let sort_fields = vec![SortField::new(dict.data_type().clone())];
138//! let arrays = vec![Arc::new(dict) as ArrayRef];
139//! let converter = RowConverter::new(sort_fields).unwrap();
140//! // Convert to rows
141//! let rows = converter.convert_columns(&arrays).unwrap();
142//! let converted = converter.convert_rows(&rows).unwrap();
143//! // result was a Utf8 array, not a Dictionary array
144//! assert_eq!(converted[0].data_type(), &DataType::Utf8);
145//! ```
146//!
147//! [non-comparison sorts]: https://en.wikipedia.org/wiki/Sorting_algorithm#Non-comparison_sorts
148//! [radix sort]: https://en.wikipedia.org/wiki/Radix_sort
149//! [normalized for sorting]: http://wwwlgis.informatik.uni-kl.de/archiv/wwwdvs.informatik.uni-kl.de/courses/DBSREAL/SS2005/Vorlesungsunterlagen/Implementing_Sorting.pdf
150//! [`memcmp`]: https://www.man7.org/linux/man-pages/man3/memcmp.3.html
151//! [`lexsort`]: https://docs.rs/arrow-ord/latest/arrow_ord/sort/fn.lexsort.html
152//! [compared]: PartialOrd
153//! [compare]: PartialOrd
154//! [the issue]: https://github.com/apache/arrow-rs/issues/4811
155
156#![doc(
157    html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
158    html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
159)]
160#![cfg_attr(docsrs, feature(doc_cfg))]
161#![warn(missing_docs)]
162use std::cmp::Ordering;
163use std::hash::{Hash, Hasher};
164use std::sync::Arc;
165
166use arrow_array::cast::*;
167use arrow_array::types::ArrowDictionaryKeyType;
168use arrow_array::*;
169use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer};
170use arrow_data::{ArrayData, ArrayDataBuilder};
171use arrow_schema::*;
172use variable::{decode_binary_view, decode_string_view};
173
174use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
175use crate::list::{compute_lengths_fixed_size_list, encode_fixed_size_list};
176use crate::variable::{decode_binary, decode_string};
177use arrow_array::types::{Int16Type, Int32Type, Int64Type};
178
179mod fixed;
180mod list;
181mod run;
182mod variable;
183
184/// Converts [`ArrayRef`] columns into a [row-oriented](self) format.
185///
186/// *Note: The encoding of the row format may change from release to release.*
187///
188/// ## Overview
189///
190/// The row format is a variable length byte sequence created by
191/// concatenating the encoded form of each column. The encoding for
192/// each column depends on its datatype (and sort options).
193///
194/// The encoding is carefully designed in such a way that escaping is
195/// unnecessary: it is never ambiguous as to whether a byte is part of
196/// a sentinel (e.g. null) or a value.
197///
198/// ## Unsigned Integer Encoding
199///
200/// A null integer is encoded as a `0_u8`, followed by a zero-ed number of bytes corresponding
201/// to the integer's length.
202///
203/// A valid integer is encoded as `1_u8`, followed by the big-endian representation of the
204/// integer.
205///
206/// ```text
207///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
208///    3          │03│00│00│00│      │01│00│00│00│03│
209///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
210///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
211///   258         │02│01│00│00│      │01│00│00│01│02│
212///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
213///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
214///  23423        │7F│5B│00│00│      │01│00│00│5B│7F│
215///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
216///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
217///  NULL         │??│??│??│??│      │00│00│00│00│00│
218///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
219///
220///              32-bit (4 bytes)        Row Format
221///  Value        Little Endian
222/// ```
223///
224/// ## Signed Integer Encoding
225///
226/// Signed integers have their most significant sign bit flipped, and are then encoded in the
227/// same manner as an unsigned integer.
228///
229/// ```text
230///        ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┬──┐
231///     5  │05│00│00│00│       │05│00│00│80│       │01│80│00│00│05│
232///        └──┴──┴──┴──┘       └──┴──┴──┴──┘       └──┴──┴──┴──┴──┘
233///        ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┬──┐
234///    -5  │FB│FF│FF│FF│       │FB│FF│FF│7F│       │01│7F│FF│FF│FB│
235///        └──┴──┴──┴──┘       └──┴──┴──┴──┘       └──┴──┴──┴──┴──┘
236///
237///  Value  32-bit (4 bytes)    High bit flipped      Row Format
238///          Little Endian
239/// ```
240///
241/// ## Float Encoding
242///
243/// Floats are converted from IEEE 754 representation to a signed integer representation
244/// by flipping all bar the sign bit if they are negative.
245///
246/// They are then encoded in the same manner as a signed integer.
247///
248/// ## Fixed Length Bytes Encoding
249///
250/// Fixed length bytes are encoded in the same fashion as primitive types above.
251///
252/// For a fixed length array of length `n`:
253///
254/// A null is encoded as `0_u8` null sentinel followed by `n` `0_u8` bytes
255///
256/// A valid value is encoded as `1_u8` followed by the value bytes
257///
258/// ## Variable Length Bytes (including Strings) Encoding
259///
260/// A null is encoded as a `0_u8`.
261///
262/// An empty byte array is encoded as `1_u8`.
263///
264/// A non-null, non-empty byte array is encoded as `2_u8` followed by the byte array
265/// encoded using a block based scheme described below.
266///
267/// The byte array is broken up into fixed-width blocks, each block is written in turn
268/// to the output, followed by `0xFF_u8`. The final block is padded to 32-bytes
269/// with `0_u8` and written to the output, followed by the un-padded length in bytes
270/// of this final block as a `u8`. The first 4 blocks have a length of 8, with subsequent
271/// blocks using a length of 32, this is to reduce space amplification for small strings.
272///
273/// Note the following example encodings use a block size of 4 bytes for brevity:
274///
275/// ```text
276///                       ┌───┬───┬───┬───┬───┬───┐
277///  "MEEP"               │02 │'M'│'E'│'E'│'P'│04 │
278///                       └───┴───┴───┴───┴───┴───┘
279///
280///                       ┌───┐
281///  ""                   │01 |
282///                       └───┘
283///
284///  NULL                 ┌───┐
285///                       │00 │
286///                       └───┘
287///
288/// "Defenestration"      ┌───┬───┬───┬───┬───┬───┐
289///                       │02 │'D'│'e'│'f'│'e'│FF │
290///                       └───┼───┼───┼───┼───┼───┤
291///                           │'n'│'e'│'s'│'t'│FF │
292///                           ├───┼───┼───┼───┼───┤
///                           │'r'│'a'│'t'│'i'│FF │
///                           ├───┼───┼───┼───┼───┤
///                           │'o'│'n'│00 │00 │02 │
///                           └───┴───┴───┴───┴───┘
299/// ```
300///
301/// This approach is loosely inspired by [COBS] encoding, and chosen over more traditional
302/// [byte stuffing] as it is more amenable to vectorisation, in particular AVX-256.
303///
304/// ## Dictionary Encoding
305///
306/// Dictionary encoded arrays are hydrated to their underlying values
307///
308/// ## REE Encoding
309///
/// REE (Run End Encoding) arrays, a form of Run Length Encoding, are hydrated to their underlying values.
311///
312/// ## Struct Encoding
313///
314/// A null is encoded as a `0_u8`.
315///
316/// A valid value is encoded as `1_u8` followed by the row encoding of each child.
317///
318/// This encoding effectively flattens the schema in a depth-first fashion.
319///
320/// For example
321///
322/// ```text
323/// ┌───────┬────────────────────────┬───────┐
324/// │ Int32 │ Struct[Int32, Float32] │ Int32 │
325/// └───────┴────────────────────────┴───────┘
326/// ```
327///
328/// Is encoded as
329///
330/// ```text
331/// ┌───────┬───────────────┬───────┬─────────┬───────┐
332/// │ Int32 │ Null Sentinel │ Int32 │ Float32 │ Int32 │
333/// └───────┴───────────────┴───────┴─────────┴───────┘
334/// ```
335///
336/// ## List Encoding
337///
338/// Lists are encoded by first encoding all child elements to the row format.
339///
340/// A list value is then encoded as the concatenation of each of the child elements,
341/// separately encoded using the variable length encoding described above, followed
342/// by the variable length encoding of an empty byte array.
343///
344/// For example given:
345///
346/// ```text
347/// [1_u8, 2_u8, 3_u8]
348/// [1_u8, null]
349/// []
350/// null
351/// ```
352///
353/// The elements would be converted to:
354///
355/// ```text
356///     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐        ┌──┬──┐
357///  1  │01│01│  2  │01│02│  3  │01│03│  1  │01│01│  null  │00│00│
358///     └──┴──┘     └──┴──┘     └──┴──┘     └──┴──┘        └──┴──┘
359///```
360///
361/// Which would be encoded as
362///
363/// ```text
364///                         ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
365///  [1_u8, 2_u8, 3_u8]     │02│01│01│00│00│02│02│01│02│00│00│02│02│01│03│00│00│02│01│
366///                         └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
367///                          └──── 1_u8 ────┘   └──── 2_u8 ────┘  └──── 3_u8 ────┘
368///
369///                         ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
370///  [1_u8, null]           │02│01│01│00│00│02│02│00│00│00│00│02│01│
371///                         └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
372///                          └──── 1_u8 ────┘   └──── null ────┘
373///
374///```
375///
376/// With `[]` represented by an empty byte array, and `null` a null byte array.
377///
378/// ## Fixed Size List Encoding
379///
380/// Fixed Size Lists are encoded by first encoding all child elements to the row format.
381///
382/// A non-null list value is then encoded as 0x01 followed by the concatenation of each
383/// of the child elements. A null list value is encoded as a null marker.
384///
385/// For example given:
386///
387/// ```text
388/// [1_u8, 2_u8]
389/// [3_u8, null]
390/// null
391/// ```
392///
393/// The elements would be converted to:
394///
395/// ```text
396///     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐        ┌──┬──┐
397///  1  │01│01│  2  │01│02│  3  │01│03│  null  │00│00│
398///     └──┴──┘     └──┴──┘     └──┴──┘        └──┴──┘
399///```
400///
401/// Which would be encoded as
402///
403/// ```text
404///                 ┌──┬──┬──┬──┬──┐
405///  [1_u8, 2_u8]   │01│01│01│01│02│
406///                 └──┴──┴──┴──┴──┘
407///                     └ 1 ┘ └ 2 ┘
408///                 ┌──┬──┬──┬──┬──┐
409///  [3_u8, null]   │01│01│03│00│00│
410///                 └──┴──┴──┴──┴──┘
411///                     └ 1 ┘ └null┘
412///                 ┌──┐
413///  null           │00│
414///                 └──┘
415///
416///```
417///
418/// # Ordering
419///
420/// ## Float Ordering
421///
422/// Floats are totally ordered in accordance to the `totalOrder` predicate as defined
423/// in the IEEE 754 (2008 revision) floating point standard.
424///
425/// The ordering established by this does not always agree with the
426/// [`PartialOrd`] and [`PartialEq`] implementations of `f32`. For example,
427/// they consider negative and positive zero equal, while this does not
428///
429/// ## Null Ordering
430///
431/// The encoding described above will order nulls first, this can be inverted by representing
432/// nulls as `0xFF_u8` instead of `0_u8`
433///
434/// ## Reverse Column Ordering
435///
436/// The order of a given column can be reversed by negating the encoded bytes of non-null values
437///
438/// [COBS]: https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing
439/// [byte stuffing]: https://en.wikipedia.org/wiki/High-Level_Data_Link_Control#Asynchronous_framing
#[derive(Debug)]
pub struct RowConverter {
    /// The sort fields (data type and sort options), one per converted column
    fields: Arc<[SortField]>,
    /// State for codecs, parallel to `fields`
    codecs: Vec<Codec>,
}
446
/// Per-column codec state for a [`RowConverter`]
///
/// Nested and dictionary-encoded types recursively require a [`RowConverter`]
/// for their child data, and in some cases a pre-computed encoding of an
/// all-null row (see [`Codec::new`])
#[derive(Debug)]
enum Codec {
    /// No additional codec state is necessary
    Stateless,
    /// A row converter for the dictionary values
    /// and the encoding of a row containing only nulls
    Dictionary(RowConverter, OwnedRow),
    /// A row converter for the child fields
    /// and the encoding of a row containing only nulls
    Struct(RowConverter, OwnedRow),
    /// A row converter for the child field
    List(RowConverter),
    /// A row converter for the values array of a run-end encoded array
    RunEndEncoded(RowConverter),
    /// Row converters for each union field (indexed by type_id)
    /// and the encoding of null rows for each field
    Union(Vec<RowConverter>, Vec<OwnedRow>),
}
465
impl Codec {
    /// Construct the codec state for a column with the given [`SortField`]
    ///
    /// For nested and dictionary types this recursively builds a
    /// [`RowConverter`] for the child data; for dictionary, struct and union
    /// types it also pre-computes the row encoding of an all-null child row
    fn new(sort_field: &SortField) -> Result<Self, ArrowError> {
        match &sort_field.data_type {
            DataType::Dictionary(_, values) => {
                // Dictionaries are hydrated: the child converter encodes the
                // value type with the parent's sort options
                let sort_field =
                    SortField::new_with_options(values.as_ref().clone(), sort_field.options);

                let converter = RowConverter::new(vec![sort_field])?;
                // Pre-compute the encoding of a single null value so null
                // dictionary entries can be emitted without re-encoding
                let null_array = new_null_array(values.as_ref(), 1);
                let nulls = converter.convert_columns(&[null_array])?;

                let owned = OwnedRow {
                    data: nulls.buffer.into(),
                    config: nulls.config,
                };
                Ok(Self::Dictionary(converter, owned))
            }
            DataType::RunEndEncoded(_, values) => {
                // Similar to List implementation
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };

                let field = SortField::new_with_options(values.data_type().clone(), options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::RunEndEncoded(converter))
            }
            // All remaining non-nested types need no extra state
            d if !d.is_nested() => Ok(Self::Stateless),
            DataType::List(f) | DataType::LargeList(f) => {
                // The encoded contents will be inverted if descending is set to true
                // As such we set `descending` to false and negate nulls first if it
                // is set to true
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };

                let field = SortField::new_with_options(f.data_type().clone(), options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::List(converter))
            }
            DataType::FixedSizeList(f, _) => {
                // Unlike variable-length lists, the parent options are passed
                // through unchanged to the child converter
                let field = SortField::new_with_options(f.data_type().clone(), sort_field.options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::List(converter))
            }
            DataType::Struct(f) => {
                // One child sort field per struct member, sharing the parent's options
                let sort_fields = f
                    .iter()
                    .map(|x| SortField::new_with_options(x.data_type().clone(), sort_field.options))
                    .collect();

                let converter = RowConverter::new(sort_fields)?;
                // Pre-compute the encoding of a row where every child is null,
                // used when the parent struct slot itself is null
                let nulls: Vec<_> = f.iter().map(|x| new_null_array(x.data_type(), 1)).collect();

                let nulls = converter.convert_columns(&nulls)?;
                let owned = OwnedRow {
                    data: nulls.buffer.into(),
                    config: nulls.config,
                };

                Ok(Self::Struct(converter, owned))
            }
            DataType::Union(fields, _mode) => {
                // similar to dictionaries and lists, we set descending to false and negate nulls_first
                // since the encoded contents will be inverted if descending is set
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };

                let mut converters = Vec::with_capacity(fields.len());
                let mut null_rows = Vec::with_capacity(fields.len());

                // One converter and one pre-computed null row per union field,
                // in field iteration order
                for (_type_id, field) in fields.iter() {
                    let sort_field =
                        SortField::new_with_options(field.data_type().clone(), options);
                    let converter = RowConverter::new(vec![sort_field])?;

                    let null_array = new_null_array(field.data_type(), 1);
                    let nulls = converter.convert_columns(&[null_array])?;
                    let owned = OwnedRow {
                        data: nulls.buffer.into(),
                        config: nulls.config,
                    };

                    converters.push(converter);
                    null_rows.push(owned);
                }

                Ok(Self::Union(converters, null_rows))
            }
            _ => Err(ArrowError::NotYetImplemented(format!(
                "not yet implemented: {:?}",
                sort_field.data_type
            ))),
        }
    }

    /// Prepare the per-batch [`Encoder`] state for `array`
    ///
    /// This encodes any child data (dictionary values, struct columns, list
    /// values, union children) to temporary [`Rows`] up-front
    fn encoder(&self, array: &dyn Array) -> Result<Encoder<'_>, ArrowError> {
        match self {
            Codec::Stateless => Ok(Encoder::Stateless),
            Codec::Dictionary(converter, nulls) => {
                let values = array.as_any_dictionary().values().clone();
                let rows = converter.convert_columns(&[values])?;
                Ok(Encoder::Dictionary(rows, nulls.row()))
            }
            Codec::Struct(converter, null) => {
                let v = as_struct_array(array);
                let rows = converter.convert_columns(v.columns())?;
                Ok(Encoder::Struct(rows, null.row()))
            }
            Codec::List(converter) => {
                let values = match array.data_type() {
                    DataType::List(_) => {
                        let list_array = as_list_array(array);
                        let first_offset = list_array.offsets()[0] as usize;
                        let last_offset =
                            list_array.offsets()[list_array.offsets().len() - 1] as usize;

                        // values can include more data than referenced in the ListArray, only encode
                        // the referenced values.
                        list_array
                            .values()
                            .slice(first_offset, last_offset - first_offset)
                    }
                    DataType::LargeList(_) => {
                        let list_array = as_large_list_array(array);

                        let first_offset = list_array.offsets()[0] as usize;
                        let last_offset =
                            list_array.offsets()[list_array.offsets().len() - 1] as usize;

                        // values can include more data than referenced in the LargeListArray, only encode
                        // the referenced values.
                        list_array
                            .values()
                            .slice(first_offset, last_offset - first_offset)
                    }
                    DataType::FixedSizeList(_, _) => {
                        // Fixed-size lists have no offsets; all values are referenced
                        as_fixed_size_list_array(array).values().clone()
                    }
                    _ => unreachable!(),
                };
                let rows = converter.convert_columns(&[values])?;
                Ok(Encoder::List(rows))
            }
            Codec::RunEndEncoded(converter) => {
                // Dispatch on the run-end index type to reach the values array
                let values = match array.data_type() {
                    DataType::RunEndEncoded(r, _) => match r.data_type() {
                        DataType::Int16 => array.as_run::<Int16Type>().values(),
                        DataType::Int32 => array.as_run::<Int32Type>().values(),
                        DataType::Int64 => array.as_run::<Int64Type>().values(),
                        _ => unreachable!("Unsupported run end index type: {r:?}"),
                    },
                    _ => unreachable!(),
                };
                let rows = converter.convert_columns(std::slice::from_ref(values))?;
                Ok(Encoder::RunEndEncoded(rows))
            }
            Codec::Union(converters, _) => {
                let union_array = array
                    .as_any()
                    .downcast_ref::<UnionArray>()
                    .expect("expected Union array");

                let type_ids = union_array.type_ids().clone();
                let offsets = union_array.offsets().cloned();

                // NOTE(review): children are looked up by positional index
                // (0..N) used as the type_id — this assumes union type ids are
                // dense and match the field order used in Codec::new; confirm
                let mut child_rows = Vec::with_capacity(converters.len());
                for (type_id, converter) in converters.iter().enumerate() {
                    let child_array = union_array.child(type_id as i8);
                    let rows = converter.convert_columns(std::slice::from_ref(child_array))?;
                    child_rows.push(rows);
                }

                Ok(Encoder::Union {
                    child_rows,
                    type_ids,
                    offsets,
                })
            }
        }
    }

    /// Return the retained size of this codec's state in bytes,
    /// including nested converters and pre-computed null rows
    fn size(&self) -> usize {
        match self {
            Codec::Stateless => 0,
            Codec::Dictionary(converter, nulls) => converter.size() + nulls.data.len(),
            Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(),
            Codec::List(converter) => converter.size(),
            Codec::RunEndEncoded(converter) => converter.size(),
            Codec::Union(converters, null_rows) => {
                converters.iter().map(|c| c.size()).sum::<usize>()
                    + null_rows.iter().map(|n| n.data.len()).sum::<usize>()
            }
        }
    }
}
666
/// Per-batch encoder state produced by [`Codec::encoder`] for a single column
#[derive(Debug)]
enum Encoder<'a> {
    /// No additional encoder state is necessary
    Stateless,
    /// The encoding of the child array and the encoding of a null row
    Dictionary(Rows, Row<'a>),
    /// The row encoding of the child arrays and the encoding of a null row
    ///
    /// It is necessary to encode to a temporary [`Rows`] to avoid serializing
    /// values that are masked by a null in the parent StructArray, otherwise
    /// this would establish an ordering between semantically null values
    Struct(Rows, Row<'a>),
    /// The row encoding of the child array
    List(Rows),
    /// The row encoding of the values array
    RunEndEncoded(Rows),
    /// The row encoding of each union field's child array, type_ids buffer, offsets buffer (for Dense), and mode
    Union {
        /// Row encodings of each child array, indexed by type id
        child_rows: Vec<Rows>,
        /// The type id of each union slot
        type_ids: ScalarBuffer<i8>,
        /// Offsets into the child arrays; `Some` for dense unions, `None` for sparse
        offsets: Option<ScalarBuffer<i32>>,
    },
}
690
/// Configure the data type and sort order for a given column
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
    /// Sort options (ordering and null placement) for the column
    options: SortOptions,
    /// Data type of the column
    data_type: DataType,
}
699
700impl SortField {
701    /// Create a new column with the given data type
702    pub fn new(data_type: DataType) -> Self {
703        Self::new_with_options(data_type, Default::default())
704    }
705
706    /// Create a new column with the given data type and [`SortOptions`]
707    pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
708        Self { options, data_type }
709    }
710
711    /// Return size of this instance in bytes.
712    ///
713    /// Includes the size of `Self`.
714    pub fn size(&self) -> usize {
715        self.data_type.size() + std::mem::size_of::<Self>() - std::mem::size_of::<DataType>()
716    }
717}
718
impl RowConverter {
    /// Create a new [`RowConverter`] with the provided schema
    pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
        if !Self::supports_fields(&fields) {
            return Err(ArrowError::NotYetImplemented(format!(
                "Row format support not yet implemented for: {fields:?}"
            )));
        }

        // One codec per field, constructed up front so that per-batch encoding
        // does not need to re-inspect the data types
        let codecs = fields.iter().map(Codec::new).collect::<Result<_, _>>()?;
        Ok(Self {
            fields: fields.into(),
            codecs,
        })
    }

    /// Check if the given fields are supported by the row format.
    pub fn supports_fields(fields: &[SortField]) -> bool {
        fields.iter().all(|x| Self::supports_datatype(&x.data_type))
    }

    /// Returns true if the row format supports `d`, recursing into the
    /// children of nested types
    fn supports_datatype(d: &DataType) -> bool {
        match d {
            // All non-nested types are supported
            _ if !d.is_nested() => true,
            DataType::List(f) | DataType::LargeList(f) | DataType::FixedSizeList(f, _) => {
                Self::supports_datatype(f.data_type())
            }
            DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())),
            DataType::RunEndEncoded(_, values) => Self::supports_datatype(values.data_type()),
            DataType::Union(fs, _mode) => fs
                .iter()
                .all(|(_, f)| Self::supports_datatype(f.data_type())),
            _ => false,
        }
    }

    /// Convert [`ArrayRef`] columns into [`Rows`]
    ///
    /// See [`Row`] for information on when [`Row`] can be compared
    ///
    /// See [`Self::convert_rows`] for converting [`Rows`] back into [`ArrayRef`]
    ///
    /// # Panics
    ///
    /// Panics if the schema of `columns` does not match that provided to [`RowConverter::new`]
    pub fn convert_columns(&self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
        let num_rows = columns.first().map(|x| x.len()).unwrap_or(0);
        let mut rows = self.empty_rows(num_rows, 0);
        self.append(&mut rows, columns)?;
        Ok(rows)
    }

    /// Convert [`ArrayRef`] columns appending to an existing [`Rows`]
    ///
    /// See [`Row`] for information on when [`Row`] can be compared
    ///
    /// # Panics
    ///
    /// Panics if
    /// * The schema of `columns` does not match that provided to [`RowConverter::new`]
    /// * The provided [`Rows`] were not created by this [`RowConverter`]
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use std::collections::HashSet;
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::StringArray;
    /// # use arrow_row::{Row, RowConverter, SortField};
    /// # use arrow_schema::DataType;
    /// #
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    /// let a1 = StringArray::from(vec!["hello", "world"]);
    /// let a2 = StringArray::from(vec!["a", "a", "hello"]);
    ///
    /// let mut rows = converter.empty_rows(5, 128);
    /// converter.append(&mut rows, &[Arc::new(a1)]).unwrap();
    /// converter.append(&mut rows, &[Arc::new(a2)]).unwrap();
    ///
    /// let back = converter.convert_rows(&rows).unwrap();
    /// let values: Vec<_> = back[0].as_string::<i32>().iter().map(Option::unwrap).collect();
    /// assert_eq!(&values, &["hello", "world", "a", "a", "hello"]);
    /// ```
    pub fn append(&self, rows: &mut Rows, columns: &[ArrayRef]) -> Result<(), ArrowError> {
        // Provenance check: `Arc::ptr_eq` verifies the Rows share this
        // converter's field configuration
        assert!(
            Arc::ptr_eq(&rows.config.fields, &self.fields),
            "rows were not produced by this RowConverter"
        );

        if columns.len() != self.fields.len() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Incorrect number of arrays provided to RowConverter, expected {} got {}",
                self.fields.len(),
                columns.len()
            )));
        }
        // All columns must have the same number of rows
        for colum in columns.iter().skip(1) {
            if colum.len() != columns[0].len() {
                return Err(ArrowError::InvalidArgumentError(format!(
                    "RowConverter columns must all have the same length, expected {} got {}",
                    columns[0].len(),
                    colum.len()
                )));
            }
        }

        // Validate each column's type against the schema and construct the
        // per-column encoder state
        let encoders = columns
            .iter()
            .zip(&self.codecs)
            .zip(self.fields.iter())
            .map(|((column, codec), field)| {
                if !column.data_type().equals_datatype(&field.data_type) {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "RowConverter column schema mismatch, expected {} got {}",
                        field.data_type,
                        column.data_type()
                    )));
                }
                codec.encoder(column.as_ref())
            })
            .collect::<Result<Vec<_>, _>>()?;

        // Compute the encoded length of every new row, extend the offsets
        // accordingly, and reserve space in the buffer before encoding
        let write_offset = rows.num_rows();
        let lengths = row_lengths(columns, &encoders);
        let total = lengths.extend_offsets(rows.offsets[write_offset], &mut rows.offsets);
        rows.buffer.resize(total, 0);

        for ((column, field), encoder) in columns.iter().zip(self.fields.iter()).zip(encoders) {
            // We encode a column at a time to minimise dispatch overheads
            encode_column(
                &mut rows.buffer,
                &mut rows.offsets[write_offset..],
                column.as_ref(),
                field.options,
                &encoder,
            )
        }

        if cfg!(debug_assertions) {
            assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len());
            rows.offsets
                .windows(2)
                .for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic"));
        }

        Ok(())
    }

    /// Convert [`Rows`] columns into [`ArrayRef`]
    ///
    /// See [`Self::convert_columns`] for converting [`ArrayRef`] into [`Rows`]
    ///
    /// # Panics
    ///
    /// Panics if the rows were not produced by this [`RowConverter`]
    pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>, ArrowError>
    where
        I: IntoIterator<Item = Row<'a>>,
    {
        let mut validate_utf8 = false;
        let mut rows: Vec<_> = rows
            .into_iter()
            .map(|row| {
                assert!(
                    Arc::ptr_eq(&row.config.fields, &self.fields),
                    "rows were not produced by this RowConverter"
                );
                // Validate UTF-8 if any input row requires it (e.g. parsed
                // from untrusted bytes)
                validate_utf8 |= row.config.validate_utf8;
                row.data
            })
            .collect();

        // SAFETY
        // We have validated that the rows came from this [`RowConverter`]
        // and therefore must be valid
        let result = unsafe { self.convert_raw(&mut rows, validate_utf8) }?;

        // Test-only check that decoding consumed every encoded byte; a
        // non-empty remainder indicates a codec bug
        if cfg!(test) {
            for (i, row) in rows.iter().enumerate() {
                if !row.is_empty() {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "Codecs {codecs:?} did not consume all bytes for row {i}, remaining bytes: {row:?}",
                        codecs = &self.codecs
                    )));
                }
            }
        }

        Ok(result)
    }

    /// Returns an empty [`Rows`] with capacity for `row_capacity` rows with
    /// a total length of `data_capacity`
    ///
    /// This can be used to buffer a selection of [`Row`]
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use std::collections::HashSet;
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::StringArray;
    /// # use arrow_row::{Row, RowConverter, SortField};
    /// # use arrow_schema::DataType;
    /// #
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
    ///
    /// // Convert to row format and deduplicate
    /// let converted = converter.convert_columns(&[Arc::new(array)]).unwrap();
    /// let mut distinct_rows = converter.empty_rows(3, 100);
    /// let mut dedup: HashSet<Row> = HashSet::with_capacity(3);
    /// converted.iter().filter(|row| dedup.insert(*row)).for_each(|row| distinct_rows.push(row));
    ///
    /// // Note: we could skip buffering and feed the filtered iterator directly
    /// // into convert_rows, this is done for demonstration purposes only
    /// let distinct = converter.convert_rows(&distinct_rows).unwrap();
    /// let values: Vec<_> = distinct[0].as_string::<i32>().iter().map(Option::unwrap).collect();
    /// assert_eq!(&values, &["hello", "world", "a"]);
    /// ```
    pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows {
        // `offsets` needs one more slot than rows; seed with the initial 0
        let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1));
        offsets.push(0);

        Rows {
            offsets,
            buffer: Vec::with_capacity(data_capacity),
            config: RowConfig {
                fields: self.fields.clone(),
                validate_utf8: false,
            },
        }
    }

    /// Create a new [Rows] instance from the given binary data.
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use std::collections::HashSet;
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::StringArray;
    /// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField};
    /// # use arrow_schema::DataType;
    /// #
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
    /// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
    ///
    /// // We can convert rows into binary format and back in batch.
    /// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
    /// let binary = rows.try_into_binary().expect("known-small array");
    /// let converted = converter.from_binary(binary.clone());
    /// assert!(converted.iter().eq(values.iter().map(|r| r.row())));
    /// ```
    ///
    /// # Panics
    ///
    /// This function expects the passed [BinaryArray] to contain valid row data as produced by this
    /// [RowConverter]. It will panic if any rows are null. Operations on the returned [Rows] may
    /// panic if the data is malformed.
    pub fn from_binary(&self, array: BinaryArray) -> Rows {
        assert_eq!(
            array.null_count(),
            0,
            "can't construct Rows instance from array with nulls"
        );
        let (offsets, values, _) = array.into_parts();
        let offsets = offsets.iter().map(|&i| i.as_usize()).collect();
        // Try zero-copy, if it does not succeed, fall back to copying the values.
        let buffer = values.into_vec().unwrap_or_else(|values| values.to_vec());
        Rows {
            buffer,
            offsets,
            config: RowConfig {
                fields: Arc::clone(&self.fields),
                // The bytes came from outside this converter, so force UTF-8
                // validation when decoding string data
                validate_utf8: true,
            },
        }
    }

    /// Convert raw bytes into [`ArrayRef`]
    ///
    /// # Safety
    ///
    /// `rows` must contain valid data for this [`RowConverter`]
    unsafe fn convert_raw(
        &self,
        rows: &mut [&[u8]],
        validate_utf8: bool,
    ) -> Result<Vec<ArrayRef>, ArrowError> {
        // Each codec consumes its bytes from the front of every row slice
        self.fields
            .iter()
            .zip(&self.codecs)
            .map(|(field, codec)| unsafe { decode_column(field, rows, codec, validate_utf8) })
            .collect()
    }

    /// Returns a [`RowParser`] that can be used to parse [`Row`] from bytes
    pub fn parser(&self) -> RowParser {
        RowParser::new(Arc::clone(&self.fields))
    }

    /// Returns the size of this instance in bytes
    ///
    /// Includes the size of `Self`.
    pub fn size(&self) -> usize {
        // `codecs.capacity()` accounts for the Vec allocation itself,
        // `Codec::size` for each codec's heap data
        std::mem::size_of::<Self>()
            + self.fields.iter().map(|x| x.size()).sum::<usize>()
            + self.codecs.capacity() * std::mem::size_of::<Codec>()
            + self.codecs.iter().map(Codec::size).sum::<usize>()
    }
}
1029
/// A [`RowParser`] can be created from a [`RowConverter`] and used to parse bytes to [`Row`]
#[derive(Debug)]
pub struct RowParser {
    /// Shared schema and validation settings for the rows this parser produces
    config: RowConfig,
}
1035
1036impl RowParser {
1037    fn new(fields: Arc<[SortField]>) -> Self {
1038        Self {
1039            config: RowConfig {
1040                fields,
1041                validate_utf8: true,
1042            },
1043        }
1044    }
1045
1046    /// Creates a [`Row`] from the provided `bytes`.
1047    ///
1048    /// `bytes` must be a [`Row`] produced by the [`RowConverter`] associated with
1049    /// this [`RowParser`], otherwise subsequent operations with the produced [`Row`] may panic
1050    pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> {
1051        Row {
1052            data: bytes,
1053            config: &self.config,
1054        }
1055    }
1056}
1057
/// The config of a given set of [`Row`]
#[derive(Debug, Clone)]
struct RowConfig {
    /// The schema for these rows
    ///
    /// Shared via `Arc` so provenance can be cheaply checked with `Arc::ptr_eq`
    fields: Arc<[SortField]>,
    /// Whether to run UTF-8 validation when converting to arrow arrays
    validate_utf8: bool,
}
1066
/// A row-oriented representation of arrow data, that is normalized for comparison.
///
/// See the [module level documentation](self) and [`RowConverter`] for more details.
#[derive(Debug)]
pub struct Rows {
    /// Underlying row bytes
    buffer: Vec<u8>,
    /// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]`
    ///
    /// Invariant: always non-empty, so `num_rows() == offsets.len() - 1`
    offsets: Vec<usize>,
    /// The config for these rows
    config: RowConfig,
}
1079
impl Rows {
    /// Append a [`Row`] to this [`Rows`]
    pub fn push(&mut self, row: Row<'_>) {
        assert!(
            Arc::ptr_eq(&row.config.fields, &self.config.fields),
            "row was not produced by this RowConverter"
        );
        // Inherit the stricter validation requirement if the source row needs it
        self.config.validate_utf8 |= row.config.validate_utf8;
        self.buffer.extend_from_slice(row.data);
        self.offsets.push(self.buffer.len())
    }

    /// Returns the row at index `row`
    pub fn row(&self, row: usize) -> Row<'_> {
        // `offsets` has num_rows + 1 entries, so this checks `row < num_rows`
        assert!(row + 1 < self.offsets.len());
        unsafe { self.row_unchecked(row) }
    }

    /// Returns the row at `index` without bounds checking
    ///
    /// # Safety
    /// Caller must ensure that `index` is less than the number of offsets (#rows + 1)
    pub unsafe fn row_unchecked(&self, index: usize) -> Row<'_> {
        let end = unsafe { self.offsets.get_unchecked(index + 1) };
        let start = unsafe { self.offsets.get_unchecked(index) };
        let data = unsafe { self.buffer.get_unchecked(*start..*end) };
        Row {
            data,
            config: &self.config,
        }
    }

    /// Sets the length of this [`Rows`] to 0
    pub fn clear(&mut self) {
        // Keep the initial offset entry so subsequent pushes remain valid
        self.offsets.truncate(1);
        self.buffer.clear();
    }

    /// Returns the number of [`Row`] in this [`Rows`]
    pub fn num_rows(&self) -> usize {
        self.offsets.len() - 1
    }

    /// Returns an iterator over the [`Row`] in this [`Rows`]
    pub fn iter(&self) -> RowsIter<'_> {
        self.into_iter()
    }

    /// Returns the size of this instance in bytes
    ///
    /// Includes the size of `Self`.
    pub fn size(&self) -> usize {
        // Size of fields is accounted for as part of RowConverter
        std::mem::size_of::<Self>()
            + self.buffer.len()
            + self.offsets.len() * std::mem::size_of::<usize>()
    }

    /// Create a [BinaryArray] from the [Rows] data without reallocating the
    /// underlying bytes.
    ///
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use std::collections::HashSet;
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::StringArray;
    /// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField};
    /// # use arrow_schema::DataType;
    /// #
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
    /// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
    ///
    /// // We can convert rows into binary format and back.
    /// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
    /// let binary = rows.try_into_binary().expect("known-small array");
    /// let parser = converter.parser();
    /// let parsed: Vec<OwnedRow> =
    ///   binary.iter().flatten().map(|b| parser.parse(b).owned()).collect();
    /// assert_eq!(values, parsed);
    /// ```
    ///
    /// # Errors
    ///
    /// This function will return an error if there is more data than can be stored in
    /// a [BinaryArray] -- i.e. if the total data size is more than 2GiB.
    pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> {
        if self.buffer.len() > i32::MAX as usize {
            return Err(ArrowError::InvalidArgumentError(format!(
                "{}-byte rows buffer too long to convert into a i32-indexed BinaryArray",
                self.buffer.len()
            )));
        }
        // We've checked that the buffer length fits in an i32; so all offsets into that buffer should fit as well.
        let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as));
        // SAFETY: offsets buffer is nonempty, monotonically increasing, and all represent valid indexes into buffer.
        let array = unsafe {
            BinaryArray::new_unchecked(
                OffsetBuffer::new_unchecked(offsets_scalar),
                Buffer::from_vec(self.buffer),
                None,
            )
        };
        Ok(array)
    }
}
1187
1188impl<'a> IntoIterator for &'a Rows {
1189    type Item = Row<'a>;
1190    type IntoIter = RowsIter<'a>;
1191
1192    fn into_iter(self) -> Self::IntoIter {
1193        RowsIter {
1194            rows: self,
1195            start: 0,
1196            end: self.num_rows(),
1197        }
1198    }
1199}
1200
/// An iterator over [`Rows`]
#[derive(Debug)]
pub struct RowsIter<'a> {
    /// The rows being iterated
    rows: &'a Rows,
    /// Index of the next row to yield from the front
    start: usize,
    /// One past the index of the next row to yield from the back
    end: usize,
}
1208
1209impl<'a> Iterator for RowsIter<'a> {
1210    type Item = Row<'a>;
1211
1212    fn next(&mut self) -> Option<Self::Item> {
1213        if self.end == self.start {
1214            return None;
1215        }
1216
1217        // SAFETY: We have checked that `start` is less than `end`
1218        let row = unsafe { self.rows.row_unchecked(self.start) };
1219        self.start += 1;
1220        Some(row)
1221    }
1222
1223    fn size_hint(&self) -> (usize, Option<usize>) {
1224        let len = self.len();
1225        (len, Some(len))
1226    }
1227}
1228
impl ExactSizeIterator for RowsIter<'_> {
    fn len(&self) -> usize {
        // `start <= end` is maintained by the iterator, so this cannot underflow
        self.end - self.start
    }
}
1234
1235impl DoubleEndedIterator for RowsIter<'_> {
1236    fn next_back(&mut self) -> Option<Self::Item> {
1237        if self.end == self.start {
1238            return None;
1239        }
1240        // Safety: We have checked that `start` is less than `end`
1241        let row = unsafe { self.rows.row_unchecked(self.end) };
1242        self.end -= 1;
1243        Some(row)
1244    }
1245}
1246
/// A comparable representation of a row.
///
/// See the [module level documentation](self) for more details.
///
/// Two [`Row`] can only be compared if they both belong to [`Rows`]
/// returned by calls to [`RowConverter::convert_columns`] on the same
/// [`RowConverter`]. If different [`RowConverter`]s are used, any
/// ordering established by comparing the [`Row`] is arbitrary.
#[derive(Debug, Copy, Clone)]
pub struct Row<'a> {
    /// The encoded bytes of this row
    data: &'a [u8],
    /// The config of the [`Rows`] this row belongs to
    config: &'a RowConfig,
}
1260
1261impl<'a> Row<'a> {
1262    /// Create owned version of the row to detach it from the shared [`Rows`].
1263    pub fn owned(&self) -> OwnedRow {
1264        OwnedRow {
1265            data: self.data.into(),
1266            config: self.config.clone(),
1267        }
1268    }
1269
1270    /// The row's bytes, with the lifetime of the underlying data.
1271    pub fn data(&self) -> &'a [u8] {
1272        self.data
1273    }
1274}
1275
1276// Manually derive these as don't wish to include `fields`
1277
1278impl PartialEq for Row<'_> {
1279    #[inline]
1280    fn eq(&self, other: &Self) -> bool {
1281        self.data.eq(other.data)
1282    }
1283}
1284
1285impl Eq for Row<'_> {}
1286
1287impl PartialOrd for Row<'_> {
1288    #[inline]
1289    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1290        Some(self.cmp(other))
1291    }
1292}
1293
1294impl Ord for Row<'_> {
1295    #[inline]
1296    fn cmp(&self, other: &Self) -> Ordering {
1297        self.data.cmp(other.data)
1298    }
1299}
1300
1301impl Hash for Row<'_> {
1302    #[inline]
1303    fn hash<H: Hasher>(&self, state: &mut H) {
1304        self.data.hash(state)
1305    }
1306}
1307
1308impl AsRef<[u8]> for Row<'_> {
1309    #[inline]
1310    fn as_ref(&self) -> &[u8] {
1311        self.data
1312    }
1313}
1314
/// Owned version of a [`Row`] that can be moved/cloned freely.
///
/// This contains the data for the one specific row (not the entire buffer of all rows).
#[derive(Debug, Clone)]
pub struct OwnedRow {
    /// The encoded bytes of this single row
    data: Box<[u8]>,
    /// An owned copy of the originating [`Rows`] config
    config: RowConfig,
}
1323
1324impl OwnedRow {
1325    /// Get borrowed [`Row`] from owned version.
1326    ///
1327    /// This is helpful if you want to compare an [`OwnedRow`] with a [`Row`].
1328    pub fn row(&self) -> Row<'_> {
1329        Row {
1330            data: &self.data,
1331            config: &self.config,
1332        }
1333    }
1334}
1335
1336// Manually derive these as don't wish to include `fields`. Also we just want to use the same `Row` implementations here.
1337
1338impl PartialEq for OwnedRow {
1339    #[inline]
1340    fn eq(&self, other: &Self) -> bool {
1341        self.row().eq(&other.row())
1342    }
1343}
1344
1345impl Eq for OwnedRow {}
1346
1347impl PartialOrd for OwnedRow {
1348    #[inline]
1349    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1350        Some(self.cmp(other))
1351    }
1352}
1353
1354impl Ord for OwnedRow {
1355    #[inline]
1356    fn cmp(&self, other: &Self) -> Ordering {
1357        self.row().cmp(&other.row())
1358    }
1359}
1360
1361impl Hash for OwnedRow {
1362    #[inline]
1363    fn hash<H: Hasher>(&self, state: &mut H) {
1364        self.row().hash(state)
1365    }
1366}
1367
1368impl AsRef<[u8]> for OwnedRow {
1369    #[inline]
1370    fn as_ref(&self) -> &[u8] {
1371        &self.data
1372    }
1373}
1374
1375/// Returns the null sentinel, negated if `invert` is true
1376#[inline]
1377fn null_sentinel(options: SortOptions) -> u8 {
1378    match options.nulls_first {
1379        true => 0,
1380        false => 0xFF,
1381    }
1382}
1383
/// Stores the lengths of the rows. Lazily materializes lengths for columns with fixed-size types.
///
/// Staying in the `Fixed` state avoids allocating a per-row length vector for
/// schemas containing only fixed-size columns.
enum LengthTracker {
    /// Fixed state: All rows have length `length`
    Fixed { length: usize, num_rows: usize },
    /// Variable state: The length of row `i` is `lengths[i] + fixed_length`
    Variable {
        fixed_length: usize,
        lengths: Vec<usize>,
    },
}
1394
impl LengthTracker {
    /// Creates a tracker for `num_rows` rows, starting in the `Fixed` state
    /// with length 0
    fn new(num_rows: usize) -> Self {
        Self::Fixed {
            length: 0,
            num_rows,
        }
    }

    /// Adds a column of fixed-length elements, each of size `new_length` to the LengthTracker
    fn push_fixed(&mut self, new_length: usize) {
        match self {
            LengthTracker::Fixed { length, .. } => *length += new_length,
            LengthTracker::Variable { fixed_length, .. } => *fixed_length += new_length,
        }
    }

    /// Adds a column of possibly variable-length elements, element `i` has length `new_lengths.nth(i)`
    fn push_variable(&mut self, new_lengths: impl ExactSizeIterator<Item = usize>) {
        match self {
            // First variable-length column: transition to the Variable state,
            // carrying over the fixed length accumulated so far
            LengthTracker::Fixed { length, .. } => {
                *self = LengthTracker::Variable {
                    fixed_length: *length,
                    lengths: new_lengths.collect(),
                }
            }
            LengthTracker::Variable { lengths, .. } => {
                assert_eq!(lengths.len(), new_lengths.len());
                lengths
                    .iter_mut()
                    .zip(new_lengths)
                    .for_each(|(length, new_length)| *length += new_length);
            }
        }
    }

    /// Returns the tracked row lengths as a slice, forcing the transition to
    /// the `Variable` state if necessary
    fn materialized(&mut self) -> &mut [usize] {
        if let LengthTracker::Fixed { length, num_rows } = *self {
            *self = LengthTracker::Variable {
                fixed_length: length,
                lengths: vec![0; num_rows],
            };
        }

        match self {
            LengthTracker::Variable { lengths, .. } => lengths,
            // Unreachable: the branch above always converts Fixed to Variable
            LengthTracker::Fixed { .. } => unreachable!(),
        }
    }

    /// Initializes the offsets using the tracked lengths. Returns the sum of the
    /// lengths of the rows added.
    ///
    /// We initialize the offsets shifted down by one row index.
    ///
    /// As the rows are appended to the offsets will be incremented to match
    ///
    /// For example, consider the case of 3 rows of length 3, 4, and 6 respectively.
    /// The offsets would be initialized to `0, 0, 3, 7`
    ///
    /// Writing the first row entirely would yield `0, 3, 3, 7`
    /// The second, `0, 3, 7, 7`
    /// The third, `0, 3, 7, 13`
    ///
    /// This would be the final offsets for reading
    ///
    /// In this way offsets tracks the position during writing whilst eventually
    /// serving as the final offsets for reading
    fn extend_offsets(&self, initial_offset: usize, offsets: &mut Vec<usize>) -> usize {
        match self {
            LengthTracker::Fixed { length, num_rows } => {
                offsets.extend((0..*num_rows).map(|i| initial_offset + i * length));

                initial_offset + num_rows * length
            }
            LengthTracker::Variable {
                fixed_length,
                lengths,
            } => {
                let mut acc = initial_offset;

                offsets.extend(lengths.iter().map(|length| {
                    let current = acc;
                    acc += length + fixed_length;
                    current
                }));

                acc
            }
        }
    }
}
1486
/// Computes the length of each encoded row, returning a [`LengthTracker`]
//
// NOTE(review): the previous doc claimed this "returns an empty [`Rows`]",
// which does not match the signature.
fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> LengthTracker {
    use fixed::FixedLengthEncoding;

    let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
    let mut tracker = LengthTracker::new(num_rows);

    for (array, encoder) in cols.iter().zip(encoders) {
        match encoder {
            Encoder::Stateless => {
                downcast_primitive_array! {
                    array => tracker.push_fixed(fixed::encoded_len(array)),
                    DataType::Null => {},
                    DataType::Boolean => tracker.push_fixed(bool::ENCODED_LEN),
                    DataType::Binary => tracker.push_variable(
                        as_generic_binary_array::<i32>(array)
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::LargeBinary => tracker.push_variable(
                        as_generic_binary_array::<i64>(array)
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::BinaryView => tracker.push_variable(
                        array.as_binary_view()
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::Utf8 => tracker.push_variable(
                        array.as_string::<i32>()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::LargeUtf8 => tracker.push_variable(
                        array.as_string::<i64>()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::Utf8View => tracker.push_variable(
                        array.as_string_view()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::FixedSizeBinary(len) => {
                        let len = len.to_usize().unwrap();
                        // 1 byte null sentinel/validity prefix plus the value bytes
                        tracker.push_fixed(1 + len)
                    }
                    _ => unimplemented!("unsupported data type: {}", array.data_type()),
                }
            }
            Encoder::Dictionary(values, null) => {
                // Each key's encoded length is that of the value row it refers to
                downcast_dictionary_array! {
                    array => {
                        tracker.push_variable(
                            array.keys().iter().map(|v| match v {
                                Some(k) => values.row(k.as_usize()).data.len(),
                                None => null.data.len(),
                            })
                        )
                    }
                    _ => unreachable!(),
                }
            }
            Encoder::Struct(rows, null) => {
                let array = as_struct_array(array);
                // 1 byte validity prefix plus the encoded child row
                tracker.push_variable((0..array.len()).map(|idx| match array.is_valid(idx) {
                    true => 1 + rows.row(idx).as_ref().len(),
                    false => 1 + null.data.len(),
                }));
            }
            Encoder::List(rows) => match array.data_type() {
                DataType::List(_) => {
                    list::compute_lengths(tracker.materialized(), rows, as_list_array(array))
                }
                DataType::LargeList(_) => {
                    list::compute_lengths(tracker.materialized(), rows, as_large_list_array(array))
                }
                DataType::FixedSizeList(_, _) => compute_lengths_fixed_size_list(
                    &mut tracker,
                    rows,
                    as_fixed_size_list_array(array),
                ),
                _ => unreachable!(),
            },
            Encoder::RunEndEncoded(rows) => match array.data_type() {
                // Dispatch on the run-end index type
                DataType::RunEndEncoded(r, _) => match r.data_type() {
                    DataType::Int16 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int16Type>(),
                    ),
                    DataType::Int32 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int32Type>(),
                    ),
                    DataType::Int64 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int64Type>(),
                    ),
                    _ => unreachable!("Unsupported run end index type: {r:?}"),
                },
                _ => unreachable!(),
            },
            Encoder::Union {
                child_rows,
                type_ids,
                offsets,
            } => {
                let union_array = array
                    .as_any()
                    .downcast_ref::<UnionArray>()
                    .expect("expected UnionArray");

                // For sparse unions (no offsets) the child row index equals `i`
                let lengths = (0..union_array.len()).map(|i| {
                    let type_id = type_ids[i];
                    let child_row_i = offsets.as_ref().map(|o| o[i] as usize).unwrap_or(i);
                    let child_row = child_rows[type_id as usize].row(child_row_i);

                    // length: 1 byte type_id + child row bytes
                    1 + child_row.as_ref().len()
                });

                tracker.push_variable(lengths);
            }
        }
    }

    tracker
}
1619
/// Encodes a column to the provided [`Rows`] incrementing the offsets as it progresses
///
/// `data` is the shared output buffer for all rows, and `offsets[i + 1]` is the
/// current write position for row `i`; every encoder writes its bytes at that
/// position and then advances the offset past them, so encoders for later
/// columns naturally append after this column's data
fn encode_column(
    data: &mut [u8],
    offsets: &mut [usize],
    column: &dyn Array,
    opts: SortOptions,
    encoder: &Encoder<'_>,
) {
    match encoder {
        // Types encoded without auxiliary state: primitives, booleans, and
        // variable-width binary / string representations
        Encoder::Stateless => {
            downcast_primitive_array! {
                column => {
                    // only pay for null handling when nulls are actually present
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode(data, offsets, column.values(), nulls, opts)
                    } else {
                        fixed::encode_not_null(data, offsets, column.values(), opts)
                    }
                }
                // null columns contribute no bytes
                DataType::Null => {}
                DataType::Boolean => {
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode_boolean(data, offsets, column.as_boolean().values(), nulls, opts)
                    } else {
                        fixed::encode_boolean_not_null(data, offsets, column.as_boolean().values(), opts)
                    }
                }
                DataType::Binary => {
                    variable::encode(data, offsets, as_generic_binary_array::<i32>(column).iter(), opts)
                }
                DataType::BinaryView => {
                    variable::encode(data, offsets, column.as_binary_view().iter(), opts)
                }
                DataType::LargeBinary => {
                    variable::encode(data, offsets, as_generic_binary_array::<i64>(column).iter(), opts)
                }
                // strings are encoded as their UTF-8 bytes
                DataType::Utf8 => variable::encode(
                    data, offsets,
                    column.as_string::<i32>().iter().map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::LargeUtf8 => variable::encode(
                    data, offsets,
                    column.as_string::<i64>()
                        .iter()
                        .map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::Utf8View => variable::encode(
                    data, offsets,
                    column.as_string_view().iter().map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::FixedSizeBinary(_) => {
                    let array = column.as_any().downcast_ref().unwrap();
                    fixed::encode_fixed_size_binary(data, offsets, array, opts)
                }
                _ => unimplemented!("unsupported data type: {}", column.data_type()),
            }
        }
        // Dictionaries are encoded as their logical values, discarding the
        // dictionary encoding itself
        Encoder::Dictionary(values, nulls) => {
            downcast_dictionary_array! {
                column => encode_dictionary_values(data, offsets, column, values, nulls),
                _ => unreachable!()
            }
        }
        // Structs are encoded as a one-byte validity sentinel followed by the
        // pre-computed row of the child values (or the shared null row)
        Encoder::Struct(rows, null) => {
            let array = as_struct_array(column);
            let null_sentinel = null_sentinel(opts);
            offsets
                .iter_mut()
                .skip(1) // offsets[0] is the start of the first row
                .enumerate()
                .for_each(|(idx, offset)| {
                    let (row, sentinel) = match array.is_valid(idx) {
                        true => (rows.row(idx), 0x01),
                        false => (*null, null_sentinel),
                    };
                    let end_offset = *offset + 1 + row.as_ref().len();
                    data[*offset] = sentinel;
                    data[*offset + 1..end_offset].copy_from_slice(row.as_ref());
                    *offset = end_offset;
                })
        }
        // List-like columns delegate to the list encoder with the rows of the
        // already-encoded child values
        Encoder::List(rows) => match column.data_type() {
            DataType::List(_) => list::encode(data, offsets, rows, opts, as_list_array(column)),
            DataType::LargeList(_) => {
                list::encode(data, offsets, rows, opts, as_large_list_array(column))
            }
            DataType::FixedSizeList(_, _) => {
                encode_fixed_size_list(data, offsets, rows, opts, as_fixed_size_list_array(column))
            }
            _ => unreachable!(),
        },
        // Run-end encoded columns dispatch on the concrete run-end index width
        Encoder::RunEndEncoded(rows) => match column.data_type() {
            DataType::RunEndEncoded(r, _) => match r.data_type() {
                DataType::Int16 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int16Type>())
                }
                DataType::Int32 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int32Type>())
                }
                DataType::Int64 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int64Type>())
                }
                _ => unreachable!("Unsupported run end index type: {r:?}"),
            },
            _ => unreachable!(),
        },
        // Unions are encoded as a one-byte type id followed by the
        // pre-computed row of the selected child
        Encoder::Union {
            child_rows,
            type_ids,
            offsets: offsets_buf,
        } => {
            offsets
                .iter_mut()
                .skip(1)
                .enumerate()
                .for_each(|(i, offset)| {
                    let type_id = type_ids[i];

                    // dense unions carry an offsets buffer mapping each slot to
                    // its child row; sparse unions use the row index directly.
                    // NOTE(review): indexing child_rows by type_id assumes type
                    // ids are dense in 0..child_rows.len() — confirm with the
                    // converter construction
                    let child_row_idx = offsets_buf.as_ref().map(|o| o[i] as usize).unwrap_or(i);
                    let child_row = child_rows[type_id as usize].row(child_row_idx);
                    let child_bytes = child_row.as_ref();

                    // invert the type id byte for descending order so the
                    // byte-wise row comparison is reversed
                    let type_id_byte = if opts.descending {
                        !(type_id as u8)
                    } else {
                        type_id as u8
                    };
                    data[*offset] = type_id_byte;

                    let child_start = *offset + 1;
                    let child_end = child_start + child_bytes.len();
                    data[child_start..child_end].copy_from_slice(child_bytes);

                    *offset = child_end;
                });
        }
    }
}
1760
1761/// Encode dictionary values not preserving the dictionary encoding
1762pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
1763    data: &mut [u8],
1764    offsets: &mut [usize],
1765    column: &DictionaryArray<K>,
1766    values: &Rows,
1767    null: &Row<'_>,
1768) {
1769    for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
1770        let row = match k {
1771            Some(k) => values.row(k.as_usize()).data,
1772            None => null.data,
1773        };
1774        let end_offset = *offset + row.len();
1775        data[*offset..end_offset].copy_from_slice(row);
1776        *offset = end_offset;
1777    }
1778}
1779
/// Helper invoked by [`downcast_primitive!`] in [`decode_column`]: expands to
/// an `Arc`-wrapped call of [`decode_primitive`] for the concrete primitive
/// arrow type `$t`
macro_rules! decode_primitive_helper {
    ($t:ty, $rows:ident, $data_type:ident, $options:ident) => {
        Arc::new(decode_primitive::<$t>($rows, $data_type, $options))
    };
}
1785
/// Decodes the provided `field` from `rows`
///
/// Each slice in `rows` is advanced past the bytes consumed for this column,
/// leaving it positioned at the start of the next column's data
///
/// # Safety
///
/// Rows must contain valid data for the provided field
unsafe fn decode_column(
    field: &SortField,
    rows: &mut [&[u8]],
    codec: &Codec,
    validate_utf8: bool,
) -> Result<ArrayRef, ArrowError> {
    let options = field.options;

    let array: ArrayRef = match codec {
        // Stateless types decode directly from the row bytes
        Codec::Stateless => {
            let data_type = field.data_type.clone();
            downcast_primitive! {
                data_type => (decode_primitive_helper, rows, data_type, options),
                DataType::Null => Arc::new(NullArray::new(rows.len())),
                DataType::Boolean => Arc::new(decode_bool(rows, options)),
                DataType::Binary => Arc::new(decode_binary::<i32>(rows, options)),
                DataType::LargeBinary => Arc::new(decode_binary::<i64>(rows, options)),
                DataType::BinaryView => Arc::new(decode_binary_view(rows, options)),
                DataType::FixedSizeBinary(size) => Arc::new(decode_fixed_size_binary(rows, size, options)),
                // SAFETY: the caller guarantees the rows are valid for this
                // field; UTF-8 validation is performed only when requested
                DataType::Utf8 => Arc::new(unsafe{ decode_string::<i32>(rows, options, validate_utf8) }),
                DataType::LargeUtf8 => Arc::new(unsafe { decode_string::<i64>(rows, options, validate_utf8) }),
                DataType::Utf8View => Arc::new(unsafe { decode_string_view(rows, options, validate_utf8) }),
                _ => return Err(ArrowError::NotYetImplemented(format!("unsupported data type: {data_type}" )))
            }
        }
        // Dictionary columns were encoded as their logical values, so decoding
        // yields the value type rather than a dictionary
        Codec::Dictionary(converter, _) => {
            let cols = unsafe { converter.convert_raw(rows, validate_utf8) }?;
            cols.into_iter().next().unwrap()
        }
        Codec::Struct(converter, _) => {
            // strip the leading validity sentinel byte from every row, then
            // decode the children from the remaining bytes
            let (null_count, nulls) = fixed::decode_nulls(rows);
            rows.iter_mut().for_each(|row| *row = &row[1..]);
            let children = unsafe { converter.convert_raw(rows, validate_utf8) }?;

            let child_data: Vec<ArrayData> = children.iter().map(|c| c.to_data()).collect();
            // Since RowConverter flattens certain data types (i.e. Dictionary),
            // we need to use updated data type instead of original field
            let corrected_fields: Vec<Field> = match &field.data_type {
                DataType::Struct(struct_fields) => struct_fields
                    .iter()
                    .zip(child_data.iter())
                    .map(|(orig_field, child_array)| {
                        orig_field
                            .as_ref()
                            .clone()
                            .with_data_type(child_array.data_type().clone())
                    })
                    .collect(),
                _ => unreachable!("Only Struct types should be corrected here"),
            };
            let corrected_struct_type = DataType::Struct(corrected_fields.into());
            let builder = ArrayDataBuilder::new(corrected_struct_type)
                .len(rows.len())
                .null_count(null_count)
                .null_bit_buffer(Some(nulls))
                .child_data(child_data);

            // SAFETY: the builder is populated from children produced by the
            // converter together with the null buffer decoded above
            Arc::new(StructArray::from(unsafe { builder.build_unchecked() }))
        }
        // List-like columns dispatch on the concrete offset / size type
        Codec::List(converter) => match &field.data_type {
            DataType::List(_) => {
                Arc::new(unsafe { list::decode::<i32>(converter, rows, field, validate_utf8) }?)
            }
            DataType::LargeList(_) => {
                Arc::new(unsafe { list::decode::<i64>(converter, rows, field, validate_utf8) }?)
            }
            DataType::FixedSizeList(_, value_length) => Arc::new(unsafe {
                list::decode_fixed_size_list(
                    converter,
                    rows,
                    field,
                    validate_utf8,
                    value_length.as_usize(),
                )
            }?),
            _ => unreachable!(),
        },
        // Run-end encoded columns dispatch on the run-end index width
        Codec::RunEndEncoded(converter) => match &field.data_type {
            DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() {
                DataType::Int16 => Arc::new(unsafe {
                    run::decode::<Int16Type>(converter, rows, field, validate_utf8)
                }?),
                DataType::Int32 => Arc::new(unsafe {
                    run::decode::<Int32Type>(converter, rows, field, validate_utf8)
                }?),
                DataType::Int64 => Arc::new(unsafe {
                    run::decode::<Int64Type>(converter, rows, field, validate_utf8)
                }?),
                _ => unreachable!(),
            },
            _ => unreachable!(),
        },
        Codec::Union(converters, null_rows) => {
            let len = rows.len();

            let DataType::Union(union_fields, mode) = &field.data_type else {
                unreachable!()
            };

            // Partition the child row bytes by type id, remembering the
            // original row index of each entry
            let mut type_ids = Vec::with_capacity(len);
            let mut rows_by_field: Vec<Vec<(usize, &[u8])>> = vec![Vec::new(); converters.len()];

            for (idx, row) in rows.iter_mut().enumerate() {
                // the first byte is the type id, inverted when the column was
                // encoded in descending order (mirrors encode_column)
                let type_id_byte = {
                    let id = row[0];
                    if options.descending { !id } else { id }
                };

                let type_id = type_id_byte as i8;
                type_ids.push(type_id);

                // NOTE(review): indexing converters by type_id assumes type
                // ids are dense in 0..converters.len(), mirroring the encoder
                let field_idx = type_id as usize;

                let child_row = &row[1..];
                rows_by_field[field_idx].push((idx, child_row));

                // NOTE(review): this consumes all remaining bytes of the row,
                // which appears to assume the union is the final column — verify
                *row = &row[row.len()..];
            }

            let mut child_arrays: Vec<ArrayRef> = Vec::with_capacity(converters.len());

            // only dense unions carry an offsets buffer
            let mut offsets = (*mode == UnionMode::Dense).then(|| Vec::with_capacity(len));

            for (field_idx, converter) in converters.iter().enumerate() {
                let field_rows = &rows_by_field[field_idx];

                match &mode {
                    UnionMode::Dense => {
                        // a dense child only contains the rows that selected it
                        if field_rows.is_empty() {
                            let (_, field) = union_fields.iter().nth(field_idx).unwrap();
                            child_arrays.push(arrow_array::new_empty_array(field.data_type()));
                            continue;
                        }

                        let mut child_data = field_rows
                            .iter()
                            .map(|(_, bytes)| *bytes)
                            .collect::<Vec<_>>();

                        let child_array =
                            unsafe { converter.convert_raw(&mut child_data, validate_utf8) }?;

                        child_arrays.push(child_array.into_iter().next().unwrap());
                    }
                    UnionMode::Sparse => {
                        // a sparse child has one slot per union row: real data
                        // where this field was selected, a null row elsewhere
                        let mut sparse_data: Vec<&[u8]> = Vec::with_capacity(len);
                        let mut field_row_iter = field_rows.iter().peekable();
                        let null_row_bytes: &[u8] = &null_rows[field_idx].data;

                        for idx in 0..len {
                            if let Some((next_idx, bytes)) = field_row_iter.peek() {
                                if *next_idx == idx {
                                    sparse_data.push(*bytes);

                                    field_row_iter.next();
                                    continue;
                                }
                            }
                            sparse_data.push(null_row_bytes);
                        }

                        let child_array =
                            unsafe { converter.convert_raw(&mut sparse_data, validate_utf8) }?;
                        child_arrays.push(child_array.into_iter().next().unwrap());
                    }
                }
            }

            // build offsets for dense unions: slot i points at the next unused
            // position within its selected child array
            if let Some(ref mut offsets_vec) = offsets {
                let mut count = vec![0i32; converters.len()];
                for type_id in &type_ids {
                    let field_idx = *type_id as usize;
                    offsets_vec.push(count[field_idx]);

                    count[field_idx] += 1;
                }
            }

            let type_ids_buffer = ScalarBuffer::from(type_ids);
            let offsets_buffer = offsets.map(ScalarBuffer::from);

            let union_array = UnionArray::try_new(
                union_fields.clone(),
                type_ids_buffer,
                offsets_buffer,
                child_arrays,
            )?;

            // note: union arrays don't support physical null buffers
            // nulls are represented logically through child arrays
            Arc::new(union_array)
        }
    };
    Ok(array)
}
1987
1988#[cfg(test)]
1989mod tests {
1990    use rand::distr::uniform::SampleUniform;
1991    use rand::distr::{Distribution, StandardUniform};
1992    use rand::{Rng, rng};
1993
1994    use arrow_array::builder::*;
1995    use arrow_array::types::*;
1996    use arrow_array::*;
1997    use arrow_buffer::{Buffer, OffsetBuffer};
1998    use arrow_buffer::{NullBuffer, i256};
1999    use arrow_cast::display::{ArrayFormatter, FormatOptions};
2000    use arrow_ord::sort::{LexicographicalComparator, SortColumn};
2001
2002    use super::*;
2003
    #[test]
    fn test_fixed_width() {
        // Two fixed-width columns over seven rows; each value encodes as a
        // one-byte null sentinel followed by order-preserving value bytes
        let cols = [
            Arc::new(Int16Array::from_iter([
                Some(1),
                Some(2),
                None,
                Some(-5),
                Some(2),
                Some(2),
                Some(0),
            ])) as ArrayRef,
            Arc::new(Float32Array::from_iter([
                Some(1.3),
                Some(2.5),
                None,
                Some(4.),
                Some(0.1),
                Some(-4.),
                Some(-0.),
            ])) as ArrayRef,
        ];

        let converter = RowConverter::new(vec![
            SortField::new(DataType::Int16),
            SortField::new(DataType::Float32),
        ])
        .unwrap();
        let rows = converter.convert_columns(&cols).unwrap();

        // 3 bytes (Int16) + 5 bytes (Float32) = 8 bytes per row
        assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
        // exact encoded bytes: each line pair below is one row's
        // (sentinel + Int16 bytes, sentinel + Float32 bytes)
        assert_eq!(
            rows.buffer,
            &[
                1, 128, 1, //
                1, 191, 166, 102, 102, //
                1, 128, 2, //
                1, 192, 32, 0, 0, //
                0, 0, 0, //
                0, 0, 0, 0, 0, //
                1, 127, 251, //
                1, 192, 128, 0, 0, //
                1, 128, 2, //
                1, 189, 204, 204, 205, //
                1, 128, 2, //
                1, 63, 127, 255, 255, //
                1, 128, 0, //
                1, 127, 255, 255, 255 //
            ]
        );

        // row comparisons must agree with lexicographic (Int16, Float32) order
        assert!(rows.row(3) < rows.row(6));
        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(3) < rows.row(0));
        assert!(rows.row(4) < rows.row(1));
        assert!(rows.row(5) < rows.row(4));

        // decoding must reproduce the original columns exactly
        let back = converter.convert_rows(&rows).unwrap();
        for (expected, actual) in cols.iter().zip(&back) {
            assert_eq!(expected, actual);
        }
    }
2066
2067    #[test]
2068    fn test_decimal32() {
2069        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal32(
2070            DECIMAL32_MAX_PRECISION,
2071            7,
2072        ))])
2073        .unwrap();
2074        let col = Arc::new(
2075            Decimal32Array::from_iter([
2076                None,
2077                Some(i32::MIN),
2078                Some(-13),
2079                Some(46_i32),
2080                Some(5456_i32),
2081                Some(i32::MAX),
2082            ])
2083            .with_precision_and_scale(9, 7)
2084            .unwrap(),
2085        ) as ArrayRef;
2086
2087        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2088        for i in 0..rows.num_rows() - 1 {
2089            assert!(rows.row(i) < rows.row(i + 1));
2090        }
2091
2092        let back = converter.convert_rows(&rows).unwrap();
2093        assert_eq!(back.len(), 1);
2094        assert_eq!(col.as_ref(), back[0].as_ref())
2095    }
2096
2097    #[test]
2098    fn test_decimal64() {
2099        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal64(
2100            DECIMAL64_MAX_PRECISION,
2101            7,
2102        ))])
2103        .unwrap();
2104        let col = Arc::new(
2105            Decimal64Array::from_iter([
2106                None,
2107                Some(i64::MIN),
2108                Some(-13),
2109                Some(46_i64),
2110                Some(5456_i64),
2111                Some(i64::MAX),
2112            ])
2113            .with_precision_and_scale(18, 7)
2114            .unwrap(),
2115        ) as ArrayRef;
2116
2117        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2118        for i in 0..rows.num_rows() - 1 {
2119            assert!(rows.row(i) < rows.row(i + 1));
2120        }
2121
2122        let back = converter.convert_rows(&rows).unwrap();
2123        assert_eq!(back.len(), 1);
2124        assert_eq!(col.as_ref(), back[0].as_ref())
2125    }
2126
2127    #[test]
2128    fn test_decimal128() {
2129        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal128(
2130            DECIMAL128_MAX_PRECISION,
2131            7,
2132        ))])
2133        .unwrap();
2134        let col = Arc::new(
2135            Decimal128Array::from_iter([
2136                None,
2137                Some(i128::MIN),
2138                Some(-13),
2139                Some(46_i128),
2140                Some(5456_i128),
2141                Some(i128::MAX),
2142            ])
2143            .with_precision_and_scale(38, 7)
2144            .unwrap(),
2145        ) as ArrayRef;
2146
2147        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2148        for i in 0..rows.num_rows() - 1 {
2149            assert!(rows.row(i) < rows.row(i + 1));
2150        }
2151
2152        let back = converter.convert_rows(&rows).unwrap();
2153        assert_eq!(back.len(), 1);
2154        assert_eq!(col.as_ref(), back[0].as_ref())
2155    }
2156
    #[test]
    fn test_decimal256() {
        // Decimal256 ordering across the i256 range, exercising carries
        // between the low (u128) and high (i128) halves of the value
        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal256(
            DECIMAL256_MAX_PRECISION,
            7,
        ))])
        .unwrap();
        let col = Arc::new(
            Decimal256Array::from_iter([
                None,
                Some(i256::MIN),
                Some(i256::from_parts(0, -1)),
                Some(i256::from_parts(u128::MAX, -1)),
                Some(i256::from_parts(u128::MAX, 0)),
                Some(i256::from_parts(0, 46_i128)),
                Some(i256::from_parts(5, 46_i128)),
                Some(i256::MAX),
            ])
            .with_precision_and_scale(DECIMAL256_MAX_PRECISION, 7)
            .unwrap(),
        ) as ArrayRef;

        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
        // input values are ascending, so rows must be strictly increasing
        for i in 0..rows.num_rows() - 1 {
            assert!(rows.row(i) < rows.row(i + 1));
        }

        // round-trip back to a single column equal to the input
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(col.as_ref(), back[0].as_ref())
    }
2188
2189    #[test]
2190    fn test_bool() {
2191        let converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]).unwrap();
2192
2193        let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])) as ArrayRef;
2194
2195        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2196        assert!(rows.row(2) > rows.row(1));
2197        assert!(rows.row(2) > rows.row(0));
2198        assert!(rows.row(1) > rows.row(0));
2199
2200        let cols = converter.convert_rows(&rows).unwrap();
2201        assert_eq!(&cols[0], &col);
2202
2203        let converter = RowConverter::new(vec![SortField::new_with_options(
2204            DataType::Boolean,
2205            SortOptions::default().desc().with_nulls_first(false),
2206        )])
2207        .unwrap();
2208
2209        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2210        assert!(rows.row(2) < rows.row(1));
2211        assert!(rows.row(2) < rows.row(0));
2212        assert!(rows.row(1) < rows.row(0));
2213        let cols = converter.convert_rows(&rows).unwrap();
2214        assert_eq!(&cols[0], &col);
2215    }
2216
    #[test]
    fn test_timezone() {
        // Timestamps with a timezone must round-trip with the timezone intact
        let a =
            TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone("+01:00".to_string());
        let d = a.data_type().clone();

        let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
        let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(back[0].data_type(), &d);

        // Test dictionary
        let mut a = PrimitiveDictionaryBuilder::<Int32Type, TimestampNanosecondType>::new();
        a.append(34).unwrap();
        a.append_null();
        a.append(345).unwrap();

        // Construct dictionary with a timezone
        let dict = a.finish();
        let values = TimestampNanosecondArray::from(dict.values().to_data());
        let dict_with_tz = dict.with_values(Arc::new(values.with_timezone("+02:00")));
        let v = DataType::Timestamp(TimeUnit::Nanosecond, Some("+02:00".into()));
        let d = DataType::Dictionary(Box::new(DataType::Int32), Box::new(v.clone()));

        assert_eq!(dict_with_tz.data_type(), &d);
        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
        let rows = converter
            .convert_columns(&[Arc::new(dict_with_tz) as _])
            .unwrap();
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        // the row format flattens dictionaries, so decoding yields the
        // timestamp value type (timezone preserved), not the dictionary type
        assert_eq!(back[0].data_type(), &v);
    }
2251
2252    #[test]
2253    fn test_null_encoding() {
2254        let col = Arc::new(NullArray::new(10));
2255        let converter = RowConverter::new(vec![SortField::new(DataType::Null)]).unwrap();
2256        let rows = converter.convert_columns(&[col]).unwrap();
2257        assert_eq!(rows.num_rows(), 10);
2258        assert_eq!(rows.row(1).data.len(), 0);
2259    }
2260
    #[test]
    fn test_variable_width() {
        // Strings must compare lexicographically, with null below empty string
        let col = Arc::new(StringArray::from_iter([
            Some("hello"),
            Some("he"),
            None,
            Some("foo"),
            Some(""),
        ])) as ArrayRef;

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        assert!(rows.row(1) < rows.row(0));
        assert!(rows.row(2) < rows.row(4));
        assert!(rows.row(3) < rows.row(0));
        assert!(rows.row(3) < rows.row(1));

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        // Binary values sized to straddle the mini-block and block boundaries
        // of the variable-length encoding, listed in ascending order
        let col = Arc::new(BinaryArray::from_iter([
            None,
            Some(vec![0_u8; 0]),
            Some(vec![0_u8; 6]),
            Some(vec![0_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![0_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![0_u8; variable::BLOCK_SIZE]),
            Some(vec![0_u8; variable::BLOCK_SIZE + 1]),
            Some(vec![1_u8; 6]),
            Some(vec![1_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![1_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![1_u8; variable::BLOCK_SIZE]),
            Some(vec![1_u8; variable::BLOCK_SIZE + 1]),
            Some(vec![0xFF_u8; 6]),
            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![0xFF_u8; variable::BLOCK_SIZE]),
            Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]),
        ])) as ArrayRef;

        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        // every pair of rows must be strictly ordered as the inputs are
        for i in 0..rows.num_rows() {
            for j in i + 1..rows.num_rows() {
                assert!(
                    rows.row(i) < rows.row(j),
                    "{} < {} - {:?} < {:?}",
                    i,
                    j,
                    rows.row(i),
                    rows.row(j)
                );
            }
        }

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        // Descending order with nulls last must invert every comparison
        let converter = RowConverter::new(vec![SortField::new_with_options(
            DataType::Binary,
            SortOptions::default().desc().with_nulls_first(false),
        )])
        .unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        for i in 0..rows.num_rows() {
            for j in i + 1..rows.num_rows() {
                assert!(
                    rows.row(i) > rows.row(j),
                    "{} > {} - {:?} > {:?}",
                    i,
                    j,
                    rows.row(i),
                    rows.row(j)
                );
            }
        }

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);
    }
2344
    /// Asserts that `a` equals `b`, comparing logically when `b` is
    /// dictionary-encoded: `b` is cast back to its value type (which must
    /// match `a`'s type) before comparison
    fn dictionary_eq(a: &dyn Array, b: &dyn Array) {
        match b.data_type() {
            DataType::Dictionary(_, v) => {
                // `a` must already be the dictionary's value type
                assert_eq!(a.data_type(), v.as_ref());
                let b = arrow_cast::cast(b, v).unwrap();
                assert_eq!(a, b.as_ref())
            }
            _ => assert_eq!(a, b),
        }
    }
2356
    #[test]
    fn test_string_dictionary() {
        // Dictionary-encoded strings must encode by their logical values:
        // rows compare as the decoded strings would, and rows produced from
        // different dictionaries remain comparable with each other
        let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
            Some("foo"),
            Some("hello"),
            Some("he"),
            None,
            Some("hello"),
            Some(""),
            Some("hello"),
            Some("hello"),
        ])) as ArrayRef;

        let field = SortField::new(a.data_type().clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();

        // Default options: ascending, nulls first
        assert!(rows_a.row(3) < rows_a.row(5)); // null < ""
        assert!(rows_a.row(2) < rows_a.row(1)); // "he" < "hello"
        assert!(rows_a.row(0) < rows_a.row(1)); // "foo" < "hello"
        assert!(rows_a.row(3) < rows_a.row(0)); // null < "foo"

        // Equal logical values produce byte-identical rows
        assert_eq!(rows_a.row(1), rows_a.row(4));
        assert_eq!(rows_a.row(1), rows_a.row(6));
        assert_eq!(rows_a.row(1), rows_a.row(7));

        let cols = converter.convert_rows(&rows_a).unwrap();
        dictionary_eq(&cols[0], &a);

        // A second array with a different key mapping is still comparable
        let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
            Some("hello"),
            None,
            Some("cupcakes"),
        ])) as ArrayRef;

        let rows_b = converter.convert_columns(&[Arc::clone(&b)]).unwrap();
        assert_eq!(rows_a.row(1), rows_b.row(0)); // "hello" == "hello"
        assert_eq!(rows_a.row(3), rows_b.row(1)); // null == null
        assert!(rows_b.row(2) < rows_a.row(0)); // "cupcakes" < "foo"

        let cols = converter.convert_rows(&rows_b).unwrap();
        dictionary_eq(&cols[0], &b);

        // Descending, nulls last: all comparisons invert
        let converter = RowConverter::new(vec![SortField::new_with_options(
            a.data_type().clone(),
            SortOptions::default().desc().with_nulls_first(false),
        )])
        .unwrap();

        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
        assert!(rows_c.row(3) > rows_c.row(5)); // null > ""
        assert!(rows_c.row(2) > rows_c.row(1)); // "he" > "hello"
        assert!(rows_c.row(0) > rows_c.row(1)); // "foo" > "hello"
        assert!(rows_c.row(3) > rows_c.row(0)); // null > "foo"

        let cols = converter.convert_rows(&rows_c).unwrap();
        dictionary_eq(&cols[0], &a);

        // Descending, nulls first: values invert but nulls still sort first
        let converter = RowConverter::new(vec![SortField::new_with_options(
            a.data_type().clone(),
            SortOptions::default().desc().with_nulls_first(true),
        )])
        .unwrap();

        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
        assert!(rows_c.row(3) < rows_c.row(5)); // null < ""
        assert!(rows_c.row(2) > rows_c.row(1)); // "he" > "hello"
        assert!(rows_c.row(0) > rows_c.row(1)); // "foo" > "hello"
        assert!(rows_c.row(3) < rows_c.row(0)); // null < "foo"

        let cols = converter.convert_rows(&rows_c).unwrap();
        dictionary_eq(&cols[0], &a);
    }
2430
    #[test]
    fn test_struct() {
        // Basic case: struct rows compare lexicographically by their fields
        // (int, s), which for this data is simply increasing row order
        let a = Arc::new(Int32Array::from(vec![1, 1, 2, 2])) as ArrayRef;
        let a_f = Arc::new(Field::new("int", DataType::Int32, false));
        let u = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as ArrayRef;
        let u_f = Arc::new(Field::new("s", DataType::Utf8, false));
        let s1 = Arc::new(StructArray::from(vec![(a_f, a), (u_f, u)])) as ArrayRef;

        let sort_fields = vec![SortField::new(s1.data_type().clone())];
        let converter = RowConverter::new(sort_fields).unwrap();
        let r1 = converter.convert_columns(&[Arc::clone(&s1)]).unwrap();

        // Each row is strictly less than its successor
        for (a, b) in r1.iter().zip(r1.iter().skip(1)) {
            assert!(a < b);
        }

        let back = converter.convert_rows(&r1).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(&back[0], &s1);

        // Test struct nullability: validity bitmap 0b00001010 keeps rows 1
        // and 3 and masks rows 0 and 2 to logical null
        let data = s1
            .to_data()
            .into_builder()
            .null_bit_buffer(Some(Buffer::from_slice_ref([0b00001010])))
            .null_count(2)
            .build()
            .unwrap();

        let s2 = Arc::new(StructArray::from(data)) as ArrayRef;
        let r2 = converter.convert_columns(&[Arc::clone(&s2)]).unwrap();
        assert_eq!(r2.row(0), r2.row(2)); // Nulls equal
        assert!(r2.row(0) < r2.row(1)); // Nulls first
        assert_ne!(r1.row(0), r2.row(0)); // Value does not equal null
        assert_eq!(r1.row(1), r2.row(1)); // Values equal

        let back = converter.convert_rows(&r2).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(&back[0], &s2);

        // The reconstructed array must be fully valid, child data included
        back[0].to_data().validate_full().unwrap();
    }
2474
2475    #[test]
2476    fn test_dictionary_in_struct() {
2477        let builder = StringDictionaryBuilder::<Int32Type>::new();
2478        let mut struct_builder = StructBuilder::new(
2479            vec![Field::new_dictionary(
2480                "foo",
2481                DataType::Int32,
2482                DataType::Utf8,
2483                true,
2484            )],
2485            vec![Box::new(builder)],
2486        );
2487
2488        let dict_builder = struct_builder
2489            .field_builder::<StringDictionaryBuilder<Int32Type>>(0)
2490            .unwrap();
2491
2492        // Flattened: ["a", null, "a", "b"]
2493        dict_builder.append_value("a");
2494        dict_builder.append_null();
2495        dict_builder.append_value("a");
2496        dict_builder.append_value("b");
2497
2498        for _ in 0..4 {
2499            struct_builder.append(true);
2500        }
2501
2502        let s = Arc::new(struct_builder.finish()) as ArrayRef;
2503        let sort_fields = vec![SortField::new(s.data_type().clone())];
2504        let converter = RowConverter::new(sort_fields).unwrap();
2505        let r = converter.convert_columns(&[Arc::clone(&s)]).unwrap();
2506
2507        let back = converter.convert_rows(&r).unwrap();
2508        let [s2] = back.try_into().unwrap();
2509
2510        // RowConverter flattens Dictionary
2511        // s.ty = Struct("foo": Dictionary(Int32, Utf8)), s2.ty = Struct("foo": Utf8)
2512        assert_ne!(&s.data_type(), &s2.data_type());
2513        s2.to_data().validate_full().unwrap();
2514
2515        // Check if the logical data remains the same
2516        // Keys: [0, null, 0, 1]
2517        // Values: ["a", "b"]
2518        let s1_struct = s.as_struct();
2519        let s1_0 = s1_struct.column(0);
2520        let s1_idx_0 = s1_0.as_dictionary::<Int32Type>();
2521        let keys = s1_idx_0.keys();
2522        let values = s1_idx_0.values().as_string::<i32>();
2523        // Flattened: ["a", null, "a", "b"]
2524        let s2_struct = s2.as_struct();
2525        let s2_0 = s2_struct.column(0);
2526        let s2_idx_0 = s2_0.as_string::<i32>();
2527
2528        for i in 0..keys.len() {
2529            if keys.is_null(i) {
2530                assert!(s2_idx_0.is_null(i));
2531            } else {
2532                let dict_index = keys.value(i) as usize;
2533                assert_eq!(values.value(dict_index), s2_idx_0.value(i));
2534            }
2535        }
2536    }
2537
2538    #[test]
2539    fn test_dictionary_in_struct_empty() {
2540        let ty = DataType::Struct(
2541            vec![Field::new_dictionary(
2542                "foo",
2543                DataType::Int32,
2544                DataType::Int32,
2545                false,
2546            )]
2547            .into(),
2548        );
2549        let s = arrow_array::new_empty_array(&ty);
2550
2551        let sort_fields = vec![SortField::new(s.data_type().clone())];
2552        let converter = RowConverter::new(sort_fields).unwrap();
2553        let r = converter.convert_columns(&[Arc::clone(&s)]).unwrap();
2554
2555        let back = converter.convert_rows(&r).unwrap();
2556        let [s2] = back.try_into().unwrap();
2557
2558        // RowConverter flattens Dictionary
2559        // s.ty = Struct("foo": Dictionary(Int32, Int32)), s2.ty = Struct("foo": Int32)
2560        assert_ne!(&s.data_type(), &s2.data_type());
2561        s2.to_data().validate_full().unwrap();
2562        assert_eq!(s.len(), 0);
2563        assert_eq!(s2.len(), 0);
2564    }
2565
2566    #[test]
2567    fn test_list_of_string_dictionary() {
2568        let mut builder = ListBuilder::<StringDictionaryBuilder<Int32Type>>::default();
2569        // List[0] = ["a", "b", "zero", null, "c", "b", "d" (dict)]
2570        builder.values().append("a").unwrap();
2571        builder.values().append("b").unwrap();
2572        builder.values().append("zero").unwrap();
2573        builder.values().append_null();
2574        builder.values().append("c").unwrap();
2575        builder.values().append("b").unwrap();
2576        builder.values().append("d").unwrap();
2577        builder.append(true);
2578        // List[1] = null
2579        builder.append(false);
2580        // List[2] = ["e", "zero", "a" (dict)]
2581        builder.values().append("e").unwrap();
2582        builder.values().append("zero").unwrap();
2583        builder.values().append("a").unwrap();
2584        builder.append(true);
2585
2586        let a = Arc::new(builder.finish()) as ArrayRef;
2587        let data_type = a.data_type().clone();
2588
2589        let field = SortField::new(data_type.clone());
2590        let converter = RowConverter::new(vec![field]).unwrap();
2591        let rows = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
2592
2593        let back = converter.convert_rows(&rows).unwrap();
2594        assert_eq!(back.len(), 1);
2595        let [a2] = back.try_into().unwrap();
2596
2597        // RowConverter flattens Dictionary
2598        // a.ty: List(Dictionary(Int32, Utf8)), a2.ty: List(Utf8)
2599        assert_ne!(&a.data_type(), &a2.data_type());
2600
2601        a2.to_data().validate_full().unwrap();
2602
2603        let a2_list = a2.as_list::<i32>();
2604        let a1_list = a.as_list::<i32>();
2605
2606        // Check if the logical data remains the same
2607        // List[0] = ["a", "b", "zero", null, "c", "b", "d" (dict)]
2608        let a1_0 = a1_list.value(0);
2609        let a1_idx_0 = a1_0.as_dictionary::<Int32Type>();
2610        let keys = a1_idx_0.keys();
2611        let values = a1_idx_0.values().as_string::<i32>();
2612        let a2_0 = a2_list.value(0);
2613        let a2_idx_0 = a2_0.as_string::<i32>();
2614
2615        for i in 0..keys.len() {
2616            if keys.is_null(i) {
2617                assert!(a2_idx_0.is_null(i));
2618            } else {
2619                let dict_index = keys.value(i) as usize;
2620                assert_eq!(values.value(dict_index), a2_idx_0.value(i));
2621            }
2622        }
2623
2624        // List[1] = null
2625        assert!(a1_list.is_null(1));
2626        assert!(a2_list.is_null(1));
2627
2628        // List[2] = ["e", "zero", "a" (dict)]
2629        let a1_2 = a1_list.value(2);
2630        let a1_idx_2 = a1_2.as_dictionary::<Int32Type>();
2631        let keys = a1_idx_2.keys();
2632        let values = a1_idx_2.values().as_string::<i32>();
2633        let a2_2 = a2_list.value(2);
2634        let a2_idx_2 = a2_2.as_string::<i32>();
2635
2636        for i in 0..keys.len() {
2637            if keys.is_null(i) {
2638                assert!(a2_idx_2.is_null(i));
2639            } else {
2640                let dict_index = keys.value(i) as usize;
2641                assert_eq!(values.value(dict_index), a2_idx_2.value(i));
2642            }
2643        }
2644    }
2645
2646    #[test]
2647    fn test_primitive_dictionary() {
2648        let mut builder = PrimitiveDictionaryBuilder::<Int32Type, Int32Type>::new();
2649        builder.append(2).unwrap();
2650        builder.append(3).unwrap();
2651        builder.append(0).unwrap();
2652        builder.append_null();
2653        builder.append(5).unwrap();
2654        builder.append(3).unwrap();
2655        builder.append(-1).unwrap();
2656
2657        let a = builder.finish();
2658        let data_type = a.data_type().clone();
2659        let columns = [Arc::new(a) as ArrayRef];
2660
2661        let field = SortField::new(data_type.clone());
2662        let converter = RowConverter::new(vec![field]).unwrap();
2663        let rows = converter.convert_columns(&columns).unwrap();
2664        assert!(rows.row(0) < rows.row(1));
2665        assert!(rows.row(2) < rows.row(0));
2666        assert!(rows.row(3) < rows.row(2));
2667        assert!(rows.row(6) < rows.row(2));
2668        assert!(rows.row(3) < rows.row(6));
2669
2670        let back = converter.convert_rows(&rows).unwrap();
2671        assert_eq!(back.len(), 1);
2672        back[0].to_data().validate_full().unwrap();
2673    }
2674
    #[test]
    fn test_dictionary_nulls() {
        // Keys [0, 0, 1, 2, 4, null] into values [1, -1, null, 4, null]
        // decode to the logical array [1, 1, -1, null, null, null]: a null
        // key and a key referencing a null value are both logical nulls
        let values = Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data();
        let keys =
            Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]).into_data();

        let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
        let data = keys
            .into_builder()
            .data_type(data_type.clone())
            .child_data(vec![values])
            .build()
            .unwrap();

        let columns = [Arc::new(DictionaryArray::<Int32Type>::from(data)) as ArrayRef];
        let field = SortField::new(data_type.clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&columns).unwrap();

        assert_eq!(rows.row(0), rows.row(1)); // 1 == 1
        assert_eq!(rows.row(3), rows.row(4)); // both keys reference null values
        assert_eq!(rows.row(4), rows.row(5)); // null value == null key
        assert!(rows.row(3) < rows.row(0)); // null < 1 (nulls first)
    }
2699
2700    #[test]
2701    fn test_from_binary_shared_buffer() {
2702        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2703        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2704        let rows = converter.convert_columns(&[array]).unwrap();
2705        let binary_rows = rows.try_into_binary().expect("known-small rows");
2706        let _binary_rows_shared_buffer = binary_rows.clone();
2707
2708        let parsed = converter.from_binary(binary_rows);
2709
2710        converter.convert_rows(parsed.iter()).unwrap();
2711    }
2712
    #[test]
    #[should_panic(expected = "Encountered non UTF-8 data")]
    fn test_invalid_utf8() {
        // Encode a non-UTF-8 byte (0xFF) using the Binary row format
        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
        let rows = converter.convert_columns(&[array]).unwrap();
        let binary_row = rows.row(0);

        // Reparsing those row bytes as Utf8 must be detected and panic
        // rather than silently produce an invalid StringArray
        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let parser = converter.parser();
        let utf8_row = parser.parse(binary_row.as_ref());

        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
    }
2727
    #[test]
    #[should_panic(expected = "Encountered non UTF-8 data")]
    fn test_invalid_utf8_array() {
        // Encode a non-UTF-8 byte (0xFF) using the Binary row format
        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
        let rows = converter.convert_columns(&[array]).unwrap();
        let binary_rows = rows.try_into_binary().expect("known-small rows");

        // Same as test_invalid_utf8, but decoding via the `from_binary`
        // path, which must also detect the invalid UTF-8 and panic
        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let parsed = converter.from_binary(binary_rows);

        converter.convert_rows(parsed.iter()).unwrap();
    }
2741
    #[test]
    #[should_panic(expected = "index out of bounds")]
    fn test_invalid_empty() {
        // An empty byte slice is not a valid row encoding for a Utf8 column;
        // decoding must panic on a bounds check rather than read past the end
        let binary_row: &[u8] = &[];

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let parser = converter.parser();
        let utf8_row = parser.parse(binary_row.as_ref());

        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
    }
2753
    #[test]
    #[should_panic(expected = "index out of bounds")]
    fn test_invalid_empty_array() {
        // Same as test_invalid_empty, but decoding the empty row via the
        // `from_binary` path - it must panic on a bounds check as well
        let row: &[u8] = &[];
        let binary_rows = BinaryArray::from(vec![row]);

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let parsed = converter.from_binary(binary_rows);

        converter.convert_rows(parsed.iter()).unwrap();
    }
2765
    #[test]
    #[should_panic(expected = "index out of bounds")]
    fn test_invalid_truncated() {
        // A row truncated after its first marker byte is malformed; decoding
        // must panic on a bounds check rather than read past the end
        let binary_row: &[u8] = &[0x02];

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let parser = converter.parser();
        let utf8_row = parser.parse(binary_row.as_ref());

        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
    }
2777
    #[test]
    #[should_panic(expected = "index out of bounds")]
    fn test_invalid_truncated_array() {
        // Same as test_invalid_truncated, but decoding the malformed row via
        // the `from_binary` path - it must panic on a bounds check as well
        let row: &[u8] = &[0x02];
        let binary_rows = BinaryArray::from(vec![row]);

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let parsed = converter.from_binary(binary_rows);

        converter.convert_rows(parsed.iter()).unwrap();
    }
2789
2790    #[test]
2791    #[should_panic(expected = "rows were not produced by this RowConverter")]
2792    fn test_different_converter() {
2793        let values = Arc::new(Int32Array::from_iter([Some(1), Some(-1)]));
2794        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2795        let rows = converter.convert_columns(&[values]).unwrap();
2796
2797        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2798        let _ = converter.convert_rows(&rows);
2799    }
2800
    /// Exercises the row encoding of a single-level list of `Int32` (offset
    /// type `O`) under all four combinations of sort direction and null
    /// ordering, and verifies a sliced input round-trips correctly.
    fn test_single_list<O: OffsetSizeTrait>() {
        // Lists: [32, 52, 32], [32, 52, 12], [32, 52], null, [32, null], [],
        // null - values appended behind a null entry (MASKED) must not leak
        // into the row encoding
        let mut builder = GenericListBuilder::<O, _>::new(Int32Builder::new());
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(32);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(12);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.append(true);
        builder.values().append_value(32); // MASKED
        builder.values().append_value(52); // MASKED
        builder.append(false);
        builder.values().append_value(32);
        builder.values().append_null();
        builder.append(true);
        builder.append(true);
        builder.values().append_value(17); // MASKED
        builder.values().append_null(); // MASKED
        builder.append(false);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // Default options: ascending, nulls first
        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();

        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
        assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
        assert!(rows.row(3) < rows.row(5)); // null < []
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Ascending, nulls last
        let options = SortOptions::default().asc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
        assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
        assert!(rows.row(3) > rows.row(5)); // null > []
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls last
        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
        assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
        assert!(rows.row(3) > rows.row(5)); // null > []
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls first
        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
        assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
        assert!(rows.row(3) < rows.row(5)); // null < []
        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Slice rows 1..6; comparisons use the descending, nulls-first
        // converter above, and the slice must round-trip as well
        let sliced_list = list.slice(1, 5);
        let rows_on_sliced_list = converter
            .convert_columns(&[Arc::clone(&sliced_list)])
            .unwrap();

        assert!(rows_on_sliced_list.row(1) > rows_on_sliced_list.row(0)); // [32, 52] > [32, 52, 12]
        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1)); // null < [32, 52]
        assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1)); // [32, null] < [32, 52]
        assert!(rows_on_sliced_list.row(4) > rows_on_sliced_list.row(1)); // [] > [32, 52]
        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4)); // null < []

        let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &sliced_list);
    }
2914
    /// Exercises the row encoding of a nested `List<List<Int32>>` (offset
    /// type `O`) under several sort options, including a sliced input.
    fn test_nested_list<O: OffsetSizeTrait>() {
        let mut builder =
            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(Int32Builder::new()));

        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().append(false);
        builder.append(true);
        builder.append(false);

        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.append(true);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // [
        //   [[1, 2], [1, null]],
        //   [[1, null], [1, null]],
        //   [[1, null], null]
        //   null
        //   [[1, 2]]
        // ]
        // Ascending, nulls first
        let options = SortOptions::default().asc().with_nulls_first(true);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1)); // first child [1, 2] > [1, null]
        assert!(rows.row(1) > rows.row(2)); // second child [1, null] > null
        assert!(rows.row(2) > rows.row(3)); // any value > null row
        assert!(rows.row(4) < rows.row(0)); // [[1, 2]] is a strict prefix of row 0
        assert!(rows.row(4) > rows.row(1)); // first child [1, 2] > [1, null]

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls first
        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(1) > rows.row(2));
        assert!(rows.row(2) > rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) > rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls last
        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(1) < rows.row(2));
        assert!(rows.row(2) < rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) < rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Slice off the first row; ordering among the remaining rows (under
        // descending, nulls last) must hold and the slice must round-trip
        let sliced_list = list.slice(1, 3);
        let rows = converter
            .convert_columns(&[Arc::clone(&sliced_list)])
            .unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(1) < rows.row(2));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &sliced_list);
    }
3018
3019    #[test]
3020    fn test_list() {
3021        test_single_list::<i32>();
3022        test_nested_list::<i32>();
3023    }
3024
3025    #[test]
3026    fn test_large_list() {
3027        test_single_list::<i64>();
3028        test_nested_list::<i64>();
3029    }
3030
3031    #[test]
3032    fn test_fixed_size_list() {
3033        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 3);
3034        builder.values().append_value(32);
3035        builder.values().append_value(52);
3036        builder.values().append_value(32);
3037        builder.append(true);
3038        builder.values().append_value(32);
3039        builder.values().append_value(52);
3040        builder.values().append_value(12);
3041        builder.append(true);
3042        builder.values().append_value(32);
3043        builder.values().append_value(52);
3044        builder.values().append_null();
3045        builder.append(true);
3046        builder.values().append_value(32); // MASKED
3047        builder.values().append_value(52); // MASKED
3048        builder.values().append_value(13); // MASKED
3049        builder.append(false);
3050        builder.values().append_value(32);
3051        builder.values().append_null();
3052        builder.values().append_null();
3053        builder.append(true);
3054        builder.values().append_null();
3055        builder.values().append_null();
3056        builder.values().append_null();
3057        builder.append(true);
3058        builder.values().append_value(17); // MASKED
3059        builder.values().append_null(); // MASKED
3060        builder.values().append_value(77); // MASKED
3061        builder.append(false);
3062
3063        let list = Arc::new(builder.finish()) as ArrayRef;
3064        let d = list.data_type().clone();
3065
3066        // Default sorting (ascending, nulls first)
3067        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
3068
3069        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3070        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
3071        assert!(rows.row(2) < rows.row(1)); // [32, 52, null] < [32, 52, 12]
3072        assert!(rows.row(3) < rows.row(2)); // null < [32, 52, null]
3073        assert!(rows.row(4) < rows.row(2)); // [32, null, null] < [32, 52, null]
3074        assert!(rows.row(5) < rows.row(2)); // [null, null, null] < [32, 52, null]
3075        assert!(rows.row(3) < rows.row(5)); // null < [null, null, null]
3076        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)
3077
3078        let back = converter.convert_rows(&rows).unwrap();
3079        assert_eq!(back.len(), 1);
3080        back[0].to_data().validate_full().unwrap();
3081        assert_eq!(&back[0], &list);
3082
3083        // Ascending, null last
3084        let options = SortOptions::default().asc().with_nulls_first(false);
3085        let field = SortField::new_with_options(d.clone(), options);
3086        let converter = RowConverter::new(vec![field]).unwrap();
3087        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3088        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
3089        assert!(rows.row(2) > rows.row(1)); // [32, 52, null] > [32, 52, 12]
3090        assert!(rows.row(3) > rows.row(2)); // null > [32, 52, null]
3091        assert!(rows.row(4) > rows.row(2)); // [32, null, null] > [32, 52, null]
3092        assert!(rows.row(5) > rows.row(2)); // [null, null, null] > [32, 52, null]
3093        assert!(rows.row(3) > rows.row(5)); // null > [null, null, null]
3094        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)
3095
3096        let back = converter.convert_rows(&rows).unwrap();
3097        assert_eq!(back.len(), 1);
3098        back[0].to_data().validate_full().unwrap();
3099        assert_eq!(&back[0], &list);
3100
3101        // Descending, nulls last
3102        let options = SortOptions::default().desc().with_nulls_first(false);
3103        let field = SortField::new_with_options(d.clone(), options);
3104        let converter = RowConverter::new(vec![field]).unwrap();
3105        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3106        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
3107        assert!(rows.row(2) > rows.row(1)); // [32, 52, null] > [32, 52, 12]
3108        assert!(rows.row(3) > rows.row(2)); // null > [32, 52, null]
3109        assert!(rows.row(4) > rows.row(2)); // [32, null, null] > [32, 52, null]
3110        assert!(rows.row(5) > rows.row(2)); // [null, null, null] > [32, 52, null]
3111        assert!(rows.row(3) > rows.row(5)); // null > [null, null, null]
3112        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)
3113
3114        let back = converter.convert_rows(&rows).unwrap();
3115        assert_eq!(back.len(), 1);
3116        back[0].to_data().validate_full().unwrap();
3117        assert_eq!(&back[0], &list);
3118
3119        // Descending, nulls first
3120        let options = SortOptions::default().desc().with_nulls_first(true);
3121        let field = SortField::new_with_options(d, options);
3122        let converter = RowConverter::new(vec![field]).unwrap();
3123        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3124
3125        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
3126        assert!(rows.row(2) < rows.row(1)); // [32, 52, null] > [32, 52, 12]
3127        assert!(rows.row(3) < rows.row(2)); // null < [32, 52, null]
3128        assert!(rows.row(4) < rows.row(2)); // [32, null, null] < [32, 52, null]
3129        assert!(rows.row(5) < rows.row(2)); // [null, null, null] > [32, 52, null]
3130        assert!(rows.row(3) < rows.row(5)); // null < [null, null, null]
3131        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values)
3132
3133        let back = converter.convert_rows(&rows).unwrap();
3134        assert_eq!(back.len(), 1);
3135        back[0].to_data().validate_full().unwrap();
3136        assert_eq!(&back[0], &list);
3137
3138        let sliced_list = list.slice(1, 5);
3139        let rows_on_sliced_list = converter
3140            .convert_columns(&[Arc::clone(&sliced_list)])
3141            .unwrap();
3142
3143        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1)); // null < [32, 52, null]
3144        assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1)); // [32, null, null] < [32, 52, null]
3145        assert!(rows_on_sliced_list.row(4) < rows_on_sliced_list.row(1)); // [null, null, null] > [32, 52, null]
3146        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4)); // null < [null, null, null]
3147
3148        let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
3149        assert_eq!(back.len(), 1);
3150        back[0].to_data().validate_full().unwrap();
3151        assert_eq!(&back[0], &sliced_list);
3152    }
3153
3154    #[test]
3155    fn test_two_fixed_size_lists() {
3156        let mut first = FixedSizeListBuilder::new(UInt8Builder::new(), 1);
3157        // 0: [100]
3158        first.values().append_value(100);
3159        first.append(true);
3160        // 1: [101]
3161        first.values().append_value(101);
3162        first.append(true);
3163        // 2: [102]
3164        first.values().append_value(102);
3165        first.append(true);
3166        // 3: [null]
3167        first.values().append_null();
3168        first.append(true);
3169        // 4: null
3170        first.values().append_null(); // MASKED
3171        first.append(false);
3172        let first = Arc::new(first.finish()) as ArrayRef;
3173        let first_type = first.data_type().clone();
3174
3175        let mut second = FixedSizeListBuilder::new(UInt8Builder::new(), 1);
3176        // 0: [200]
3177        second.values().append_value(200);
3178        second.append(true);
3179        // 1: [201]
3180        second.values().append_value(201);
3181        second.append(true);
3182        // 2: [202]
3183        second.values().append_value(202);
3184        second.append(true);
3185        // 3: [null]
3186        second.values().append_null();
3187        second.append(true);
3188        // 4: null
3189        second.values().append_null(); // MASKED
3190        second.append(false);
3191        let second = Arc::new(second.finish()) as ArrayRef;
3192        let second_type = second.data_type().clone();
3193
3194        let converter = RowConverter::new(vec![
3195            SortField::new(first_type.clone()),
3196            SortField::new(second_type.clone()),
3197        ])
3198        .unwrap();
3199
3200        let rows = converter
3201            .convert_columns(&[Arc::clone(&first), Arc::clone(&second)])
3202            .unwrap();
3203
3204        let back = converter.convert_rows(&rows).unwrap();
3205        assert_eq!(back.len(), 2);
3206        back[0].to_data().validate_full().unwrap();
3207        assert_eq!(&back[0], &first);
3208        back[1].to_data().validate_full().unwrap();
3209        assert_eq!(&back[1], &second);
3210    }
3211
3212    #[test]
3213    fn test_fixed_size_list_with_variable_width_content() {
3214        let mut first = FixedSizeListBuilder::new(
3215            StructBuilder::from_fields(
3216                vec![
3217                    Field::new(
3218                        "timestamp",
3219                        DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("UTC"))),
3220                        false,
3221                    ),
3222                    Field::new("offset_minutes", DataType::Int16, false),
3223                    Field::new("time_zone", DataType::Utf8, false),
3224                ],
3225                1,
3226            ),
3227            1,
3228        );
3229        // 0: null
3230        first
3231            .values()
3232            .field_builder::<TimestampMicrosecondBuilder>(0)
3233            .unwrap()
3234            .append_null();
3235        first
3236            .values()
3237            .field_builder::<Int16Builder>(1)
3238            .unwrap()
3239            .append_null();
3240        first
3241            .values()
3242            .field_builder::<StringBuilder>(2)
3243            .unwrap()
3244            .append_null();
3245        first.values().append(false);
3246        first.append(false);
3247        // 1: [null]
3248        first
3249            .values()
3250            .field_builder::<TimestampMicrosecondBuilder>(0)
3251            .unwrap()
3252            .append_null();
3253        first
3254            .values()
3255            .field_builder::<Int16Builder>(1)
3256            .unwrap()
3257            .append_null();
3258        first
3259            .values()
3260            .field_builder::<StringBuilder>(2)
3261            .unwrap()
3262            .append_null();
3263        first.values().append(false);
3264        first.append(true);
3265        // 2: [1970-01-01 00:00:00.000000 UTC]
3266        first
3267            .values()
3268            .field_builder::<TimestampMicrosecondBuilder>(0)
3269            .unwrap()
3270            .append_value(0);
3271        first
3272            .values()
3273            .field_builder::<Int16Builder>(1)
3274            .unwrap()
3275            .append_value(0);
3276        first
3277            .values()
3278            .field_builder::<StringBuilder>(2)
3279            .unwrap()
3280            .append_value("UTC");
3281        first.values().append(true);
3282        first.append(true);
3283        // 3: [2005-09-10 13:30:00.123456 Europe/Warsaw]
3284        first
3285            .values()
3286            .field_builder::<TimestampMicrosecondBuilder>(0)
3287            .unwrap()
3288            .append_value(1126351800123456);
3289        first
3290            .values()
3291            .field_builder::<Int16Builder>(1)
3292            .unwrap()
3293            .append_value(120);
3294        first
3295            .values()
3296            .field_builder::<StringBuilder>(2)
3297            .unwrap()
3298            .append_value("Europe/Warsaw");
3299        first.values().append(true);
3300        first.append(true);
3301        let first = Arc::new(first.finish()) as ArrayRef;
3302        let first_type = first.data_type().clone();
3303
3304        let mut second = StringBuilder::new();
3305        second.append_value("somewhere near");
3306        second.append_null();
3307        second.append_value("Greenwich");
3308        second.append_value("Warsaw");
3309        let second = Arc::new(second.finish()) as ArrayRef;
3310        let second_type = second.data_type().clone();
3311
3312        let converter = RowConverter::new(vec![
3313            SortField::new(first_type.clone()),
3314            SortField::new(second_type.clone()),
3315        ])
3316        .unwrap();
3317
3318        let rows = converter
3319            .convert_columns(&[Arc::clone(&first), Arc::clone(&second)])
3320            .unwrap();
3321
3322        let back = converter.convert_rows(&rows).unwrap();
3323        assert_eq!(back.len(), 2);
3324        back[0].to_data().validate_full().unwrap();
3325        assert_eq!(&back[0], &first);
3326        back[1].to_data().validate_full().unwrap();
3327        assert_eq!(&back[1], &second);
3328    }
3329
3330    fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
3331    where
3332        K: ArrowPrimitiveType,
3333        StandardUniform: Distribution<K::Native>,
3334    {
3335        let mut rng = rng();
3336        (0..len)
3337            .map(|_| rng.random_bool(valid_percent).then(|| rng.random()))
3338            .collect()
3339    }
3340
3341    fn generate_strings<O: OffsetSizeTrait>(
3342        len: usize,
3343        valid_percent: f64,
3344    ) -> GenericStringArray<O> {
3345        let mut rng = rng();
3346        (0..len)
3347            .map(|_| {
3348                rng.random_bool(valid_percent).then(|| {
3349                    let len = rng.random_range(0..100);
3350                    let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
3351                    String::from_utf8(bytes).unwrap()
3352                })
3353            })
3354            .collect()
3355    }
3356
3357    fn generate_string_view(len: usize, valid_percent: f64) -> StringViewArray {
3358        let mut rng = rng();
3359        (0..len)
3360            .map(|_| {
3361                rng.random_bool(valid_percent).then(|| {
3362                    let len = rng.random_range(0..100);
3363                    let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
3364                    String::from_utf8(bytes).unwrap()
3365                })
3366            })
3367            .collect()
3368    }
3369
3370    fn generate_byte_view(len: usize, valid_percent: f64) -> BinaryViewArray {
3371        let mut rng = rng();
3372        (0..len)
3373            .map(|_| {
3374                rng.random_bool(valid_percent).then(|| {
3375                    let len = rng.random_range(0..100);
3376                    let bytes: Vec<_> = (0..len).map(|_| rng.random_range(0..128)).collect();
3377                    bytes
3378                })
3379            })
3380            .collect()
3381    }
3382
3383    fn generate_fixed_stringview_column(len: usize) -> StringViewArray {
3384        let edge_cases = vec![
3385            Some("bar".to_string()),
3386            Some("bar\0".to_string()),
3387            Some("LongerThan12Bytes".to_string()),
3388            Some("LongerThan12Bytez".to_string()),
3389            Some("LongerThan12Bytes\0".to_string()),
3390            Some("LongerThan12Byt".to_string()),
3391            Some("backend one".to_string()),
3392            Some("backend two".to_string()),
3393            Some("a".repeat(257)),
3394            Some("a".repeat(300)),
3395        ];
3396
3397        // Fill up to `len` by repeating edge cases and trimming
3398        let mut values = Vec::with_capacity(len);
3399        for i in 0..len {
3400            values.push(
3401                edge_cases
3402                    .get(i % edge_cases.len())
3403                    .cloned()
3404                    .unwrap_or(None),
3405            );
3406        }
3407
3408        StringViewArray::from(values)
3409    }
3410
    /// Generates a `DictionaryArray<K>` with `len` keys referencing the given
    /// `values` array; each key is null with probability `1 - valid_percent`.
    ///
    /// NOTE(review): assumes `values` is non-empty — otherwise
    /// `min_key..max_key` is an empty range and `random_range` panics. Callers
    /// in `generate_column` pass values of length `1..len`.
    fn generate_dictionary<K>(
        values: ArrayRef,
        len: usize,
        valid_percent: f64,
    ) -> DictionaryArray<K>
    where
        K: ArrowDictionaryKeyType,
        K::Native: SampleUniform,
    {
        let mut rng = rng();
        // Keys are drawn uniformly from [0, values.len())
        let min_key = K::Native::from_usize(0).unwrap();
        let max_key = K::Native::from_usize(values.len()).unwrap();
        let keys: PrimitiveArray<K> = (0..len)
            .map(|_| {
                rng.random_bool(valid_percent)
                    .then(|| rng.random_range(min_key..max_key))
            })
            .collect();

        let data_type =
            DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(values.data_type().clone()));

        // Rebuild the key array's data as dictionary-typed data, attaching
        // `values` as the child (dictionary values) array.
        let data = keys
            .into_data()
            .into_builder()
            .data_type(data_type)
            .add_child_data(values.to_data())
            .build()
            .unwrap();

        DictionaryArray::from(data)
    }
3443
3444    fn generate_fixed_size_binary(len: usize, valid_percent: f64) -> FixedSizeBinaryArray {
3445        let mut rng = rng();
3446        let width = rng.random_range(0..20);
3447        let mut builder = FixedSizeBinaryBuilder::new(width);
3448
3449        let mut b = vec![0; width as usize];
3450        for _ in 0..len {
3451            match rng.random_bool(valid_percent) {
3452                true => {
3453                    b.iter_mut().for_each(|x| *x = rng.random());
3454                    builder.append_value(&b).unwrap();
3455                }
3456                false => builder.append_null(),
3457            }
3458        }
3459
3460        builder.finish()
3461    }
3462
3463    fn generate_struct(len: usize, valid_percent: f64) -> StructArray {
3464        let mut rng = rng();
3465        let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
3466        let a = generate_primitive_array::<Int32Type>(len, valid_percent);
3467        let b = generate_strings::<i32>(len, valid_percent);
3468        let fields = Fields::from(vec![
3469            Field::new("a", DataType::Int32, true),
3470            Field::new("b", DataType::Utf8, true),
3471        ]);
3472        let values = vec![Arc::new(a) as _, Arc::new(b) as _];
3473        StructArray::new(fields, values, Some(nulls))
3474    }
3475
3476    fn generate_list<F>(len: usize, valid_percent: f64, values: F) -> ListArray
3477    where
3478        F: FnOnce(usize) -> ArrayRef,
3479    {
3480        let mut rng = rng();
3481        let offsets = OffsetBuffer::<i32>::from_lengths((0..len).map(|_| rng.random_range(0..10)));
3482        let values_len = offsets.last().unwrap().to_usize().unwrap();
3483        let values = values(values_len);
3484        let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
3485        let field = Arc::new(Field::new_list_field(values.data_type().clone(), true));
3486        ListArray::new(field, offsets, values, Some(nulls))
3487    }
3488
    /// Picks one of 18 column shapes at random (primitives, strings,
    /// dictionaries, fixed-size binary, structs, lists, view arrays, or a
    /// sliced list) and generates `len` rows of it, mostly with ~80% validity.
    fn generate_column(len: usize) -> ArrayRef {
        let mut rng = rng();
        match rng.random_range(0..18) {
            0 => Arc::new(generate_primitive_array::<Int32Type>(len, 0.8)),
            1 => Arc::new(generate_primitive_array::<UInt32Type>(len, 0.8)),
            2 => Arc::new(generate_primitive_array::<Int64Type>(len, 0.8)),
            3 => Arc::new(generate_primitive_array::<UInt64Type>(len, 0.8)),
            4 => Arc::new(generate_primitive_array::<Float32Type>(len, 0.8)),
            5 => Arc::new(generate_primitive_array::<Float64Type>(len, 0.8)),
            6 => Arc::new(generate_strings::<i32>(len, 0.8)),
            7 => Arc::new(generate_dictionary::<Int64Type>(
                // Cannot test dictionaries containing null values because of #2687
                Arc::new(generate_strings::<i32>(rng.random_range(1..len), 1.0)),
                len,
                0.8,
            )),
            8 => Arc::new(generate_dictionary::<Int64Type>(
                // Cannot test dictionaries containing null values because of #2687
                Arc::new(generate_primitive_array::<Int64Type>(
                    rng.random_range(1..len),
                    1.0,
                )),
                len,
                0.8,
            )),
            9 => Arc::new(generate_fixed_size_binary(len, 0.8)),
            10 => Arc::new(generate_struct(len, 0.8)),
            11 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
            })),
            12 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_strings::<i32>(values_len, 0.8))
            })),
            13 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_struct(values_len, 0.8))
            })),
            14 => Arc::new(generate_string_view(len, 0.8)),
            15 => Arc::new(generate_byte_view(len, 0.8)),
            16 => Arc::new(generate_fixed_stringview_column(len)),
            // Exercise a list array with a non-zero offset (sliced view)
            17 => Arc::new(
                generate_list(len + 1000, 0.8, |values_len| {
                    Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
                })
                .slice(500, len),
            ),
            _ => unreachable!(),
        }
    }
3537
3538    fn print_row(cols: &[SortColumn], row: usize) -> String {
3539        let t: Vec<_> = cols
3540            .iter()
3541            .map(|x| match x.values.is_valid(row) {
3542                true => {
3543                    let opts = FormatOptions::default().with_null("NULL");
3544                    let formatter = ArrayFormatter::try_new(x.values.as_ref(), &opts).unwrap();
3545                    formatter.value(row).to_string()
3546                }
3547                false => "NULL".to_string(),
3548            })
3549            .collect();
3550        t.join(",")
3551    }
3552
3553    fn print_col_types(cols: &[SortColumn]) -> String {
3554        let t: Vec<_> = cols
3555            .iter()
3556            .map(|x| x.values.data_type().to_string())
3557            .collect();
3558        t.join(",")
3559    }
3560
    /// Randomized round-trip test: for random column types, lengths and sort
    /// options, row-format comparisons must agree with
    /// `LexicographicalComparator`, and converting rows back must losslessly
    /// reproduce the input arrays (up to dictionary re-encoding).
    #[test]
    #[cfg_attr(miri, ignore)]
    fn fuzz_test() {
        for _ in 0..100 {
            let mut rng = rng();
            let num_columns = rng.random_range(1..5);
            let len = rng.random_range(5..100);
            let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect();

            // Random ascending/descending and nulls-first/last per column
            let options: Vec<_> = (0..num_columns)
                .map(|_| SortOptions {
                    descending: rng.random_bool(0.5),
                    nulls_first: rng.random_bool(0.5),
                })
                .collect();

            let sort_columns: Vec<_> = options
                .iter()
                .zip(&arrays)
                .map(|(o, c)| SortColumn {
                    values: Arc::clone(c),
                    options: Some(*o),
                })
                .collect();

            let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();

            let columns: Vec<SortField> = options
                .into_iter()
                .zip(&arrays)
                .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
                .collect();

            let converter = RowConverter::new(columns).unwrap();
            let rows = converter.convert_columns(&arrays).unwrap();

            // Every pair of rows must compare exactly as the lexicographical
            // comparator orders the corresponding array indices.
            for i in 0..len {
                for j in 0..len {
                    let row_i = rows.row(i);
                    let row_j = rows.row(j);
                    let row_cmp = row_i.cmp(&row_j);
                    let lex_cmp = comparator.compare(i, j);
                    assert_eq!(
                        row_cmp,
                        lex_cmp,
                        "({:?} vs {:?}) vs ({:?} vs {:?}) for types {}",
                        print_row(&sort_columns, i),
                        print_row(&sort_columns, j),
                        row_i,
                        row_j,
                        print_col_types(&sort_columns)
                    );
                }
            }

            // Convert rows produced from convert_columns().
            // Note: validate_utf8 is set to false since Row is initialized through empty_rows()
            let back = converter.convert_rows(&rows).unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }

            // Check that we can convert rows into ByteArray and then parse, convert it back to array
            // Note: validate_utf8 is set to true since Row is initialized through RowParser
            let rows = rows.try_into_binary().expect("reasonable size");
            let parser = converter.parser();
            let back = converter
                .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
                .unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }

            // Rows reconstructed wholesale from the binary array must also round-trip
            let rows = converter.from_binary(rows);
            let back = converter.convert_rows(&rows).unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }
        }
    }
3644
3645    #[test]
3646    fn test_clear() {
3647        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
3648        let mut rows = converter.empty_rows(3, 128);
3649
3650        let first = Int32Array::from(vec![None, Some(2), Some(4)]);
3651        let second = Int32Array::from(vec![Some(2), None, Some(4)]);
3652        let arrays = [Arc::new(first) as ArrayRef, Arc::new(second) as ArrayRef];
3653
3654        for array in arrays.iter() {
3655            rows.clear();
3656            converter
3657                .append(&mut rows, std::slice::from_ref(array))
3658                .unwrap();
3659            let back = converter.convert_rows(&rows).unwrap();
3660            assert_eq!(&back[0], array);
3661        }
3662
3663        let mut rows_expected = converter.empty_rows(3, 128);
3664        converter.append(&mut rows_expected, &arrays[1..]).unwrap();
3665
3666        for (i, (actual, expected)) in rows.iter().zip(rows_expected.iter()).enumerate() {
3667            assert_eq!(
3668                actual, expected,
3669                "For row {i}: expected {expected:?}, actual: {actual:?}",
3670            );
3671        }
3672    }
3673
3674    #[test]
3675    fn test_append_codec_dictionary_binary() {
3676        use DataType::*;
3677        // Dictionary RowConverter
3678        let converter = RowConverter::new(vec![SortField::new(Dictionary(
3679            Box::new(Int32),
3680            Box::new(Binary),
3681        ))])
3682        .unwrap();
3683        let mut rows = converter.empty_rows(4, 128);
3684
3685        let keys = Int32Array::from_iter_values([0, 1, 2, 3]);
3686        let values = BinaryArray::from(vec![
3687            Some("a".as_bytes()),
3688            Some(b"b"),
3689            Some(b"c"),
3690            Some(b"d"),
3691        ]);
3692        let dict_array = DictionaryArray::new(keys, Arc::new(values));
3693
3694        rows.clear();
3695        let array = Arc::new(dict_array) as ArrayRef;
3696        converter
3697            .append(&mut rows, std::slice::from_ref(&array))
3698            .unwrap();
3699        let back = converter.convert_rows(&rows).unwrap();
3700
3701        dictionary_eq(&back[0], &array);
3702    }
3703
3704    #[test]
3705    fn test_list_prefix() {
3706        let mut a = ListBuilder::new(Int8Builder::new());
3707        a.append_value([None]);
3708        a.append_value([None, None]);
3709        let a = a.finish();
3710
3711        let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
3712        let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
3713        assert_eq!(rows.row(0).cmp(&rows.row(1)), Ordering::Less);
3714    }
3715
3716    #[test]
3717    fn map_should_be_marked_as_unsupported() {
3718        let map_data_type = Field::new_map(
3719            "map",
3720            "entries",
3721            Field::new("key", DataType::Utf8, false),
3722            Field::new("value", DataType::Utf8, true),
3723            false,
3724            true,
3725        )
3726        .data_type()
3727        .clone();
3728
3729        let is_supported = RowConverter::supports_fields(&[SortField::new(map_data_type)]);
3730
3731        assert!(!is_supported, "Map should not be supported");
3732    }
3733
3734    #[test]
3735    fn should_fail_to_create_row_converter_for_unsupported_map_type() {
3736        let map_data_type = Field::new_map(
3737            "map",
3738            "entries",
3739            Field::new("key", DataType::Utf8, false),
3740            Field::new("value", DataType::Utf8, true),
3741            false,
3742            true,
3743        )
3744        .data_type()
3745        .clone();
3746
3747        let converter = RowConverter::new(vec![SortField::new(map_data_type)]);
3748
3749        match converter {
3750            Err(ArrowError::NotYetImplemented(message)) => {
3751                assert!(
3752                    message.contains("Row format support not yet implemented for"),
3753                    "Expected NotYetImplemented error for map data type, got: {message}",
3754                );
3755            }
3756            Err(e) => panic!("Expected NotYetImplemented error, got: {e}"),
3757            Ok(_) => panic!("Expected NotYetImplemented error for map data type"),
3758        }
3759    }
3760
    /// When UTF-8 validation is skipped (rows created via `empty_rows`), the
    /// reconstructed `StringViewArray` only materializes non-inline (>12 byte)
    /// strings into the values buffer; the validated path (rows created via
    /// `RowParser`) copies inline strings into the buffer as well.
    #[test]
    fn test_values_buffer_smaller_when_utf8_validation_disabled() {
        // Returns (values-buffer length without validation, with validation)
        fn get_values_buffer_len(col: ArrayRef) -> (usize, usize) {
            // 1. Convert cols into rows
            let converter = RowConverter::new(vec![SortField::new(DataType::Utf8View)]).unwrap();

            // 2a. Convert rows into colsa (validate_utf8 = false)
            let rows = converter.convert_columns(&[col]).unwrap();
            let converted = converter.convert_rows(&rows).unwrap();
            let unchecked_values_len = converted[0].as_string_view().data_buffers()[0].len();

            // 2b. Convert rows into cols (validate_utf8 = true since Row is initialized through RowParser)
            let rows = rows.try_into_binary().expect("reasonable size");
            let parser = converter.parser();
            let converted = converter
                .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
                .unwrap();
            let checked_values_len = converted[0].as_string_view().data_buffers()[0].len();
            (unchecked_values_len, checked_values_len)
        }

        // Case1. StringViewArray with inline strings
        let col = Arc::new(StringViewArray::from_iter([
            Some("hello"), // short(5)
            None,          // null
            Some("short"), // short(5)
            Some("tiny"),  // short(4)
        ])) as ArrayRef;

        let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
        // Since there are no long (>12) strings, len of values buffer is 0
        assert_eq!(unchecked_values_len, 0);
        // When utf8 validation enabled, values buffer includes inline strings (5+5+4)
        assert_eq!(checked_values_len, 14);

        // Case2. StringViewArray with long(>12) strings
        let col = Arc::new(StringViewArray::from_iter([
            Some("this is a very long string over 12 bytes"),
            Some("another long string to test the buffer"),
        ])) as ArrayRef;

        let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
        // Since there are no inline strings, expected length of values buffer is the same
        assert!(unchecked_values_len > 0);
        assert_eq!(unchecked_values_len, checked_values_len);

        // Case3. StringViewArray with both short and long strings
        let col = Arc::new(StringViewArray::from_iter([
            Some("tiny"),          // 4 (short)
            Some("thisisexact13"), // 13 (long)
            None,
            Some("short"), // 5 (short)
        ])) as ArrayRef;

        let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
        // Since there is single long string, len of values buffer is 13
        assert_eq!(unchecked_values_len, 13);
        assert!(checked_values_len > unchecked_values_len);
    }
3820
3821    #[test]
3822    fn test_sparse_union() {
3823        // create a sparse union with Int32 (type_id = 0) and Utf8 (type_id = 1)
3824        let int_array = Int32Array::from(vec![Some(1), None, Some(3), None, Some(5)]);
3825        let str_array = StringArray::from(vec![None, Some("b"), None, Some("d"), None]);
3826
3827        // [1, "b", 3, "d", 5]
3828        let type_ids = vec![0, 1, 0, 1, 0].into();
3829
3830        let union_fields = [
3831            (0, Arc::new(Field::new("int", DataType::Int32, false))),
3832            (1, Arc::new(Field::new("str", DataType::Utf8, false))),
3833        ]
3834        .into_iter()
3835        .collect();
3836
3837        let union_array = UnionArray::try_new(
3838            union_fields,
3839            type_ids,
3840            None,
3841            vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
3842        )
3843        .unwrap();
3844
3845        let union_type = union_array.data_type().clone();
3846        let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
3847
3848        let rows = converter
3849            .convert_columns(&[Arc::new(union_array.clone())])
3850            .unwrap();
3851
3852        // round trip
3853        let back = converter.convert_rows(&rows).unwrap();
3854        let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();
3855
3856        assert_eq!(union_array.len(), back_union.len());
3857        for i in 0..union_array.len() {
3858            assert_eq!(union_array.type_id(i), back_union.type_id(i));
3859        }
3860    }
3861
3862    #[test]
3863    fn test_sparse_union_with_nulls() {
3864        // create a sparse union with Int32 (type_id = 0) and Utf8 (type_id = 1)
3865        let int_array = Int32Array::from(vec![Some(1), None, Some(3), None, Some(5)]);
3866        let str_array = StringArray::from(vec![None::<&str>; 5]);
3867
3868        // [1, null (both children null), 3, null (both children null), 5]
3869        let type_ids = vec![0, 1, 0, 1, 0].into();
3870
3871        let union_fields = [
3872            (0, Arc::new(Field::new("int", DataType::Int32, true))),
3873            (1, Arc::new(Field::new("str", DataType::Utf8, true))),
3874        ]
3875        .into_iter()
3876        .collect();
3877
3878        let union_array = UnionArray::try_new(
3879            union_fields,
3880            type_ids,
3881            None,
3882            vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
3883        )
3884        .unwrap();
3885
3886        let union_type = union_array.data_type().clone();
3887        let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
3888
3889        let rows = converter
3890            .convert_columns(&[Arc::new(union_array.clone())])
3891            .unwrap();
3892
3893        // round trip
3894        let back = converter.convert_rows(&rows).unwrap();
3895        let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();
3896
3897        assert_eq!(union_array.len(), back_union.len());
3898        for i in 0..union_array.len() {
3899            let expected_null = union_array.is_null(i);
3900            let actual_null = back_union.is_null(i);
3901            assert_eq!(expected_null, actual_null, "Null mismatch at index {i}");
3902            if !expected_null {
3903                assert_eq!(union_array.type_id(i), back_union.type_id(i));
3904            }
3905        }
3906    }
3907
3908    #[test]
3909    fn test_dense_union() {
3910        // create a dense union with Int32 (type_id = 0) and use Utf8 (type_id = 1)
3911        let int_array = Int32Array::from(vec![1, 3, 5]);
3912        let str_array = StringArray::from(vec!["a", "b"]);
3913
3914        let type_ids = vec![0, 1, 0, 1, 0].into();
3915
3916        // [1, "a", 3, "b", 5]
3917        let offsets = vec![0, 0, 1, 1, 2].into();
3918
3919        let union_fields = [
3920            (0, Arc::new(Field::new("int", DataType::Int32, false))),
3921            (1, Arc::new(Field::new("str", DataType::Utf8, false))),
3922        ]
3923        .into_iter()
3924        .collect();
3925
3926        let union_array = UnionArray::try_new(
3927            union_fields,
3928            type_ids,
3929            Some(offsets), // Dense mode
3930            vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
3931        )
3932        .unwrap();
3933
3934        let union_type = union_array.data_type().clone();
3935        let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
3936
3937        let rows = converter
3938            .convert_columns(&[Arc::new(union_array.clone())])
3939            .unwrap();
3940
3941        // round trip
3942        let back = converter.convert_rows(&rows).unwrap();
3943        let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();
3944
3945        assert_eq!(union_array.len(), back_union.len());
3946        for i in 0..union_array.len() {
3947            assert_eq!(union_array.type_id(i), back_union.type_id(i));
3948        }
3949    }
3950
3951    #[test]
3952    fn test_dense_union_with_nulls() {
3953        // create a dense union with Int32 (type_id = 0) and Utf8 (type_id = 1)
3954        let int_array = Int32Array::from(vec![Some(1), None, Some(5)]);
3955        let str_array = StringArray::from(vec![Some("a"), None]);
3956
3957        // [1, "a", 5, null (str null), null (int null)]
3958        let type_ids = vec![0, 1, 0, 1, 0].into();
3959        let offsets = vec![0, 0, 1, 1, 2].into();
3960
3961        let union_fields = [
3962            (0, Arc::new(Field::new("int", DataType::Int32, true))),
3963            (1, Arc::new(Field::new("str", DataType::Utf8, true))),
3964        ]
3965        .into_iter()
3966        .collect();
3967
3968        let union_array = UnionArray::try_new(
3969            union_fields,
3970            type_ids,
3971            Some(offsets),
3972            vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
3973        )
3974        .unwrap();
3975
3976        let union_type = union_array.data_type().clone();
3977        let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
3978
3979        let rows = converter
3980            .convert_columns(&[Arc::new(union_array.clone())])
3981            .unwrap();
3982
3983        // round trip
3984        let back = converter.convert_rows(&rows).unwrap();
3985        let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();
3986
3987        assert_eq!(union_array.len(), back_union.len());
3988        for i in 0..union_array.len() {
3989            let expected_null = union_array.is_null(i);
3990            let actual_null = back_union.is_null(i);
3991            assert_eq!(expected_null, actual_null, "Null mismatch at index {i}");
3992            if !expected_null {
3993                assert_eq!(union_array.type_id(i), back_union.type_id(i));
3994            }
3995        }
3996    }
3997
3998    #[test]
3999    fn test_union_ordering() {
4000        let int_array = Int32Array::from(vec![100, 5, 20]);
4001        let str_array = StringArray::from(vec!["z", "a"]);
4002
4003        // [100, "z", 5, "a", 20]
4004        let type_ids = vec![0, 1, 0, 1, 0].into();
4005        let offsets = vec![0, 0, 1, 1, 2].into();
4006
4007        let union_fields = [
4008            (0, Arc::new(Field::new("int", DataType::Int32, false))),
4009            (1, Arc::new(Field::new("str", DataType::Utf8, false))),
4010        ]
4011        .into_iter()
4012        .collect();
4013
4014        let union_array = UnionArray::try_new(
4015            union_fields,
4016            type_ids,
4017            Some(offsets),
4018            vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
4019        )
4020        .unwrap();
4021
4022        let union_type = union_array.data_type().clone();
4023        let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
4024
4025        let rows = converter.convert_columns(&[Arc::new(union_array)]).unwrap();
4026
4027        /*
4028        expected ordering
4029
4030        row 2: 5    - type_id 0
4031        row 4: 20   - type_id 0
4032        row 0: 100  - type id 0
4033        row 3: "a"  - type id 1
4034        row 1: "z"  - type id 1
4035        */
4036
4037        // 5 < "z"
4038        assert!(rows.row(2) < rows.row(1));
4039
4040        // 100 < "a"
4041        assert!(rows.row(0) < rows.row(3));
4042
4043        // among ints
4044        // 5 < 20
4045        assert!(rows.row(2) < rows.row(4));
4046        // 20 < 100
4047        assert!(rows.row(4) < rows.row(0));
4048
4049        // among strigns
4050        // "a" < "z"
4051        assert!(rows.row(3) < rows.row(1));
4052    }
4053}