Skip to main content

parquet/arrow/arrow_writer/
levels.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Parquet definition and repetition levels
19//!
20//! Contains the algorithm for computing definition and repetition levels.
21//! The algorithm works by tracking the slots of an array that should
22//! ultimately be populated when writing to Parquet.
23//! Parquet achieves nesting through definition levels and repetition levels \[1\].
//! Definition levels specify how many optional fields in the path for the column
25//! are defined.
26//! Repetition levels specify at what repeated field (list) in the path a column
27//! is defined.
28//!
29//! In a nested data structure such as `a.b.c`, one can see levels as defining
30//! whether a record is defined at `a`, `a.b`, or `a.b.c`.
//! Optional fields are nullable fields, thus if all 3 fields
//! are nullable, the maximum definition level is 3 if there are no lists.
33//!
34//! The algorithm in this module computes the necessary information to enable
35//! the writer to keep track of which columns are at which levels, and to extract
36//! the correct values at the correct slots from Arrow arrays.
37//!
38//! It works by walking a record batch's arrays, keeping track of what values
39//! are non-null, their positions and computing what their levels are.
40//!
41//! \[1\] [parquet-format#nested-encoding](https://github.com/apache/parquet-format#nested-encoding)
42
43use crate::column::chunker::CdcChunk;
44use crate::column::writer::LevelDataRef;
45use crate::errors::{ParquetError, Result};
46use arrow_array::cast::AsArray;
47use arrow_array::types::RunEndIndexType;
48use arrow_array::{Array, ArrayRef, Int32Array, OffsetSizeTrait, RunArray, downcast_run_array};
49use arrow_buffer::bit_iterator::BitIndexIterator;
50use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
51use arrow_schema::{DataType, Field};
52use std::ops::Range;
53use std::sync::Arc;
54
/// Expands a [`DataType::RunEndEncoded`] array into a flat (logical) array of its values type.
///
/// Dispatches on the run-end index type via `downcast_run_array!` and delegates to
/// `expand_typed_ree`, which uses `arrow_select::take` to materialize the
/// full-length flat array.
/// This is intentionally simple; its efficiency can and should be improved.
///
/// # Panics
///
/// Panics (via `unreachable!`) when the fallback arm is hit, i.e. when `array`
/// is not a run-end encoded array.
fn expand_ree_array(array: &ArrayRef) -> Result<ArrayRef> {
    downcast_run_array!(
        array => expand_typed_ree(array),
        _ => unreachable!("expand_ree_array called on non-REE array"),
    )
}
65
66fn expand_typed_ree<R: RunEndIndexType>(run_array: &RunArray<R>) -> Result<ArrayRef> {
67    let run_ends = run_array.run_ends();
68    let values = run_array.values();
69    let len = run_array.len();
70    let indices: Int32Array = (0..len)
71        .map(|i| run_ends.get_physical_index(i) as i32)
72        .collect();
73    arrow_select::take::take(values.as_ref(), &indices, None)
74        .map_err(|e| arrow_err!("Failed to expand REE array: {}", e))
75}
76
77/// Performs a depth-first scan of the children of `array`, constructing [`ArrayLevels`]
78/// for each leaf column encountered
79pub(crate) fn calculate_array_levels(array: &ArrayRef, field: &Field) -> Result<Vec<ArrayLevels>> {
80    let mut builder = LevelInfoBuilder::try_new(field, Default::default(), array)?;
81    builder.write(0..array.len());
82    Ok(builder.finish())
83}
84
85/// Returns true if the DataType can be represented as a primitive parquet column,
86/// i.e. a leaf array with no children
87fn is_leaf(data_type: &DataType) -> bool {
88    matches!(
89        data_type,
90        DataType::Null
91            | DataType::Boolean
92            | DataType::Int8
93            | DataType::Int16
94            | DataType::Int32
95            | DataType::Int64
96            | DataType::UInt8
97            | DataType::UInt16
98            | DataType::UInt32
99            | DataType::UInt64
100            | DataType::Float16
101            | DataType::Float32
102            | DataType::Float64
103            | DataType::Utf8
104            | DataType::Utf8View
105            | DataType::LargeUtf8
106            | DataType::Timestamp(_, _)
107            | DataType::Date32
108            | DataType::Date64
109            | DataType::Time32(_)
110            | DataType::Time64(_)
111            | DataType::Duration(_)
112            | DataType::Interval(_)
113            | DataType::Binary
114            | DataType::LargeBinary
115            | DataType::BinaryView
116            | DataType::Decimal32(_, _)
117            | DataType::Decimal64(_, _)
118            | DataType::Decimal128(_, _)
119            | DataType::Decimal256(_, _)
120            | DataType::FixedSizeBinary(_)
121    )
122}
123
/// Repetition and definition levels accumulated while descending into a
/// potentially nested array hierarchy
#[derive(Clone, Copy, Debug, Default)]
struct LevelContext {
    /// Repetition level at the current nesting depth
    rep_level: i16,
    /// Definition level at the current nesting depth
    def_level: i16,
}
132
/// A helper to construct [`ArrayLevels`] from a potentially nested [`Field`]
///
/// Each variant mirrors one kind of Arrow array and stores what is needed to
/// write levels for it: the builder(s) of its children, the [`LevelContext`]
/// computed for it in `try_new`, and the offsets / sizes / validity cloned
/// from the array.
#[derive(Debug)]
enum LevelInfoBuilder {
    /// A primitive, leaf array
    Primitive(ArrayLevels),
    /// A list array (also used for map arrays, whose entries are the child)
    List(
        Box<LevelInfoBuilder>, // Child Values
        LevelContext,          // Context of this list (rep level is parent + 1)
        OffsetBuffer<i32>,     // Offsets delimiting each list slot
        Option<NullBuffer>,    // Nulls (validity of the list slots)
        bool,                  // is_last_level (child has no nested rep)
    ),
    /// A large list array
    LargeList(
        Box<LevelInfoBuilder>, // Child Values
        LevelContext,          // Context of this list (rep level is parent + 1)
        OffsetBuffer<i64>,     // Offsets delimiting each list slot
        Option<NullBuffer>,    // Nulls (validity of the list slots)
        bool,                  // is_last_level (child has no nested rep)
    ),
    /// A fixed size list array
    FixedSizeList(
        Box<LevelInfoBuilder>, // Values
        LevelContext,          // Context of this list (rep level is parent + 1)
        usize,                 // List Size (taken from the data type)
        Option<NullBuffer>,    // Nulls (validity of the list slots)
    ),
    /// A list view array
    ListView(
        Box<LevelInfoBuilder>, // Child Values
        LevelContext,          // Context of this list (rep level is parent + 1)
        ScalarBuffer<i32>,     // Offsets (start of each slot in the child)
        ScalarBuffer<i32>,     // Sizes (number of child elements per slot)
        Option<NullBuffer>,    // Nulls (validity of the list slots)
    ),
    /// A large list view array
    LargeListView(
        Box<LevelInfoBuilder>, // Child Values
        LevelContext,          // Context of this list (rep level is parent + 1)
        ScalarBuffer<i64>,     // Offsets (start of each slot in the child)
        ScalarBuffer<i64>,     // Sizes (number of child elements per slot)
        Option<NullBuffer>,    // Nulls (validity of the list slots)
    ),
    /// A struct array: one builder per child column, the struct's context,
    /// and the struct's validity
    Struct(Vec<LevelInfoBuilder>, LevelContext, Option<NullBuffer>),
}
180
/// Minimum sub-range length at which the bulk-fill fast path in `write_leaf`
/// becomes profitable for null-heavy leaf columns.
///
/// For shorter ranges the per-call slice + popcount overhead regresses the
/// list/struct paths, which call `write_leaf` many times with tiny ranges.
/// The value was picked via a threshold sweep;
/// see <https://github.com/apache/arrow-rs/pull/9967> for the rationale.
const BULK_FILL_MIN_LEN: usize = 64;
187
188impl LevelInfoBuilder {
189    /// Create a new [`LevelInfoBuilder`] for the given [`Field`] and parent [`LevelContext`]
190    fn try_new(field: &Field, parent_ctx: LevelContext, array: &ArrayRef) -> Result<Self> {
191        if !Self::types_compatible(field.data_type(), array.data_type()) {
192            return Err(arrow_err!(format!(
193                "Incompatible type. Field '{}' has type {}, array has type {}",
194                field.name(),
195                field.data_type(),
196                array.data_type(),
197            )));
198        }
199
200        let is_nullable = field.is_nullable();
201
202        match array.data_type() {
203            d if is_leaf(d) => {
204                let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
205                Ok(Self::Primitive(levels))
206            }
207            DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => {
208                let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
209                Ok(Self::Primitive(levels))
210            }
211            DataType::RunEndEncoded(_, value_field) => {
212                let flat = expand_ree_array(array)?;
213                let flat_field = Field::new(
214                    field.name(),
215                    value_field.data_type().clone(),
216                    field.is_nullable(),
217                );
218                Self::try_new(&flat_field, parent_ctx, &flat)
219            }
220            DataType::Struct(children) => {
221                let array = array.as_struct();
222                let def_level = match is_nullable {
223                    true => parent_ctx.def_level + 1,
224                    false => parent_ctx.def_level,
225                };
226
227                let ctx = LevelContext {
228                    rep_level: parent_ctx.rep_level,
229                    def_level,
230                };
231
232                let children = children
233                    .iter()
234                    .zip(array.columns())
235                    .map(|(f, a)| Self::try_new(f, ctx, a))
236                    .collect::<Result<_>>()?;
237
238                Ok(Self::Struct(children, ctx, array.nulls().cloned()))
239            }
240            DataType::List(child)
241            | DataType::LargeList(child)
242            | DataType::Map(child, _)
243            | DataType::FixedSizeList(child, _)
244            | DataType::ListView(child)
245            | DataType::LargeListView(child) => {
246                let def_level = match is_nullable {
247                    true => parent_ctx.def_level + 2,
248                    false => parent_ctx.def_level + 1,
249                };
250
251                let ctx = LevelContext {
252                    rep_level: parent_ctx.rep_level + 1,
253                    def_level,
254                };
255
256                Ok(match field.data_type() {
257                    DataType::List(_) => {
258                        let list = array.as_list();
259                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
260                        let is_last = child.child_has_no_nested_rep();
261                        let offsets = list.offsets().clone();
262                        Self::List(
263                            Box::new(child),
264                            ctx,
265                            offsets,
266                            list.nulls().cloned(),
267                            is_last,
268                        )
269                    }
270                    DataType::LargeList(_) => {
271                        let list = array.as_list();
272                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
273                        let is_last = child.child_has_no_nested_rep();
274                        let offsets = list.offsets().clone();
275                        let nulls = list.nulls().cloned();
276                        Self::LargeList(Box::new(child), ctx, offsets, nulls, is_last)
277                    }
278                    DataType::Map(_, _) => {
279                        let map = array.as_map();
280                        let entries = Arc::new(map.entries().clone()) as ArrayRef;
281                        let child = Self::try_new(child.as_ref(), ctx, &entries)?;
282                        let is_last = child.child_has_no_nested_rep();
283                        let offsets = map.offsets().clone();
284                        Self::List(Box::new(child), ctx, offsets, map.nulls().cloned(), is_last)
285                    }
286                    DataType::FixedSizeList(_, size) => {
287                        let list = array.as_fixed_size_list();
288                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
289                        let nulls = list.nulls().cloned();
290                        Self::FixedSizeList(Box::new(child), ctx, *size as _, nulls)
291                    }
292                    DataType::ListView(_) => {
293                        let list = array.as_list_view();
294                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
295                        let offsets = list.offsets().clone();
296                        let sizes = list.sizes().clone();
297                        let nulls = list.nulls().cloned();
298                        Self::ListView(Box::new(child), ctx, offsets, sizes, nulls)
299                    }
300                    DataType::LargeListView(_) => {
301                        let list = array.as_list_view();
302                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
303                        let offsets = list.offsets().clone();
304                        let sizes = list.sizes().clone();
305                        let nulls = list.nulls().cloned();
306                        Self::LargeListView(Box::new(child), ctx, offsets, sizes, nulls)
307                    }
308                    _ => unreachable!(),
309                })
310            }
311            d => Err(nyi_err!("Datatype {} is not yet supported", d)),
312        }
313    }
314
315    /// Finish this [`LevelInfoBuilder`] returning the [`ArrayLevels`] for the leaf columns
316    /// as enumerated by a depth-first search
317    fn finish(self) -> Vec<ArrayLevels> {
318        match self {
319            LevelInfoBuilder::Primitive(v) => vec![v],
320            LevelInfoBuilder::List(v, _, _, _, _)
321            | LevelInfoBuilder::LargeList(v, _, _, _, _)
322            | LevelInfoBuilder::FixedSizeList(v, _, _, _)
323            | LevelInfoBuilder::ListView(v, _, _, _, _)
324            | LevelInfoBuilder::LargeListView(v, _, _, _, _) => v.finish(),
325            LevelInfoBuilder::Struct(v, _, _) => v.into_iter().flat_map(|l| l.finish()).collect(),
326        }
327    }
328
329    /// Given an `array`, write the level data for the elements in `range`
330    fn write(&mut self, range: Range<usize>) {
331        match self {
332            LevelInfoBuilder::Primitive(info) => Self::write_leaf(info, range),
333            LevelInfoBuilder::List(child, ctx, offsets, nulls, is_last) => {
334                Self::write_list(child, ctx, offsets, nulls.as_ref(), range, *is_last)
335            }
336            LevelInfoBuilder::LargeList(child, ctx, offsets, nulls, is_last) => {
337                Self::write_list(child, ctx, offsets, nulls.as_ref(), range, *is_last)
338            }
339            LevelInfoBuilder::FixedSizeList(child, ctx, size, nulls) => {
340                Self::write_fixed_size_list(child, ctx, *size, nulls.as_ref(), range)
341            }
342            LevelInfoBuilder::ListView(child, ctx, offsets, sizes, nulls) => {
343                Self::write_list_view(child, ctx, offsets, sizes, nulls.as_ref(), range)
344            }
345            LevelInfoBuilder::LargeListView(child, ctx, offsets, sizes, nulls) => {
346                Self::write_list_view(child, ctx, offsets, sizes, nulls.as_ref(), range)
347            }
348            LevelInfoBuilder::Struct(children, ctx, nulls) => {
349                Self::write_struct(children, ctx, nulls.as_ref(), range)
350            }
351        }
352    }
353
354    /// Returns `true` if the child contains no nested repetition levels, meaning
355    /// each child element produces exactly one rep_level entry in the leaf.
356    /// This is true for `Primitive` children and `Struct` trees with no list descendants.
357    fn child_has_no_nested_rep(&self) -> bool {
358        match self {
359            LevelInfoBuilder::Primitive(_) => true,
360            LevelInfoBuilder::Struct(children, _, _) => {
361                children.iter().all(|c| c.child_has_no_nested_rep())
362            }
363            _ => false,
364        }
365    }
366
367    /// Write `range` elements from ListArray `array`
368    ///
369    /// Note: MapArrays are `ListArray<i32>` under the hood and so are dispatched to this method
370    fn write_list<O: OffsetSizeTrait>(
371        child: &mut LevelInfoBuilder,
372        ctx: &LevelContext,
373        offsets: &[O],
374        nulls: Option<&NullBuffer>,
375        range: Range<usize>,
376        is_last_level: bool,
377    ) {
378        // Fast path: entire list array is null; emit bulk null rep/def levels
379        if nulls.is_some_and(|nulls| nulls.null_count() == nulls.len()) {
380            let count = range.end - range.start;
381            child.visit_leaves(|leaf| {
382                leaf.extend_uniform_levels(ctx.def_level - 2, ctx.rep_level - 1, count);
383            });
384            return;
385        }
386
387        // Fast path for "last-level list": when the child has no nested rep_levels,
388        // each child element produces exactly one rep_level entry. We can batch
389        // contiguous non-empty list slots into a single child.write() call, then
390        // fix up the rep_levels at list-slot boundaries using offsets directly.
391        //
392        // Kept as a separate function so the compiler can optimize write_list's
393        // hot loop independently (function body size affects codegen quality).
394        if is_last_level {
395            Self::write_list_last_level(child, ctx, offsets, nulls, range);
396            return;
397        }
398
399        let offsets = &offsets[range.start..range.end + 1];
400
401        let write_non_null_slice =
402            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
403                child.write(start_idx..end_idx);
404                child.visit_leaves(|leaf| {
405                    let rep_levels = leaf.rep_levels.materialize_mut().unwrap();
406                    let mut rev = rep_levels.iter_mut().rev();
407                    let mut remaining = end_idx - start_idx;
408
409                    loop {
410                        let next = rev.next().unwrap();
411                        if *next > ctx.rep_level {
412                            // Nested element - ignore
413                            continue;
414                        }
415
416                        remaining -= 1;
417                        if remaining == 0 {
418                            *next = ctx.rep_level - 1;
419                            break;
420                        }
421                    }
422                })
423            };
424
425        // In a list column, each row falls into one of three categories:
426        // - "null": the list slot is absent (!is_valid), encoded at def_level - 2
427        // - "empty": the list slot is present but has zero elements
428        //   (offsets[i] == offsets[i+1]), encoded at def_level - 1
429        // - non-empty: the list slot has child values, which are recursed into
430        //
431        // Consecutive runs of null or empty rows are batched and written together.
432        let write_null_run = |child: &mut LevelInfoBuilder, count: usize| {
433            if count > 0 {
434                child.visit_leaves(|leaf| {
435                    leaf.append_rep_level_run(ctx.rep_level - 1, count);
436                    leaf.append_def_level_run(ctx.def_level - 2, count);
437                });
438            }
439        };
440
441        let write_empty_run = |child: &mut LevelInfoBuilder, count: usize| {
442            if count > 0 {
443                child.visit_leaves(|leaf| {
444                    leaf.append_rep_level_run(ctx.rep_level - 1, count);
445                    leaf.append_def_level_run(ctx.def_level - 1, count);
446                });
447            }
448        };
449
450        match nulls {
451            Some(nulls) => {
452                let null_offset = range.start;
453                let mut pending_nulls: usize = 0;
454                let mut pending_empties: usize = 0;
455
456                // TODO: Faster bitmask iteration (#1757)
457                for (idx, w) in offsets.windows(2).enumerate() {
458                    let is_valid = nulls.is_valid(idx + null_offset);
459                    let start_idx = w[0].as_usize();
460                    let end_idx = w[1].as_usize();
461
462                    if !is_valid {
463                        write_empty_run(child, pending_empties);
464                        pending_empties = 0;
465                        pending_nulls += 1;
466                    } else if start_idx == end_idx {
467                        write_null_run(child, pending_nulls);
468                        pending_nulls = 0;
469                        pending_empties += 1;
470                    } else {
471                        write_null_run(child, pending_nulls);
472                        pending_nulls = 0;
473                        write_empty_run(child, pending_empties);
474                        pending_empties = 0;
475                        write_non_null_slice(child, start_idx, end_idx);
476                    }
477                }
478                write_null_run(child, pending_nulls);
479                write_empty_run(child, pending_empties);
480            }
481            None => {
482                let mut pending_empties: usize = 0;
483                for w in offsets.windows(2) {
484                    let start_idx = w[0].as_usize();
485                    let end_idx = w[1].as_usize();
486                    if start_idx == end_idx {
487                        pending_empties += 1;
488                    } else {
489                        write_empty_run(child, pending_empties);
490                        pending_empties = 0;
491                        write_non_null_slice(child, start_idx, end_idx);
492                    }
493                }
494                write_empty_run(child, pending_empties);
495            }
496        }
497    }
498
499    /// Optimized write path for lists whose child has no nested repetition levels.
500    ///
501    /// When the child is a leaf (or a struct of leaves), each child element maps to
502    /// exactly one rep_level entry. This lets us batch contiguous non-empty list
503    /// slots into a single `child.write()` call, then stamp the list-start markers
504    /// at positions computed directly from offsets — avoiding per-slot `write` +
505    /// reverse-scan overhead.
506    fn write_list_last_level<O: OffsetSizeTrait>(
507        child: &mut LevelInfoBuilder,
508        ctx: &LevelContext,
509        offsets: &[O],
510        nulls: Option<&NullBuffer>,
511        range: Range<usize>,
512    ) {
513        let null_offset = range.start;
514        let offsets = &offsets[range.start..range.end + 1];
515        let list_start_rep = ctx.rep_level - 1;
516
517        let emit_nulls = |child: &mut LevelInfoBuilder, count: usize| {
518            child.visit_leaves(|leaf| {
519                leaf.append_rep_level_run(list_start_rep, count);
520                leaf.append_def_level_run(ctx.def_level - 2, count);
521            });
522        };
523
524        let emit_empties = |child: &mut LevelInfoBuilder, count: usize| {
525            child.visit_leaves(|leaf| {
526                leaf.append_rep_level_run(list_start_rep, count);
527                leaf.append_def_level_run(ctx.def_level - 1, count);
528            });
529        };
530
531        let emit_non_empty_run = |child: &mut LevelInfoBuilder, run_offsets: &[O]| {
532            debug_assert!(run_offsets.len() >= 2);
533            let values_start = run_offsets[0].as_usize();
534            let values_end = run_offsets[run_offsets.len() - 1].as_usize();
535            debug_assert!(values_end > values_start);
536
537            // Write all leaf values in one batch. Since the child has no nested
538            // rep, this emits (values_end - values_start) rep_levels all equal
539            // to ctx.rep_level (= "continuation within list").
540            child.write(values_start..values_end);
541
542            // The first element of each list slot needs rep_level =
543            // list_start_rep to mark a new list boundary. Because there's a 1:1
544            // mapping between child elements and rep_level entries, the position
545            // of each slot's first element is directly computable from offsets.
546            child.visit_leaves(|leaf| {
547                let rep_levels = leaf.rep_levels.materialize_mut().unwrap();
548                let batch_len = values_end - values_start;
549                let batch_base = rep_levels.len() - batch_len;
550
551                for slot_offset in run_offsets.iter().take(run_offsets.len() - 1) {
552                    let list_start_pos = batch_base + (slot_offset.as_usize() - values_start);
553                    rep_levels[list_start_pos] = list_start_rep;
554                }
555            });
556        };
557
558        // Classify each slot, detect run boundaries, flush on transition.
559        #[derive(Clone, Copy, PartialEq)]
560        enum SlotKind {
561            Null,
562            Empty,
563            NonEmpty,
564        }
565
566        let num_slots = offsets.len() - 1;
567        if num_slots == 0 {
568            return;
569        }
570
571        macro_rules! classify {
572            ($i:expr, $nulls:expr) => {
573                if !$nulls.is_valid($i + null_offset) {
574                    SlotKind::Null
575                } else if offsets[$i] == offsets[$i + 1] {
576                    SlotKind::Empty
577                } else {
578                    SlotKind::NonEmpty
579                }
580            };
581        }
582
583        macro_rules! flush_run {
584            ($kind:expr, $start:expr, $end:expr) => {
585                match $kind {
586                    SlotKind::Null => emit_nulls(child, $end - $start),
587                    SlotKind::Empty => emit_empties(child, $end - $start),
588                    SlotKind::NonEmpty => emit_non_empty_run(child, &offsets[$start..$end + 1]),
589                }
590            };
591        }
592
593        match nulls {
594            Some(nulls) => {
595                let mut run_kind = classify!(0, nulls);
596                let mut run_start: usize = 0;
597                for i in 1..num_slots {
598                    let kind = classify!(i, nulls);
599                    if kind != run_kind {
600                        flush_run!(run_kind, run_start, i);
601                        run_kind = kind;
602                        run_start = i;
603                    }
604                }
605                flush_run!(run_kind, run_start, num_slots);
606            }
607            None => {
608                let mut run_kind = if offsets[0] == offsets[1] {
609                    SlotKind::Empty
610                } else {
611                    SlotKind::NonEmpty
612                };
613                let mut run_start: usize = 0;
614                for i in 1..num_slots {
615                    let kind = if offsets[i] == offsets[i + 1] {
616                        SlotKind::Empty
617                    } else {
618                        SlotKind::NonEmpty
619                    };
620                    if kind != run_kind {
621                        flush_run!(run_kind, run_start, i);
622                        run_kind = kind;
623                        run_start = i;
624                    }
625                }
626                flush_run!(run_kind, run_start, num_slots);
627            }
628        }
629    }
630
631    /// Write `range` elements from ListViewArray `array`
632    fn write_list_view<O: OffsetSizeTrait>(
633        child: &mut LevelInfoBuilder,
634        ctx: &LevelContext,
635        offsets: &[O],
636        sizes: &[O],
637        nulls: Option<&NullBuffer>,
638        range: Range<usize>,
639    ) {
640        let offsets = &offsets[range.start..range.end];
641        let sizes = &sizes[range.start..range.end];
642
643        let write_non_null_slice =
644            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
645                child.write(start_idx..end_idx);
646                child.visit_leaves(|leaf| {
647                    let rep_levels = leaf.rep_levels.materialize_mut().unwrap();
648                    let mut rev = rep_levels.iter_mut().rev();
649                    let mut remaining = end_idx - start_idx;
650
651                    loop {
652                        let next = rev.next().unwrap();
653                        if *next > ctx.rep_level {
654                            // Nested element - ignore
655                            continue;
656                        }
657
658                        remaining -= 1;
659                        if remaining == 0 {
660                            *next = ctx.rep_level - 1;
661                            break;
662                        }
663                    }
664                })
665            };
666
667        let write_empty_slice = |child: &mut LevelInfoBuilder| {
668            child.visit_leaves(|leaf| {
669                leaf.append_rep_level_run(ctx.rep_level - 1, 1);
670                leaf.append_def_level_run(ctx.def_level - 1, 1);
671            })
672        };
673
674        let write_null_slice = |child: &mut LevelInfoBuilder| {
675            child.visit_leaves(|leaf| {
676                leaf.append_rep_level_run(ctx.rep_level - 1, 1);
677                leaf.append_def_level_run(ctx.def_level - 2, 1);
678            })
679        };
680
681        match nulls {
682            Some(nulls) => {
683                let null_offset = range.start;
684                // TODO: Faster bitmask iteration (#1757)
685                for (idx, (offset, size)) in offsets.iter().zip(sizes.iter()).enumerate() {
686                    let is_valid = nulls.is_valid(idx + null_offset);
687                    let start_idx = offset.as_usize();
688                    let size = size.as_usize();
689                    let end_idx = start_idx + size;
690                    if !is_valid {
691                        write_null_slice(child)
692                    } else if size == 0 {
693                        write_empty_slice(child)
694                    } else {
695                        write_non_null_slice(child, start_idx, end_idx)
696                    }
697                }
698            }
699            None => {
700                for (offset, size) in offsets.iter().zip(sizes.iter()) {
701                    let start_idx = offset.as_usize();
702                    let size = size.as_usize();
703                    let end_idx = start_idx + size;
704                    if size == 0 {
705                        write_empty_slice(child)
706                    } else {
707                        write_non_null_slice(child, start_idx, end_idx)
708                    }
709                }
710            }
711        }
712    }
713
714    /// Write `range` elements from StructArray `array`
715    fn write_struct(
716        children: &mut [LevelInfoBuilder],
717        ctx: &LevelContext,
718        nulls: Option<&NullBuffer>,
719        range: Range<usize>,
720    ) {
721        let write_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
722            let len = range.end - range.start;
723            for child in children {
724                child.visit_leaves(|info| {
725                    info.extend_uniform_levels(ctx.def_level - 1, ctx.rep_level, len);
726                })
727            }
728        };
729
730        // Fast path: entire struct array is null; emit bulk null def/rep levels
731        if nulls.is_some_and(|nulls| nulls.null_count() == nulls.len()) {
732            write_null(children, range);
733            return;
734        }
735
736        let write_non_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
737            for child in children {
738                child.write(range.clone())
739            }
740        };
741
742        match nulls {
743            Some(validity) => {
744                let mut last_non_null_idx = None;
745                let mut last_null_idx = None;
746
747                // TODO: Faster bitmask iteration (#1757)
748                for i in range.clone() {
749                    match validity.is_valid(i) {
750                        true => {
751                            if let Some(last_idx) = last_null_idx.take() {
752                                write_null(children, last_idx..i)
753                            }
754                            last_non_null_idx.get_or_insert(i);
755                        }
756                        false => {
757                            if let Some(last_idx) = last_non_null_idx.take() {
758                                write_non_null(children, last_idx..i)
759                            }
760                            last_null_idx.get_or_insert(i);
761                        }
762                    }
763                }
764
765                if let Some(last_idx) = last_null_idx.take() {
766                    write_null(children, last_idx..range.end)
767                }
768
769                if let Some(last_idx) = last_non_null_idx.take() {
770                    write_non_null(children, last_idx..range.end)
771                }
772            }
773            None => write_non_null(children, range),
774        }
775    }
776
777    /// Write `range` elements from FixedSizeListArray with child data `values` and null bitmap `nulls`.
778    fn write_fixed_size_list(
779        child: &mut LevelInfoBuilder,
780        ctx: &LevelContext,
781        fixed_size: usize,
782        nulls: Option<&NullBuffer>,
783        range: Range<usize>,
784    ) {
785        // Fast path: entire fixed-size list array is null
786        if nulls.is_some_and(|nulls| nulls.null_count() == nulls.len()) {
787            let count = range.end - range.start;
788            child.visit_leaves(|leaf| {
789                leaf.extend_uniform_levels(ctx.def_level - 2, ctx.rep_level - 1, count);
790            });
791            return;
792        }
793
794        let write_non_null = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
795            let values_start = start_idx * fixed_size;
796            let values_end = end_idx * fixed_size;
797            child.write(values_start..values_end);
798
799            child.visit_leaves(|leaf| {
800                let rep_levels = leaf.rep_levels.materialize_mut().unwrap();
801
802                let row_indices = (0..fixed_size)
803                    .rev()
804                    .cycle()
805                    .take(values_end - values_start);
806
807                // Step backward over the child rep levels and mark the start of each list
808                rep_levels
809                    .iter_mut()
810                    .rev()
811                    // Filter out reps from nested children
812                    .filter(|&&mut r| r == ctx.rep_level)
813                    .zip(row_indices)
814                    .for_each(|(r, idx)| {
815                        if idx == 0 {
816                            *r = ctx.rep_level - 1;
817                        }
818                    });
819            })
820        };
821
822        // If list size is 0, ignore values and just write rep/def levels.
823        let write_empty = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
824            let len = end_idx - start_idx;
825            child.visit_leaves(|leaf| {
826                leaf.append_rep_level_run(ctx.rep_level - 1, len);
827                leaf.append_def_level_run(ctx.def_level - 1, len);
828            })
829        };
830
831        let write_rows = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
832            if fixed_size > 0 {
833                write_non_null(child, start_idx, end_idx)
834            } else {
835                write_empty(child, start_idx, end_idx)
836            }
837        };
838
839        match nulls {
840            Some(nulls) => {
841                let mut start_idx = None;
842                for idx in range.clone() {
843                    if nulls.is_valid(idx) {
844                        // Start a run of valid rows if not already inside of one
845                        start_idx.get_or_insert(idx);
846                    } else {
847                        // Write out any pending valid rows
848                        if let Some(start) = start_idx.take() {
849                            write_rows(child, start, idx);
850                        }
851                        // Add null row
852                        child.visit_leaves(|leaf| {
853                            leaf.append_rep_level_run(ctx.rep_level - 1, 1);
854                            leaf.append_def_level_run(ctx.def_level - 2, 1);
855                        })
856                    }
857                }
858                // Write out any remaining valid rows
859                if let Some(start) = start_idx.take() {
860                    write_rows(child, start, range.end);
861                }
862            }
863            // If all rows are valid then write the whole array
864            None => write_rows(child, range.start, range.end),
865        }
866    }
867
868    /// Write a primitive array, as defined by [`is_leaf`]
869    fn write_leaf(info: &mut ArrayLevels, range: Range<usize>) {
870        let len = range.end - range.start;
871
872        // Fast path: entire leaf array is null
873        if let Some(nulls) = &info.logical_nulls {
874            if !matches!(info.def_levels, LevelData::Absent) && nulls.null_count() == nulls.len() {
875                info.extend_uniform_levels(info.max_def_level - 1, info.max_rep_level, len);
876                return;
877            }
878        }
879
880        if matches!(info.def_levels, LevelData::Absent) {
881            info.non_null_indices.extend(range.clone());
882        } else {
883            let max_def_level = info.max_def_level;
884            match &info.logical_nulls {
885                Some(nulls) => {
886                    assert!(range.end <= nulls.len());
887                    // Bulk-fill is profitable only on null-heavy ranges long enough to
888                    // amortize the slice/popcount cost; see `BULK_FILL_MIN_LEN` and the
889                    // PR description for the threshold sweep. The gate uses the cached
890                    // buffer-wide `null_count` (O(1)) to stay cheap on the cold path.
891                    if len >= BULK_FILL_MIN_LEN && nulls.null_count() * 2 >= nulls.len() {
892                        let range_nulls = nulls.slice(range.start, len);
893                        let valid_in_range = len - range_nulls.null_count();
894                        let null_def_level = max_def_level - 1;
895                        let buf = info
896                            .def_levels
897                            .materialize_mut()
898                            .expect("definition levels present");
899                        let base = buf.len();
900                        buf.resize(base + len, null_def_level);
901                        for i in range_nulls.valid_indices() {
902                            buf[base + i] = max_def_level;
903                        }
904                        info.non_null_indices.reserve(valid_in_range);
905                        info.non_null_indices
906                            .extend(range_nulls.valid_indices().map(|i| i + range.start));
907                    } else {
908                        let bits = nulls.inner();
909                        info.def_levels.extend_from_iter(range.clone().map(|i| {
910                            // Safety: range.end was asserted to be in bounds earlier
911                            let valid = unsafe { bits.value_unchecked(i) };
912                            max_def_level - (!valid as i16)
913                        }));
914                        info.non_null_indices.reserve(len);
915                        info.non_null_indices.extend(
916                            BitIndexIterator::new(bits.inner(), bits.offset() + range.start, len)
917                                .map(|i| i + range.start),
918                        );
919                    }
920                }
921                None => {
922                    info.append_def_level_run(max_def_level, len);
923                    info.non_null_indices.reserve(len);
924                    info.non_null_indices.extend(range.clone());
925                }
926            }
927        }
928
929        if !matches!(info.rep_levels, LevelData::Absent) {
930            info.append_rep_level_run(info.max_rep_level, len);
931        }
932    }
933
934    /// Visits all children of this node in depth first order
935    fn visit_leaves(&mut self, visit: impl Fn(&mut ArrayLevels) + Copy) {
936        match self {
937            LevelInfoBuilder::Primitive(info) => visit(info),
938            LevelInfoBuilder::List(c, _, _, _, _)
939            | LevelInfoBuilder::LargeList(c, _, _, _, _)
940            | LevelInfoBuilder::FixedSizeList(c, _, _, _)
941            | LevelInfoBuilder::ListView(c, _, _, _, _)
942            | LevelInfoBuilder::LargeListView(c, _, _, _, _) => c.visit_leaves(visit),
943            LevelInfoBuilder::Struct(children, _, _) => {
944                for c in children {
945                    c.visit_leaves(visit)
946                }
947            }
948        }
949    }
950
951    /// Determine if the fields are compatible for purposes of constructing `LevelBuilderInfo`.
952    ///
953    /// Fields are compatible if they're the same type. Otherwise if one of them is a dictionary
954    /// and the other is a native array, the dictionary values must have the same type as the
955    /// native array
956    fn types_compatible(a: &DataType, b: &DataType) -> bool {
957        // if the Arrow data types are equal, the types are deemed compatible
958        if a.equals_datatype(b) {
959            return true;
960        }
961
962        // get the values out of the dictionaries
963        let (a, b) = match (a, b) {
964            (DataType::Dictionary(_, va), DataType::Dictionary(_, vb)) => {
965                (va.as_ref(), vb.as_ref())
966            }
967            (DataType::Dictionary(_, v), b) => (v.as_ref(), b),
968            (a, DataType::Dictionary(_, v)) => (a, v.as_ref()),
969            _ => (a, b),
970        };
971
972        // now that we've got the values from one/both dictionaries, if the values
973        // have the same Arrow data type, they're compatible
974        if a == b {
975            return true;
976        }
977
978        // here we have different Arrow data types, but if the array contains the same type of data
979        // then we consider the type compatible
980        match a {
981            // String, StringView and LargeString are compatible
982            DataType::Utf8 => matches!(b, DataType::LargeUtf8 | DataType::Utf8View),
983            DataType::Utf8View => matches!(b, DataType::LargeUtf8 | DataType::Utf8),
984            DataType::LargeUtf8 => matches!(b, DataType::Utf8 | DataType::Utf8View),
985
986            // Binary, BinaryView and LargeBinary are compatible
987            DataType::Binary => matches!(b, DataType::LargeBinary | DataType::BinaryView),
988            DataType::BinaryView => matches!(b, DataType::LargeBinary | DataType::Binary),
989            DataType::LargeBinary => matches!(b, DataType::Binary | DataType::BinaryView),
990
991            // otherwise we have incompatible types
992            _ => false,
993        }
994    }
995}
996
/// Definition or repetition levels accumulated for a single leaf column.
///
/// The same logical sequence of levels can be held by more than one variant, so equality
/// (see the [`PartialEq`] impl) is defined on the logical contents, not on the variant.
#[derive(Debug, Clone)]
pub(crate) enum LevelData {
    /// No level stream exists for this column.
    Absent,
    /// Levels stored explicitly, one entry per level.
    Materialized(Vec<i16>),
    /// `count` repetitions of `value`, held without a buffer.
    Uniform { value: i16, count: usize },
}

// Compare logical level contents rather than physical representation, so a
// uniform run compares equal to the equivalent materialized buffer.
impl PartialEq for LevelData {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (Self::Absent, Self::Absent) => true,
            // An absent stream is never equal to a present one, even an empty one.
            (Self::Absent, _) | (_, Self::Absent) => false,
            (Self::Materialized(a), Self::Materialized(b)) => a == b,
            (Self::Uniform { value: v, count: n }, Self::Materialized(b))
            | (Self::Materialized(b), Self::Uniform { value: v, count: n }) => {
                b.len() == *n && b.iter().all(|x| x == v)
            }
            (
                Self::Uniform {
                    value: v1,
                    count: n1,
                },
                Self::Uniform {
                    value: v2,
                    count: n2,
                },
            ) => {
                // Two empty runs hold the same (empty) sequence whatever their nominal value.
                // Ignoring the value here keeps equality transitive with respect to an empty
                // `Materialized` buffer, as `Eq` requires.
                n1 == n2 && (*n1 == 0 || v1 == v2)
            }
        }
    }
}

impl Eq for LevelData {}
1033
1034impl LevelData {
1035    fn new(present: bool) -> Self {
1036        match present {
1037            true => Self::Materialized(Vec::new()),
1038            false => Self::Absent,
1039        }
1040    }
1041
1042    pub(crate) fn as_ref(&self) -> LevelDataRef<'_> {
1043        match self {
1044            Self::Absent => LevelDataRef::Absent,
1045            Self::Materialized(values) => LevelDataRef::Materialized(values),
1046            Self::Uniform { value, count } => LevelDataRef::Uniform {
1047                value: *value,
1048                count: *count,
1049            },
1050        }
1051    }
1052
1053    pub(crate) fn slice(&self, offset: usize, len: usize) -> Self {
1054        match self {
1055            Self::Absent => Self::Absent,
1056            Self::Materialized(values) => Self::Materialized(values[offset..offset + len].to_vec()),
1057            Self::Uniform { value, .. } => Self::Uniform {
1058                value: *value,
1059                count: len,
1060            },
1061        }
1062    }
1063
1064    fn append_run(&mut self, value: i16, count: usize) {
1065        if count == 0 {
1066            return;
1067        }
1068
1069        match self {
1070            // No physical level stream exists for this schema. Higher-level
1071            // traversal may still append implicit levels, so this remains a no-op.
1072            Self::Absent => {}
1073            // Start compact: the first appended run can be represented without
1074            // allocating a level buffer.
1075            Self::Materialized(values) if values.is_empty() => {
1076                *self = Self::Uniform { value, count };
1077            }
1078            // Already materialized, so preserve the buffer representation and append.
1079            Self::Materialized(values) => values.extend(std::iter::repeat_n(value, count)),
1080            // Preserve the compact representation while the appended run has
1081            // the same value.
1082            Self::Uniform {
1083                value: uniform_value,
1084                count: uniform_count,
1085            } if *uniform_value == value => {
1086                *uniform_count += count;
1087            }
1088            // A different value breaks the uniform representation. Materialize
1089            // the existing run, then append the new run to the buffer.
1090            Self::Uniform { .. } => {
1091                let values = self.materialize_mut().unwrap();
1092                values.extend(std::iter::repeat_n(value, count));
1093            }
1094        }
1095    }
1096
1097    fn extend_from_iter<I>(&mut self, iter: I)
1098    where
1099        I: IntoIterator<Item = i16>,
1100    {
1101        if let Some(values) = self.materialize_mut() {
1102            values.extend(iter);
1103        }
1104    }
1105
1106    /// Convert a uniform run into a materialized buffer if needed, then return
1107    /// the mutable level buffer. Returns `None` when no physical level stream exists.
1108    fn materialize_mut(&mut self) -> Option<&mut Vec<i16>> {
1109        match self {
1110            Self::Absent => None,
1111            Self::Materialized(values) => Some(values),
1112            Self::Uniform { value, count } => {
1113                let values = vec![*value; *count];
1114                *self = Self::Materialized(values);
1115                match self {
1116                    Self::Materialized(values) => Some(values),
1117                    _ => unreachable!(),
1118                }
1119            }
1120        }
1121    }
1122}
1123
1124#[derive(Debug, Clone)]
1125pub(crate) struct ArrayLevels {
1126    /// Array's definition levels
1127    ///
1128    /// Present if `max_def_level != 0`
1129    def_levels: LevelData,
1130
1131    /// Array's optional repetition levels
1132    ///
1133    /// Present if `max_rep_level != 0`
1134    rep_levels: LevelData,
1135
1136    /// The corresponding array identifying non-null slices of data
1137    /// from the primitive array
1138    non_null_indices: Vec<usize>,
1139
1140    /// The maximum definition level for this leaf column
1141    max_def_level: i16,
1142
1143    /// The maximum repetition for this leaf column
1144    max_rep_level: i16,
1145
1146    /// The arrow array
1147    array: ArrayRef,
1148
1149    /// cached logical nulls of the array.
1150    logical_nulls: Option<NullBuffer>,
1151}
1152
1153impl PartialEq for ArrayLevels {
1154    fn eq(&self, other: &Self) -> bool {
1155        self.def_levels == other.def_levels
1156            && self.rep_levels == other.rep_levels
1157            && self.non_null_indices == other.non_null_indices
1158            && self.max_def_level == other.max_def_level
1159            && self.max_rep_level == other.max_rep_level
1160            && self.array.as_ref() == other.array.as_ref()
1161            && self.logical_nulls.as_ref() == other.logical_nulls.as_ref()
1162    }
1163}
1164impl Eq for ArrayLevels {}
1165
1166impl ArrayLevels {
1167    fn new(ctx: LevelContext, is_nullable: bool, array: ArrayRef) -> Self {
1168        let max_rep_level = ctx.rep_level;
1169        let max_def_level = match is_nullable {
1170            true => ctx.def_level + 1,
1171            false => ctx.def_level,
1172        };
1173
1174        let logical_nulls = array.logical_nulls();
1175
1176        Self {
1177            def_levels: LevelData::new(max_def_level != 0),
1178            rep_levels: LevelData::new(max_rep_level != 0),
1179            non_null_indices: vec![],
1180            max_def_level,
1181            max_rep_level,
1182            array,
1183            logical_nulls,
1184        }
1185    }
1186
1187    pub fn array(&self) -> &ArrayRef {
1188        &self.array
1189    }
1190
1191    pub(crate) fn def_level_data(&self) -> &LevelData {
1192        &self.def_levels
1193    }
1194
1195    pub(crate) fn rep_level_data(&self) -> &LevelData {
1196        &self.rep_levels
1197    }
1198
1199    pub fn non_null_indices(&self) -> &[usize] {
1200        &self.non_null_indices
1201    }
1202
1203    /// Create a sliced view of this `ArrayLevels` for a CDC chunk.
1204    ///
1205    /// The chunk's `value_offset`/`num_values` select the relevant slice of
1206    /// `non_null_indices`. The array is sliced to the range covered by
1207    /// those indices, and they are shifted to be relative to the slice.
1208    pub(crate) fn slice_for_chunk(&self, chunk: &CdcChunk) -> Self {
1209        let def_levels = self.def_levels.slice(chunk.level_offset, chunk.num_levels);
1210        let rep_levels = self.rep_levels.slice(chunk.level_offset, chunk.num_levels);
1211
1212        // Select the non-null indices for this chunk.
1213        let nni = &self.non_null_indices[chunk.value_offset..chunk.value_offset + chunk.num_values];
1214        // Compute the array range spanned by the non-null indices.
1215        // When nni is empty (all-null chunk), start=0, end=0 → zero-length
1216        // array slice; write_batch_internal will process only the def/rep
1217        // levels and write no values.
1218        let start = nni.first().copied().unwrap_or(0);
1219        let end = nni.last().map_or(0, |&i| i + 1);
1220        // Shift indices to be relative to the sliced array.
1221        let non_null_indices = nni.iter().map(|&idx| idx - start).collect();
1222        // Slice the array to the computed range.
1223        let array = self.array.slice(start, end - start);
1224        let logical_nulls = array.logical_nulls();
1225
1226        Self {
1227            def_levels,
1228            rep_levels,
1229            non_null_indices,
1230            max_def_level: self.max_def_level,
1231            max_rep_level: self.max_rep_level,
1232            array,
1233            logical_nulls,
1234        }
1235    }
1236
1237    /// Bulk-emit `count` uniform def/rep levels.
1238    fn extend_uniform_levels(&mut self, def_val: i16, rep_val: i16, count: usize) {
1239        self.def_levels.append_run(def_val, count);
1240        self.rep_levels.append_run(rep_val, count);
1241    }
1242
1243    fn append_def_level_run(&mut self, value: i16, count: usize) {
1244        self.def_levels.append_run(value, count);
1245    }
1246
1247    fn append_rep_level_run(&mut self, value: i16, count: usize) {
1248        self.rep_levels.append_run(value, count);
1249    }
1250}
1251
1252#[cfg(test)]
1253mod tests {
1254    use super::*;
1255    use crate::column::chunker::CdcChunk;
1256
1257    use arrow_array::builder::*;
1258    use arrow_array::types::Int32Type;
1259    use arrow_array::*;
1260    use arrow_buffer::{Buffer, ToByteSlice};
1261    use arrow_cast::display::array_value_to_string;
1262    use arrow_data::{ArrayData, ArrayDataBuilder};
1263    use arrow_schema::{Fields, Schema};
1264
1265    #[test]
1266    fn test_calculate_array_levels_twitter_example() {
1267        // based on the example at https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html
1268        // [[a, b, c], [d, e, f, g]], [[h], [i,j]]
1269
1270        let leaf_type = Field::new_list_field(DataType::Int32, false);
1271        let inner_type = DataType::List(Arc::new(leaf_type));
1272        let inner_field = Field::new("l2", inner_type.clone(), false);
1273        let outer_type = DataType::List(Arc::new(inner_field));
1274        let outer_field = Field::new("l1", outer_type.clone(), false);
1275
1276        let primitives = Int32Array::from_iter(0..10);
1277
1278        // Cannot use from_iter_primitive as always infers nullable
1279        let offsets = Buffer::from_iter([0_i32, 3, 7, 8, 10]);
1280        let inner_list = ArrayDataBuilder::new(inner_type)
1281            .len(4)
1282            .add_buffer(offsets)
1283            .add_child_data(primitives.to_data())
1284            .build()
1285            .unwrap();
1286
1287        let offsets = Buffer::from_iter([0_i32, 2, 4]);
1288        let outer_list = ArrayDataBuilder::new(outer_type)
1289            .len(2)
1290            .add_buffer(offsets)
1291            .add_child_data(inner_list)
1292            .build()
1293            .unwrap();
1294        let outer_list = make_array(outer_list);
1295
1296        let levels = calculate_array_levels(&outer_list, &outer_field).unwrap();
1297        assert_eq!(levels.len(), 1);
1298
1299        let expected = ArrayLevels {
1300            def_levels: LevelData::Materialized(vec![2; 10]),
1301            rep_levels: LevelData::Materialized(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
1302            non_null_indices: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
1303            max_def_level: 2,
1304            max_rep_level: 2,
1305            array: Arc::new(primitives),
1306            logical_nulls: None,
1307        };
1308        assert_eq!(&levels[0], &expected);
1309    }
1310
1311    #[test]
1312    fn test_calculate_one_level_1() {
1313        // This test calculates the levels for a non-null primitive array
1314        let array = Arc::new(Int32Array::from_iter(0..10)) as ArrayRef;
1315        let field = Field::new_list_field(DataType::Int32, false);
1316
1317        let levels = calculate_array_levels(&array, &field).unwrap();
1318        assert_eq!(levels.len(), 1);
1319
1320        let expected_levels = ArrayLevels {
1321            def_levels: LevelData::Absent,
1322            rep_levels: LevelData::Absent,
1323            non_null_indices: (0..10).collect(),
1324            max_def_level: 0,
1325            max_rep_level: 0,
1326            array,
1327            logical_nulls: None,
1328        };
1329        assert_eq!(&levels[0], &expected_levels);
1330    }
1331
1332    #[test]
1333    fn test_calculate_one_level_2() {
1334        // This test calculates the levels for a nullable primitive array
1335        let array = Arc::new(Int32Array::from_iter([
1336            Some(0),
1337            None,
1338            Some(0),
1339            Some(0),
1340            None,
1341        ])) as ArrayRef;
1342        let field = Field::new_list_field(DataType::Int32, true);
1343
1344        let levels = calculate_array_levels(&array, &field).unwrap();
1345        assert_eq!(levels.len(), 1);
1346
1347        let logical_nulls = array.logical_nulls();
1348        let expected_levels = ArrayLevels {
1349            def_levels: LevelData::Materialized(vec![1, 0, 1, 1, 0]),
1350            rep_levels: LevelData::Absent,
1351            non_null_indices: vec![0, 2, 3],
1352            max_def_level: 1,
1353            max_rep_level: 0,
1354            array,
1355            logical_nulls,
1356        };
1357        assert_eq!(&levels[0], &expected_levels);
1358    }
1359
1360    #[test]
1361    fn test_calculate_array_levels_1() {
1362        let leaf_field = Field::new_list_field(DataType::Int32, false);
1363        let list_type = DataType::List(Arc::new(leaf_field));
1364
1365        // if all array values are defined (e.g. batch<list<_>>)
1366        // [[0], [1], [2], [3], [4]]
1367
1368        let leaf_array = Int32Array::from_iter(0..5);
1369        // Cannot use from_iter_primitive as always infers nullable
1370        let offsets = Buffer::from_iter(0_i32..6);
1371        let list = ArrayDataBuilder::new(list_type.clone())
1372            .len(5)
1373            .add_buffer(offsets)
1374            .add_child_data(leaf_array.to_data())
1375            .build()
1376            .unwrap();
1377        let list = make_array(list);
1378
1379        let list_field = Field::new("list", list_type.clone(), false);
1380        let levels = calculate_array_levels(&list, &list_field).unwrap();
1381        assert_eq!(levels.len(), 1);
1382
1383        let expected_levels = ArrayLevels {
1384            def_levels: LevelData::Materialized(vec![1; 5]),
1385            rep_levels: LevelData::Materialized(vec![0; 5]),
1386            non_null_indices: (0..5).collect(),
1387            max_def_level: 1,
1388            max_rep_level: 1,
1389            array: Arc::new(leaf_array),
1390            logical_nulls: None,
1391        };
1392        assert_eq!(&levels[0], &expected_levels);
1393
1394        // array: [[0, 0], NULL, [2, 2], [3, 3, 3, 3], [4, 4, 4]]
1395        // all values are defined as we do not have nulls on the root (batch)
1396        // repetition:
1397        //   0: 0, 1
1398        //   1: 0
1399        //   2: 0, 1
1400        //   3: 0, 1, 1, 1
1401        //   4: 0, 1, 1
1402        let leaf_array = Int32Array::from_iter([0, 0, 2, 2, 3, 3, 3, 3, 4, 4, 4]);
1403        let offsets = Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]);
1404        let list = ArrayDataBuilder::new(list_type.clone())
1405            .len(5)
1406            .add_buffer(offsets)
1407            .add_child_data(leaf_array.to_data())
1408            .null_bit_buffer(Some(Buffer::from([0b00011101])))
1409            .build()
1410            .unwrap();
1411        let list = make_array(list);
1412
1413        let list_field = Field::new("list", list_type, true);
1414        let levels = calculate_array_levels(&list, &list_field).unwrap();
1415        assert_eq!(levels.len(), 1);
1416
1417        let expected_levels = ArrayLevels {
1418            def_levels: LevelData::Materialized(vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2]),
1419            rep_levels: LevelData::Materialized(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
1420            non_null_indices: (0..11).collect(),
1421            max_def_level: 2,
1422            max_rep_level: 1,
1423            array: Arc::new(leaf_array),
1424            logical_nulls: None,
1425        };
1426        assert_eq!(&levels[0], &expected_levels);
1427    }
1428
1429    #[test]
1430    fn test_calculate_array_levels_2() {
1431        // If some values are null
1432        //
1433        // This emulates an array in the form: <struct<list<?>>
1434        // with values:
1435        // - 0: [0, 1], but is null because of the struct
1436        // - 1: []
1437        // - 2: [2, 3], but is null because of the struct
1438        // - 3: [4, 5, 6, 7]
1439        // - 4: [8, 9, 10]
1440        //
1441        // If the first values of a list are null due to a parent, we have to still account for them
1442        // while indexing, because they would affect the way the child is indexed
1443        // i.e. in the above example, we have to know that [0, 1] has to be skipped
1444        let leaf = Int32Array::from_iter(0..11);
1445        let leaf_field = Field::new("leaf", DataType::Int32, false);
1446
1447        let list_type = DataType::List(Arc::new(leaf_field));
1448        let list = ArrayData::builder(list_type.clone())
1449            .len(5)
1450            .add_child_data(leaf.to_data())
1451            .add_buffer(Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]))
1452            .build()
1453            .unwrap();
1454
1455        let list = make_array(list);
1456        let list_field = Arc::new(Field::new("list", list_type, true));
1457
1458        let struct_array =
1459            StructArray::from((vec![(list_field, list)], Buffer::from([0b00011010])));
1460        let array = Arc::new(struct_array) as ArrayRef;
1461
1462        let struct_field = Field::new("struct", array.data_type().clone(), true);
1463
1464        let levels = calculate_array_levels(&array, &struct_field).unwrap();
1465        assert_eq!(levels.len(), 1);
1466
1467        let expected_levels = ArrayLevels {
1468            def_levels: LevelData::Materialized(vec![0, 2, 0, 3, 3, 3, 3, 3, 3, 3]),
1469            rep_levels: LevelData::Materialized(vec![0, 0, 0, 0, 1, 1, 1, 0, 1, 1]),
1470            non_null_indices: (4..11).collect(),
1471            max_def_level: 3,
1472            max_rep_level: 1,
1473            array: Arc::new(leaf),
1474            logical_nulls: None,
1475        };
1476
1477        assert_eq!(&levels[0], &expected_levels);
1478
1479        // nested lists
1480
1481        // 0: [[100, 101], [102, 103]]
1482        // 1: []
1483        // 2: [[104, 105], [106, 107]]
1484        // 3: [[108, 109], [110, 111], [112, 113], [114, 115]]
1485        // 4: [[116, 117], [118, 119], [120, 121]]
1486
1487        let leaf = Int32Array::from_iter(100..122);
1488        let leaf_field = Field::new("leaf", DataType::Int32, true);
1489
1490        let l1_type = DataType::List(Arc::new(leaf_field));
1491        let offsets = Buffer::from_iter([0_i32, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22]);
1492        let l1 = ArrayData::builder(l1_type.clone())
1493            .len(11)
1494            .add_child_data(leaf.to_data())
1495            .add_buffer(offsets)
1496            .build()
1497            .unwrap();
1498
1499        let l1_field = Field::new("l1", l1_type, true);
1500        let l2_type = DataType::List(Arc::new(l1_field));
1501        let l2 = ArrayData::builder(l2_type)
1502            .len(5)
1503            .add_child_data(l1)
1504            .add_buffer(Buffer::from_iter([0, 2, 2, 4, 8, 11]))
1505            .build()
1506            .unwrap();
1507
1508        let l2 = make_array(l2);
1509        let l2_field = Field::new("l2", l2.data_type().clone(), true);
1510
1511        let levels = calculate_array_levels(&l2, &l2_field).unwrap();
1512        assert_eq!(levels.len(), 1);
1513
1514        let expected_levels = ArrayLevels {
1515            def_levels: LevelData::Materialized(vec![
1516                5, 5, 5, 5, 1, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
1517            ]),
1518            rep_levels: LevelData::Materialized(vec![
1519                0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2,
1520            ]),
1521            non_null_indices: (0..22).collect(),
1522            max_def_level: 5,
1523            max_rep_level: 2,
1524            array: Arc::new(leaf),
1525            logical_nulls: None,
1526        };
1527
1528        assert_eq!(&levels[0], &expected_levels);
1529    }
1530
1531    #[test]
1532    fn test_calculate_array_levels_nested_list() {
1533        let leaf_field = Field::new("leaf", DataType::Int32, false);
1534        let list_type = DataType::List(Arc::new(leaf_field));
1535
1536        // if all array values are defined (e.g. batch<list<_>>)
1537        // The array at this level looks like:
1538        // 0: [a]
1539        // 1: [a]
1540        // 2: [a]
1541        // 3: [a]
1542
1543        let leaf = Int32Array::from_iter([0; 4]);
1544        let list = ArrayData::builder(list_type.clone())
1545            .len(4)
1546            .add_buffer(Buffer::from_iter(0_i32..5))
1547            .add_child_data(leaf.to_data())
1548            .build()
1549            .unwrap();
1550        let list = make_array(list);
1551
1552        let list_field = Field::new("list", list_type.clone(), false);
1553        let levels = calculate_array_levels(&list, &list_field).unwrap();
1554        assert_eq!(levels.len(), 1);
1555
1556        let expected_levels = ArrayLevels {
1557            def_levels: LevelData::Materialized(vec![1; 4]),
1558            rep_levels: LevelData::Materialized(vec![0; 4]),
1559            non_null_indices: (0..4).collect(),
1560            max_def_level: 1,
1561            max_rep_level: 1,
1562            array: Arc::new(leaf),
1563            logical_nulls: None,
1564        };
1565        assert_eq!(&levels[0], &expected_levels);
1566
1567        // 0: null
1568        // 1: [1, 2, 3]
1569        // 2: [4, 5]
1570        // 3: [6, 7]
1571        let leaf = Int32Array::from_iter(0..8);
1572        let list = ArrayData::builder(list_type.clone())
1573            .len(4)
1574            .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
1575            .null_bit_buffer(Some(Buffer::from([0b00001110])))
1576            .add_child_data(leaf.to_data())
1577            .build()
1578            .unwrap();
1579        let list = make_array(list);
1580        let list_field = Arc::new(Field::new("list", list_type, true));
1581
1582        let struct_array = StructArray::from(vec![(list_field, list)]);
1583        let array = Arc::new(struct_array) as ArrayRef;
1584
1585        let struct_field = Field::new("struct", array.data_type().clone(), true);
1586        let levels = calculate_array_levels(&array, &struct_field).unwrap();
1587        assert_eq!(levels.len(), 1);
1588
1589        let expected_levels = ArrayLevels {
1590            def_levels: LevelData::Materialized(vec![1, 3, 3, 3, 3, 3, 3, 3]),
1591            rep_levels: LevelData::Materialized(vec![0, 0, 1, 1, 0, 1, 0, 1]),
1592            non_null_indices: (0..7).collect(),
1593            max_def_level: 3,
1594            max_rep_level: 1,
1595            array: Arc::new(leaf),
1596            logical_nulls: None,
1597        };
1598        assert_eq!(&levels[0], &expected_levels);
1599
1600        // nested lists
1601        // In a JSON syntax with the schema: <struct<list<list<primitive>>>>, this translates into:
1602        // 0: {"struct": null }
1603        // 1: {"struct": [ [201], [202, 203], [] ]}
1604        // 2: {"struct": [ [204, 205, 206], [207, 208, 209, 210] ]}
1605        // 3: {"struct": [ [], [211, 212, 213, 214, 215] ]}
1606
1607        let leaf = Int32Array::from_iter(201..216);
1608        let leaf_field = Field::new("leaf", DataType::Int32, false);
1609        let list_1_type = DataType::List(Arc::new(leaf_field));
1610        let list_1 = ArrayData::builder(list_1_type.clone())
1611            .len(7)
1612            .add_buffer(Buffer::from_iter([0_i32, 1, 3, 3, 6, 10, 10, 15]))
1613            .add_child_data(leaf.to_data())
1614            .build()
1615            .unwrap();
1616
1617        let list_1_field = Field::new("l1", list_1_type, true);
1618        let list_2_type = DataType::List(Arc::new(list_1_field));
1619        let list_2 = ArrayData::builder(list_2_type.clone())
1620            .len(4)
1621            .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
1622            .null_bit_buffer(Some(Buffer::from([0b00001110])))
1623            .add_child_data(list_1)
1624            .build()
1625            .unwrap();
1626
1627        let list_2 = make_array(list_2);
1628        let list_2_field = Arc::new(Field::new("list_2", list_2_type, true));
1629
1630        let struct_array =
1631            StructArray::from((vec![(list_2_field, list_2)], Buffer::from([0b00001111])));
1632        let struct_field = Field::new("struct", struct_array.data_type().clone(), true);
1633
1634        let array = Arc::new(struct_array) as ArrayRef;
1635        let levels = calculate_array_levels(&array, &struct_field).unwrap();
1636        assert_eq!(levels.len(), 1);
1637
1638        let expected_levels = ArrayLevels {
1639            def_levels: LevelData::Materialized(vec![
1640                1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5, 5, 5, 5,
1641            ]),
1642            rep_levels: LevelData::Materialized(vec![
1643                0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2,
1644            ]),
1645            non_null_indices: (0..15).collect(),
1646            max_def_level: 5,
1647            max_rep_level: 2,
1648            array: Arc::new(leaf),
1649            logical_nulls: None,
1650        };
1651        assert_eq!(&levels[0], &expected_levels);
1652    }
1653
1654    #[test]
1655    fn test_calculate_nested_struct_levels() {
1656        // tests a <struct[a]<struct[b]<int[c]>>
1657        // array:
1658        //  - {a: {b: {c: 1}}}
1659        //  - {a: {b: {c: null}}}
1660        //  - {a: {b: {c: 3}}}
1661        //  - {a: {b: null}}
1662        //  - {a: null}}
1663        //  - {a: {b: {c: 6}}}
1664
1665        let c = Int32Array::from_iter([Some(1), None, Some(3), None, Some(5), Some(6)]);
1666        let leaf = Arc::new(c) as ArrayRef;
1667        let c_field = Arc::new(Field::new("c", DataType::Int32, true));
1668        let b = StructArray::from(((vec![(c_field, leaf.clone())]), Buffer::from([0b00110111])));
1669
1670        let b_field = Arc::new(Field::new("b", b.data_type().clone(), true));
1671        let a = StructArray::from((
1672            (vec![(b_field, Arc::new(b) as ArrayRef)]),
1673            Buffer::from([0b00101111]),
1674        ));
1675
1676        let a_field = Field::new("a", a.data_type().clone(), true);
1677        let a_array = Arc::new(a) as ArrayRef;
1678
1679        let levels = calculate_array_levels(&a_array, &a_field).unwrap();
1680        assert_eq!(levels.len(), 1);
1681
1682        let logical_nulls = leaf.logical_nulls();
1683        let expected_levels = ArrayLevels {
1684            def_levels: LevelData::Materialized(vec![3, 2, 3, 1, 0, 3]),
1685            rep_levels: LevelData::Absent,
1686            non_null_indices: vec![0, 2, 5],
1687            max_def_level: 3,
1688            max_rep_level: 0,
1689            array: leaf,
1690            logical_nulls,
1691        };
1692        assert_eq!(&levels[0], &expected_levels);
1693    }
1694
1695    #[test]
1696    fn list_single_column() {
1697        // this tests the level generation from the arrow_writer equivalent test
1698
1699        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1700        let a_value_offsets = arrow::buffer::Buffer::from_iter([0_i32, 1, 3, 3, 6, 10]);
1701        let a_list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
1702        let a_list_data = ArrayData::builder(a_list_type.clone())
1703            .len(5)
1704            .add_buffer(a_value_offsets)
1705            .null_bit_buffer(Some(Buffer::from([0b00011011])))
1706            .add_child_data(a_values.to_data())
1707            .build()
1708            .unwrap();
1709
1710        assert_eq!(a_list_data.null_count(), 1);
1711
1712        let a = ListArray::from(a_list_data);
1713
1714        let item_field = Field::new_list_field(a_list_type, true);
1715        let mut builder = levels(&item_field, a);
1716        builder.write(2..4);
1717        let levels = builder.finish();
1718
1719        assert_eq!(levels.len(), 1);
1720
1721        let list_level = &levels[0];
1722
1723        let expected_level = ArrayLevels {
1724            def_levels: LevelData::Materialized(vec![0, 3, 3, 3]),
1725            rep_levels: LevelData::Materialized(vec![0, 0, 1, 1]),
1726            non_null_indices: vec![3, 4, 5],
1727            max_def_level: 3,
1728            max_rep_level: 1,
1729            array: Arc::new(a_values),
1730            logical_nulls: None,
1731        };
1732        assert_eq!(list_level, &expected_level);
1733    }
1734
1735    #[test]
1736    fn mixed_struct_list() {
1737        // this tests the level generation from the equivalent arrow_writer_complex test
1738
1739        // define schema
1740        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1741        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1742        let struct_field_g = Arc::new(Field::new(
1743            "g",
1744            DataType::List(Arc::new(Field::new("items", DataType::Int16, false))),
1745            false,
1746        ));
1747        let struct_field_e = Arc::new(Field::new(
1748            "e",
1749            DataType::Struct(vec![struct_field_f.clone(), struct_field_g.clone()].into()),
1750            true,
1751        ));
1752        let schema = Schema::new(vec![
1753            Field::new("a", DataType::Int32, false),
1754            Field::new("b", DataType::Int32, true),
1755            Field::new(
1756                "c",
1757                DataType::Struct(vec![struct_field_d.clone(), struct_field_e.clone()].into()),
1758                true, // https://github.com/apache/arrow-rs/issues/245
1759            ),
1760        ]);
1761
1762        // create some data
1763        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1764        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1765        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1766        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1767
1768        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1769
1770        // Construct a buffer for value offsets, for the nested array:
1771        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
1772        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1773
1774        // Construct a list array from the above two
1775        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1776            .len(5)
1777            .add_buffer(g_value_offsets)
1778            .add_child_data(g_value.into_data())
1779            .build()
1780            .unwrap();
1781        let g = ListArray::from(g_list_data);
1782
1783        let e = StructArray::from(vec![
1784            (struct_field_f, Arc::new(f.clone()) as ArrayRef),
1785            (struct_field_g, Arc::new(g) as ArrayRef),
1786        ]);
1787
1788        let c = StructArray::from(vec![
1789            (struct_field_d, Arc::new(d.clone()) as ArrayRef),
1790            (struct_field_e, Arc::new(e) as ArrayRef),
1791        ]);
1792
1793        // build a record batch
1794        let batch = RecordBatch::try_new(
1795            Arc::new(schema),
1796            vec![Arc::new(a.clone()), Arc::new(b.clone()), Arc::new(c)],
1797        )
1798        .unwrap();
1799
1800        //////////////////////////////////////////////
1801        // calculate the list's level
1802        let mut levels = vec![];
1803        batch
1804            .columns()
1805            .iter()
1806            .zip(batch.schema().fields())
1807            .for_each(|(array, field)| {
1808                let mut array_levels = calculate_array_levels(array, field).unwrap();
1809                levels.append(&mut array_levels);
1810            });
1811        assert_eq!(levels.len(), 5);
1812
1813        // test "a" levels
1814        let list_level = &levels[0];
1815
1816        let expected_level = ArrayLevels {
1817            def_levels: LevelData::Absent,
1818            rep_levels: LevelData::Absent,
1819            non_null_indices: vec![0, 1, 2, 3, 4],
1820            max_def_level: 0,
1821            max_rep_level: 0,
1822            array: Arc::new(a),
1823            logical_nulls: None,
1824        };
1825        assert_eq!(list_level, &expected_level);
1826
1827        // test "b" levels
1828        let list_level = levels.get(1).unwrap();
1829
1830        let b_logical_nulls = b.logical_nulls();
1831        let expected_level = ArrayLevels {
1832            def_levels: LevelData::Materialized(vec![1, 0, 0, 1, 1]),
1833            rep_levels: LevelData::Absent,
1834            non_null_indices: vec![0, 3, 4],
1835            max_def_level: 1,
1836            max_rep_level: 0,
1837            array: Arc::new(b),
1838            logical_nulls: b_logical_nulls,
1839        };
1840        assert_eq!(list_level, &expected_level);
1841
1842        // test "d" levels
1843        let list_level = levels.get(2).unwrap();
1844
1845        let d_logical_nulls = d.logical_nulls();
1846        let expected_level = ArrayLevels {
1847            def_levels: LevelData::Materialized(vec![1, 1, 1, 2, 1]),
1848            rep_levels: LevelData::Absent,
1849            non_null_indices: vec![3],
1850            max_def_level: 2,
1851            max_rep_level: 0,
1852            array: Arc::new(d),
1853            logical_nulls: d_logical_nulls,
1854        };
1855        assert_eq!(list_level, &expected_level);
1856
1857        // test "f" levels
1858        let list_level = levels.get(3).unwrap();
1859
1860        let f_logical_nulls = f.logical_nulls();
1861        let expected_level = ArrayLevels {
1862            def_levels: LevelData::Materialized(vec![3, 2, 3, 2, 3]),
1863            rep_levels: LevelData::Absent,
1864            non_null_indices: vec![0, 2, 4],
1865            max_def_level: 3,
1866            max_rep_level: 0,
1867            array: Arc::new(f),
1868            logical_nulls: f_logical_nulls,
1869        };
1870        assert_eq!(list_level, &expected_level);
1871    }
1872
1873    #[test]
1874    fn test_null_vs_nonnull_struct() {
1875        // define schema
1876        let offset_field = Arc::new(Field::new("offset", DataType::Int32, true));
1877        let schema = Schema::new(vec![Field::new(
1878            "some_nested_object",
1879            DataType::Struct(vec![offset_field.clone()].into()),
1880            false,
1881        )]);
1882
1883        // create some data
1884        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1885
1886        let some_nested_object =
1887            StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
1888
1889        // build a record batch
1890        let batch =
1891            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1892
1893        let struct_null_level =
1894            calculate_array_levels(batch.column(0), batch.schema().field(0)).unwrap();
1895
1896        // create second batch
1897        // define schema
1898        let offset_field = Arc::new(Field::new("offset", DataType::Int32, true));
1899        let schema = Schema::new(vec![Field::new(
1900            "some_nested_object",
1901            DataType::Struct(vec![offset_field.clone()].into()),
1902            true,
1903        )]);
1904
1905        // create some data
1906        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1907
1908        let some_nested_object =
1909            StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
1910
1911        // build a record batch
1912        let batch =
1913            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1914
1915        let struct_non_null_level =
1916            calculate_array_levels(batch.column(0), batch.schema().field(0)).unwrap();
1917
1918        // The 2 levels should not be the same
1919        if struct_non_null_level == struct_null_level {
1920            panic!("Levels should not be equal, to reflect the difference in struct nullness");
1921        }
1922    }
1923
1924    #[test]
1925    fn test_map_array() {
1926        // Note: we are using the JSON Arrow reader for brevity
1927        let json_content = r#"
1928        {"stocks":{"long": "$AAA", "short": "$BBB"}}
1929        {"stocks":{"long": "$CCC", "short": null}}
1930        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1931        "#;
1932        let entries_struct_type = DataType::Struct(Fields::from(vec![
1933            Field::new("key", DataType::Utf8, false),
1934            Field::new("value", DataType::Utf8, true),
1935        ]));
1936        let stocks_field = Field::new(
1937            "stocks",
1938            DataType::Map(
1939                Arc::new(Field::new("entries", entries_struct_type, false)),
1940                false,
1941            ),
1942            // not nullable, so the keys have max level = 1
1943            false,
1944        );
1945        let schema = Arc::new(Schema::new(vec![stocks_field]));
1946        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1947        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1948
1949        let batch = reader.next().unwrap().unwrap();
1950
1951        // calculate the map's level
1952        let mut levels = vec![];
1953        batch
1954            .columns()
1955            .iter()
1956            .zip(batch.schema().fields())
1957            .for_each(|(array, field)| {
1958                let mut array_levels = calculate_array_levels(array, field).unwrap();
1959                levels.append(&mut array_levels);
1960            });
1961        assert_eq!(levels.len(), 2);
1962
1963        let map = batch.column(0).as_map();
1964        let map_keys_logical_nulls = map.keys().logical_nulls();
1965
1966        // test key levels
1967        let list_level = &levels[0];
1968
1969        let expected_level = ArrayLevels {
1970            def_levels: LevelData::Materialized(vec![1; 7]),
1971            rep_levels: LevelData::Materialized(vec![0, 1, 0, 1, 0, 1, 1]),
1972            non_null_indices: vec![0, 1, 2, 3, 4, 5, 6],
1973            max_def_level: 1,
1974            max_rep_level: 1,
1975            array: map.keys().clone(),
1976            logical_nulls: map_keys_logical_nulls,
1977        };
1978        assert_eq!(list_level, &expected_level);
1979
1980        // test values levels
1981        let list_level = levels.get(1).unwrap();
1982        let map_values_logical_nulls = map.values().logical_nulls();
1983
1984        let expected_level = ArrayLevels {
1985            def_levels: LevelData::Materialized(vec![2, 2, 2, 1, 2, 1, 2]),
1986            rep_levels: LevelData::Materialized(vec![0, 1, 0, 1, 0, 1, 1]),
1987            non_null_indices: vec![0, 1, 2, 4, 6],
1988            max_def_level: 2,
1989            max_rep_level: 1,
1990            array: map.values().clone(),
1991            logical_nulls: map_values_logical_nulls,
1992        };
1993        assert_eq!(list_level, &expected_level);
1994    }
1995
1996    #[test]
1997    fn test_list_of_struct() {
1998        // define schema
1999        let int_field = Field::new("a", DataType::Int32, true);
2000        let fields = Fields::from([Arc::new(int_field)]);
2001        let item_field = Field::new_list_field(DataType::Struct(fields.clone()), true);
2002        let list_field = Field::new("list", DataType::List(Arc::new(item_field)), true);
2003
2004        let int_builder = Int32Builder::with_capacity(10);
2005        let struct_builder = StructBuilder::new(fields, vec![Box::new(int_builder)]);
2006        let mut list_builder = ListBuilder::new(struct_builder);
2007
2008        // [{a: 1}], [], null, [null, null], [{a: null}], [{a: 2}]
2009        //
2010        // [{a: 1}]
2011        let values = list_builder.values();
2012        values
2013            .field_builder::<Int32Builder>(0)
2014            .unwrap()
2015            .append_value(1);
2016        values.append(true);
2017        list_builder.append(true);
2018
2019        // []
2020        list_builder.append(true);
2021
2022        // null
2023        list_builder.append(false);
2024
2025        // [null, null]
2026        let values = list_builder.values();
2027        values
2028            .field_builder::<Int32Builder>(0)
2029            .unwrap()
2030            .append_null();
2031        values.append(false);
2032        values
2033            .field_builder::<Int32Builder>(0)
2034            .unwrap()
2035            .append_null();
2036        values.append(false);
2037        list_builder.append(true);
2038
2039        // [{a: null}]
2040        let values = list_builder.values();
2041        values
2042            .field_builder::<Int32Builder>(0)
2043            .unwrap()
2044            .append_null();
2045        values.append(true);
2046        list_builder.append(true);
2047
2048        // [{a: 2}]
2049        let values = list_builder.values();
2050        values
2051            .field_builder::<Int32Builder>(0)
2052            .unwrap()
2053            .append_value(2);
2054        values.append(true);
2055        list_builder.append(true);
2056
2057        let array = Arc::new(list_builder.finish());
2058
2059        let values = array.values().as_struct().column(0).clone();
2060        let values_len = values.len();
2061        assert_eq!(values_len, 5);
2062
2063        let schema = Arc::new(Schema::new(vec![list_field]));
2064
2065        let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
2066
2067        let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
2068        let list_level = &levels[0];
2069
2070        let logical_nulls = values.logical_nulls();
2071        let expected_level = ArrayLevels {
2072            def_levels: LevelData::Materialized(vec![4, 1, 0, 2, 2, 3, 4]),
2073            rep_levels: LevelData::Materialized(vec![0, 0, 0, 0, 1, 0, 0]),
2074            non_null_indices: vec![0, 4],
2075            max_def_level: 4,
2076            max_rep_level: 1,
2077            array: values,
2078            logical_nulls,
2079        };
2080
2081        assert_eq!(list_level, &expected_level);
2082    }
2083
2084    #[test]
2085    fn test_struct_mask_list() {
2086        // Test the null mask of a struct array masking out non-empty slices of a child ListArray
2087        let inner = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
2088            Some(vec![Some(1), Some(2)]),
2089            Some(vec![None]),
2090            Some(vec![]),
2091            Some(vec![Some(3), None]), // Masked by struct array
2092            Some(vec![Some(4), Some(5)]),
2093            None, // Masked by struct array
2094            None,
2095        ]);
2096        let values = inner.values().clone();
2097
2098        // This test assumes that nulls don't take up space
2099        assert_eq!(inner.values().len(), 7);
2100
2101        let field = Arc::new(Field::new("list", inner.data_type().clone(), true));
2102        let array = Arc::new(inner) as ArrayRef;
2103        let nulls = Buffer::from([0b01010111]);
2104        let struct_a = StructArray::from((vec![(field, array)], nulls));
2105
2106        let field = Field::new("struct", struct_a.data_type().clone(), true);
2107        let array = Arc::new(struct_a) as ArrayRef;
2108        let levels = calculate_array_levels(&array, &field).unwrap();
2109
2110        assert_eq!(levels.len(), 1);
2111
2112        let logical_nulls = values.logical_nulls();
2113        let expected_level = ArrayLevels {
2114            def_levels: LevelData::Materialized(vec![4, 4, 3, 2, 0, 4, 4, 0, 1]),
2115            rep_levels: LevelData::Materialized(vec![0, 1, 0, 0, 0, 0, 1, 0, 0]),
2116            non_null_indices: vec![0, 1, 5, 6],
2117            max_def_level: 4,
2118            max_rep_level: 1,
2119            array: values,
2120            logical_nulls,
2121        };
2122
2123        assert_eq!(&levels[0], &expected_level);
2124    }
2125
2126    #[test]
2127    fn test_list_mask_struct() {
2128        // Test the null mask of a struct array and the null mask of a list array
2129        // masking out non-null elements of their children
2130
2131        let a1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
2132            Some(vec![None]), // Masked by list array
2133            Some(vec![]),     // Masked by list array
2134            Some(vec![Some(3), None]),
2135            Some(vec![Some(4), Some(5), None, Some(6)]), // Masked by struct array
2136            None,
2137            None,
2138        ]);
2139        let a1_values = a1.values().clone();
2140        let a1 = Arc::new(a1) as ArrayRef;
2141
2142        let a2 = Arc::new(Int32Array::from_iter(vec![
2143            Some(1), // Masked by list array
2144            Some(2), // Masked by list array
2145            None,
2146            Some(4), // Masked by struct array
2147            Some(5),
2148            None,
2149        ])) as ArrayRef;
2150        let a2_values = a2.clone();
2151
2152        let field_a1 = Arc::new(Field::new("list", a1.data_type().clone(), true));
2153        let field_a2 = Arc::new(Field::new("integers", a2.data_type().clone(), true));
2154
2155        let nulls = Buffer::from([0b00110111]);
2156        let struct_a = Arc::new(StructArray::from((
2157            vec![(field_a1, a1), (field_a2, a2)],
2158            nulls,
2159        ))) as ArrayRef;
2160
2161        let offsets = Buffer::from_iter([0_i32, 0, 2, 2, 3, 5, 5]);
2162        let nulls = Buffer::from([0b00111100]);
2163
2164        let list_type = DataType::List(Arc::new(Field::new(
2165            "struct",
2166            struct_a.data_type().clone(),
2167            true,
2168        )));
2169
2170        let data = ArrayDataBuilder::new(list_type.clone())
2171            .len(6)
2172            .null_bit_buffer(Some(nulls))
2173            .add_buffer(offsets)
2174            .add_child_data(struct_a.into_data())
2175            .build()
2176            .unwrap();
2177
2178        let list = make_array(data);
2179        let list_field = Field::new("col", list_type, true);
2180
2181        let expected = vec![
2182            r#""#.to_string(),
2183            r#""#.to_string(),
2184            r#"[]"#.to_string(),
2185            r#"[{list: [3, ], integers: }]"#.to_string(),
2186            r#"[, {list: , integers: 5}]"#.to_string(),
2187            r#"[]"#.to_string(),
2188        ];
2189
2190        let actual: Vec<_> = (0..6)
2191            .map(|x| array_value_to_string(&list, x).unwrap())
2192            .collect();
2193        assert_eq!(actual, expected);
2194
2195        let levels = calculate_array_levels(&list, &list_field).unwrap();
2196
2197        assert_eq!(levels.len(), 2);
2198
2199        let a1_logical_nulls = a1_values.logical_nulls();
2200        let expected_level = ArrayLevels {
2201            def_levels: LevelData::Materialized(vec![0, 0, 1, 6, 5, 2, 3, 1]),
2202            rep_levels: LevelData::Materialized(vec![0, 0, 0, 0, 2, 0, 1, 0]),
2203            non_null_indices: vec![1],
2204            max_def_level: 6,
2205            max_rep_level: 2,
2206            array: a1_values,
2207            logical_nulls: a1_logical_nulls,
2208        };
2209
2210        assert_eq!(&levels[0], &expected_level);
2211
2212        let a2_logical_nulls = a2_values.logical_nulls();
2213        let expected_level = ArrayLevels {
2214            def_levels: LevelData::Materialized(vec![0, 0, 1, 3, 2, 4, 1]),
2215            rep_levels: LevelData::Materialized(vec![0, 0, 0, 0, 0, 1, 0]),
2216            non_null_indices: vec![4],
2217            max_def_level: 4,
2218            max_rep_level: 1,
2219            array: a2_values,
2220            logical_nulls: a2_logical_nulls,
2221        };
2222
2223        assert_eq!(&levels[1], &expected_level);
2224    }
2225
2226    #[test]
2227    fn test_fixed_size_list() {
2228        // [[1, 2], null, null, [7, 8], null]
2229        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 2);
2230        builder.values().append_slice(&[1, 2]);
2231        builder.append(true);
2232        builder.values().append_slice(&[3, 4]);
2233        builder.append(false);
2234        builder.values().append_slice(&[5, 6]);
2235        builder.append(false);
2236        builder.values().append_slice(&[7, 8]);
2237        builder.append(true);
2238        builder.values().append_slice(&[9, 10]);
2239        builder.append(false);
2240        let a = builder.finish();
2241        let values = a.values().clone();
2242
2243        let item_field = Field::new_list_field(a.data_type().clone(), true);
2244        let mut builder = levels(&item_field, a);
2245        builder.write(1..4);
2246        let levels = builder.finish();
2247
2248        assert_eq!(levels.len(), 1);
2249
2250        let list_level = &levels[0];
2251
2252        let logical_nulls = values.logical_nulls();
2253        let expected_level = ArrayLevels {
2254            def_levels: LevelData::Materialized(vec![0, 0, 3, 3]),
2255            rep_levels: LevelData::Materialized(vec![0, 0, 0, 1]),
2256            non_null_indices: vec![6, 7],
2257            max_def_level: 3,
2258            max_rep_level: 1,
2259            array: values,
2260            logical_nulls,
2261        };
2262        assert_eq!(list_level, &expected_level);
2263    }
2264
#[test]
fn test_fixed_size_list_of_struct() {
    // Schema: list (nullable, fixed size 2) -> item (struct, nullable)
    //           -> { a: Int32 (nullable), b: Int64 (required) }
    let struct_fields = Fields::from([
        Arc::new(Field::new("a", DataType::Int32, true)),
        Arc::new(Field::new("b", DataType::Int64, false)),
    ]);
    let item_field = Field::new_list_field(DataType::Struct(struct_fields.clone()), true);
    let list_field = Field::new(
        "list",
        DataType::FixedSizeList(Arc::new(item_field), 2),
        true,
    );

    let struct_builder = StructBuilder::new(
        struct_fields,
        vec![
            Box::new(Int32Builder::with_capacity(10)),
            Box::new(Int64Builder::with_capacity(10)),
        ],
    );
    let mut list_builder = FixedSizeListBuilder::new(struct_builder, 2);

    // One entry per list slot: the two struct elements as
    // (a, b, struct_is_valid), followed by the validity of the list slot.
    // Null structs still carry placeholder children (a = null, b = 0)
    // because every child builder must receive one value per struct slot.
    let rows: [([(Option<i32>, i64, bool); 2], bool); 4] = [
        // [{a: 1, b: 2}, null]
        ([(Some(1), 2, true), (None, 0, false)], true),
        // null
        ([(None, 0, false), (None, 0, false)], false),
        // [null, null]
        ([(None, 0, false), (None, 0, false)], true),
        // [{a: null, b: 3}, {a: 2, b: 4}]
        ([(None, 3, true), (Some(2), 4, true)], true),
    ];
    for (elements, list_is_valid) in rows {
        let structs = list_builder.values();
        for (a, b, struct_is_valid) in elements {
            structs
                .field_builder::<Int32Builder>(0)
                .unwrap()
                .append_option(a);
            structs
                .field_builder::<Int64Builder>(1)
                .unwrap()
                .append_value(b);
            structs.append(struct_is_valid);
        }
        list_builder.append(list_is_valid);
    }

    let array = Arc::new(list_builder.finish());

    assert_eq!(array.values().len(), 8);
    assert_eq!(array.len(), 4);

    // Grab the leaf columns before the list array is moved into the batch.
    let struct_values = array.values().as_struct();
    let leaf_a = struct_values.column(0).clone();
    let leaf_b = struct_values.column(1).clone();

    let schema = Arc::new(Schema::new(vec![list_field]));
    let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

    let levels = calculate_array_levels(batch.column(0), batch.schema().field(0)).unwrap();
    let actual_a = &levels[0];
    let actual_b = &levels[1];

    // Column `a`: [[{a: 1}, null], null, [null, null], [{a: null}, {a: 2}]]
    let want_a = ArrayLevels {
        def_levels: LevelData::Materialized(vec![4, 2, 0, 2, 2, 3, 4]),
        rep_levels: LevelData::Materialized(vec![0, 1, 0, 0, 1, 0, 1]),
        non_null_indices: vec![0, 7],
        max_def_level: 4,
        max_rep_level: 1,
        // Evaluated before `leaf_a` is moved into `array` below.
        logical_nulls: leaf_a.logical_nulls(),
        array: leaf_a,
    };
    // Column `b`: [[{b: 2}, null], null, [null, null], [{b: 3}, {b: 4}]]
    let want_b = ArrayLevels {
        def_levels: LevelData::Materialized(vec![3, 2, 0, 2, 2, 3, 3]),
        rep_levels: LevelData::Materialized(vec![0, 1, 0, 0, 1, 0, 1]),
        non_null_indices: vec![0, 6, 7],
        max_def_level: 3,
        max_rep_level: 1,
        // Evaluated before `leaf_b` is moved into `array` below.
        logical_nulls: leaf_b.logical_nulls(),
        array: leaf_b,
    };

    assert_eq!(actual_a, &want_a);
    assert_eq!(actual_b, &want_b);
}
2429
#[test]
fn test_fixed_size_list_empty() {
    // Fixed-size list with size 0: three slots (valid, null, valid) and no
    // child values at all.
    let mut list_builder = FixedSizeListBuilder::new(Int32Builder::new(), 0);
    for is_valid in [true, false, true] {
        list_builder.append(is_valid);
    }
    let list = list_builder.finish();
    let leaf = list.values().clone();

    let field = Field::new_list_field(list.data_type().clone(), true);
    let mut level_builder = levels(&field, list);
    level_builder.write(0..3);
    let computed = level_builder.finish();

    assert_eq!(computed.len(), 1);

    // Each slot produces one level and no leaf value is ever referenced.
    let want = ArrayLevels {
        def_levels: LevelData::Materialized(vec![1, 0, 1]),
        rep_levels: LevelData::Materialized(vec![0, 0, 0]),
        non_null_indices: vec![],
        max_def_level: 3,
        max_rep_level: 1,
        // Evaluated before `leaf` is moved into `array` below.
        logical_nulls: leaf.logical_nulls(),
        array: leaf,
    };
    assert_eq!(&computed[0], &want);
}
2460
#[test]
fn test_fixed_size_list_of_var_lists() {
    // Data: [[[1, null, 3], null], [[4], []], [[5, 6], [null, null]], null]
    let mut outer = FixedSizeListBuilder::new(ListBuilder::new(Int32Builder::new()), 2);

    // [[1, null, 3], null]
    let inner = outer.values();
    inner.append_value([Some(1), None, Some(3)]);
    inner.append_null();
    outer.append(true);

    // [[4], []]
    let inner = outer.values();
    inner.append_value([Some(4)]);
    inner.append_value([]);
    outer.append(true);

    // [[5, 6], [null, null]]
    let inner = outer.values();
    inner.append_value([Some(5), Some(6)]);
    inner.append_value([None, None]);
    outer.append(true);

    // null (the fixed-size slot still needs two child entries)
    let inner = outer.values();
    inner.append_null();
    inner.append_null();
    outer.append(false);

    let nested = outer.finish();
    // The leaf column is the values array of the inner variable-size list.
    let leaf = nested.values().as_list::<i32>().values().clone();

    let field = Field::new_list_field(nested.data_type().clone(), true);
    let mut level_builder = levels(&field, nested);
    level_builder.write(0..4);
    let computed = level_builder.finish();

    let want = ArrayLevels {
        def_levels: LevelData::Materialized(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]),
        rep_levels: LevelData::Materialized(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]),
        non_null_indices: vec![0, 2, 3, 4, 5],
        max_def_level: 5,
        max_rep_level: 2,
        // Evaluated before `leaf` is moved into `array` below.
        logical_nulls: leaf.logical_nulls(),
        array: leaf,
    };

    assert_eq!(computed[0], want);
}
2498
#[test]
fn test_null_dictionary_values() {
    // Keys and dictionary values share the same validity pattern: only
    // position 1 is null.
    let validity = || Some(NullBuffer::from(vec![true, false, true, true]));

    let dictionary_values = Int32Array::new(vec![1, 2, 3, 4].into(), validity());
    let dictionary_keys = Int32Array::new(vec![1, 54, 2, 0].into(), validity());

    // Logical contents: [NULL, NULL, 3, 1]
    //   key 1  -> values[1], which is null
    //   key 54 -> the key itself is null
    //   key 2  -> values[2] = 3
    //   key 0  -> values[0] = 1
    let dict = DictionaryArray::new(dictionary_keys, Arc::new(dictionary_values));

    let field = Field::new_list_field(dict.data_type().clone(), true);

    let mut level_builder = levels(&field, dict.clone());
    level_builder.write(0..4);
    let computed = level_builder.finish();

    // Both null keys and keys pointing at null dictionary values are
    // expected to surface as definition level 0.
    let want = ArrayLevels {
        def_levels: LevelData::Materialized(vec![0, 0, 1, 1]),
        rep_levels: LevelData::Absent,
        non_null_indices: vec![2, 3],
        max_def_level: 1,
        max_rep_level: 0,
        // Evaluated before `dict` is moved into `array` below.
        logical_nulls: dict.logical_nulls(),
        array: Arc::new(dict),
    };
    assert_eq!(computed[0], want);
}
2530
#[test]
fn mismatched_types() {
    // An Int32 array paired with a Float64 field must be rejected when the
    // level builder is constructed.
    let int_array: ArrayRef = Arc::new(Int32Array::from_iter(0..10));
    let float_field = Field::new_list_field(DataType::Float64, false);

    let outcome = LevelInfoBuilder::try_new(&float_field, Default::default(), &int_array);
    let message = outcome.unwrap_err().to_string();

    assert_eq!(
        message,
        "Arrow: Incompatible type. Field 'item' has type Float64, array has type Int32",
    );
}
2545
2546    fn levels<T: Array + 'static>(field: &Field, array: T) -> LevelInfoBuilder {
2547        let v = Arc::new(array) as ArrayRef;
2548        LevelInfoBuilder::try_new(field, Default::default(), &v).unwrap()
2549    }
2550
#[test]
fn test_slice_for_chunk_flat() {
    // Case 1: required field (max_def_level = 0, no def/rep levels stored).
    // All 6 values are non-null, so non_null_indices covers every position.
    // value_offset = 2, num_values = 3 selects non_null_indices[2..5] = [2, 3, 4];
    // the result holds a 3-element array with indices re-based to [0, 1, 2].
    {
        let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]));
        let required = ArrayLevels {
            def_levels: LevelData::Absent,
            rep_levels: LevelData::Absent,
            non_null_indices: vec![0, 1, 2, 3, 4, 5],
            max_def_level: 0,
            max_rep_level: 0,
            // Evaluated before `array` is moved into the struct below.
            logical_nulls: array.logical_nulls(),
            array,
        };
        let chunk = CdcChunk {
            level_offset: 0,
            num_levels: 0,
            value_offset: 2,
            num_values: 3,
        };
        let piece = required.slice_for_chunk(&chunk);
        assert!(matches!(piece.def_levels, LevelData::Absent));
        assert!(matches!(piece.rep_levels, LevelData::Absent));
        assert_eq!(piece.non_null_indices, vec![0, 1, 2]);
        assert_eq!(piece.array.len(), 3);
    }

    // Case 2: optional field (max_def_level = 1, def levels present, no rep levels).
    // Array: [Some(1), None, Some(3), None, Some(5), Some(6)]
    // non_null_indices: [0, 2, 4, 5]
    // value_offset = 1, num_values = 1 selects non_null_indices[1..2] = [2].
    // The def levels are cut to [level_offset .. level_offset + num_levels] and
    // the array is sliced down to the single selected value, so the remaining
    // index is re-based from 2 to 0.
    {
        let array: ArrayRef = Arc::new(Int32Array::from(vec![
            Some(1),
            None,
            Some(3),
            None,
            Some(5),
            Some(6),
        ]));
        let optional = ArrayLevels {
            def_levels: LevelData::Materialized(vec![1, 0, 1, 0, 1, 1]),
            rep_levels: LevelData::Absent,
            non_null_indices: vec![0, 2, 4, 5],
            max_def_level: 1,
            max_rep_level: 0,
            // Evaluated before `array` is moved into the struct below.
            logical_nulls: array.logical_nulls(),
            array,
        };
        let chunk = CdcChunk {
            level_offset: 1,
            num_levels: 3,
            value_offset: 1,
            num_values: 1,
        };
        let piece = optional.slice_for_chunk(&chunk);
        assert_eq!(piece.def_levels, LevelData::Materialized(vec![0, 1, 0]));
        assert!(matches!(piece.rep_levels, LevelData::Absent));
        assert_eq!(piece.non_null_indices, vec![0]); // original index 2, re-based
        assert_eq!(piece.array.len(), 1);
    }
}
2613
#[test]
fn test_slice_for_chunk_nested_with_nulls() {
    // Regression test for https://github.com/apache/arrow-rs/issues/9637
    //
    // Models a List<Int32?> in which null list entries own non-empty child
    // ranges (valid per Arrow spec: "a null value may correspond to a
    // non-empty segment in the child array"). Those ranges are gaps in the
    // leaf array that have no corresponding levels.
    //
    //   row 0: [1]       -> leaf[0]
    //   row 1: null list -> owns leaf[1..3] (gap of 2)
    //   row 2: [2, null] -> leaf[3], leaf[4] = null element
    //   row 3: null list -> owns leaf[5..8] (gap of 3)
    //   row 4: [4, 5]    -> leaf[8], leaf[9]
    //
    // def_levels:       [3, 0, 3, 2, 0, 3, 3]
    // rep_levels:       [0, 0, 0, 1, 0, 0, 1]
    // non_null_indices: [0, 3, 8, 9]
    //   (jumps 0 -> 3 and 3 -> 8 skip over the gaps)
    let leaf: ArrayRef = Arc::new(Int32Array::from(vec![
        Some(1), // 0: row 0
        None,    // 1: gap (null list row 1)
        None,    // 2: gap (null list row 1)
        Some(2), // 3: row 2
        None,    // 4: row 2, null element
        None,    // 5: gap (null list row 3)
        None,    // 6: gap (null list row 3)
        None,    // 7: gap (null list row 3)
        Some(4), // 8: row 4
        Some(5), // 9: row 4
    ]));
    let levels = ArrayLevels {
        def_levels: LevelData::Materialized(vec![3, 0, 3, 2, 0, 3, 3]),
        rep_levels: LevelData::Materialized(vec![0, 0, 0, 1, 0, 0, 1]),
        non_null_indices: vec![0, 3, 8, 9],
        max_def_level: 3,
        max_rep_level: 1,
        // Evaluated before `leaf` is moved into `array` below.
        logical_nulls: leaf.logical_nulls(),
        array: leaf,
    };

    // Builds the chunk descriptor and slices `levels` with it.
    let slice = |level_offset, num_levels, value_offset, num_values| {
        levels.slice_for_chunk(&CdcChunk {
            level_offset,
            num_levels,
            value_offset,
            num_values,
        })
    };

    // Rows 0-1: selects non-null index [0] -> one-element array.
    let first = slice(0, 2, 0, 1);
    assert_eq!(first.non_null_indices, vec![0]);
    assert_eq!(first.array.len(), 1);

    // Rows 2-3: selects non-null index [3] -> one-element array, re-based to 0.
    let second = slice(2, 3, 1, 1);
    assert_eq!(second.non_null_indices, vec![0]);
    assert_eq!(second.array.len(), 1);

    // Row 4: selects non-null indices [8, 9] -> two-element array, re-based.
    let third = slice(5, 2, 2, 2);
    assert_eq!(third.non_null_indices, vec![0, 1]);
    assert_eq!(third.array.len(), 2);
}
2687
#[test]
fn test_slice_for_chunk_all_null() {
    // A chunk with num_values = 0 selects no non-null indices and is
    // expected to yield a zero-length array.
    let source: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, None, Some(4)]));
    let levels = ArrayLevels {
        def_levels: LevelData::Materialized(vec![1, 0, 0, 1]),
        rep_levels: LevelData::Absent,
        non_null_indices: vec![0, 3],
        max_def_level: 1,
        max_rep_level: 0,
        // Evaluated before `source` is moved into `array` below.
        logical_nulls: source.logical_nulls(),
        array: source,
    };

    // Covers only the two null rows (levels 1..3); no non-null values.
    let null_only = CdcChunk {
        level_offset: 1,
        num_levels: 2,
        value_offset: 1,
        num_values: 0,
    };
    let piece = levels.slice_for_chunk(&null_only);

    assert_eq!(piece.def_levels, LevelData::Materialized(vec![0, 0]));
    assert!(piece.non_null_indices.is_empty());
    assert_eq!(piece.array.len(), 0);
}
2713
#[test]
fn test_all_null_list() {
    // List<Int32> in which every list slot is null.
    //   Schema: list (nullable) -> item (int32, nullable)
    //   Data:   [null, null, null, null]
    //
    // Expected: max_def = 3, max_rep = 1, and uniform def/rep levels of 0.
    let element = Arc::new(Field::new_list_field(DataType::Int32, true));
    let null_list = ListArray::new_null(element, 4);
    let leaf = null_list.values().clone();
    let list_field = Field::new("list", null_list.data_type().clone(), true);
    let list_ref: ArrayRef = Arc::new(null_list);

    let computed = calculate_array_levels(&list_ref, &list_field).unwrap();
    assert_eq!(computed.len(), 1);

    let want = ArrayLevels {
        def_levels: LevelData::Uniform { value: 0, count: 4 },
        rep_levels: LevelData::Uniform { value: 0, count: 4 },
        non_null_indices: vec![],
        max_def_level: 3,
        max_rep_level: 1,
        // Evaluated before `leaf` is moved into `array` below.
        logical_nulls: leaf.logical_nulls(),
        array: leaf,
    };
    assert_eq!(&computed[0], &want);
}
2742
#[test]
fn test_all_null_fixed_size_list() {
    // FixedSizeList<Int32; 2> in which every list slot is null.
    //   Schema: list (nullable) -> item (int32, nullable)
    //   Data:   [null, null, null]
    //
    // Expected: max_def = 3, max_rep = 1, and uniform def/rep levels of 0.
    let element = Arc::new(Field::new_list_field(DataType::Int32, true));
    let null_list = FixedSizeListArray::new_null(element, 2, 3);
    let leaf = null_list.values().clone();
    let list_field = Field::new("list", null_list.data_type().clone(), true);
    let list_ref: ArrayRef = Arc::new(null_list);

    let computed = calculate_array_levels(&list_ref, &list_field).unwrap();
    assert_eq!(computed.len(), 1);

    let want = ArrayLevels {
        def_levels: LevelData::Uniform { value: 0, count: 3 },
        rep_levels: LevelData::Uniform { value: 0, count: 3 },
        non_null_indices: vec![],
        max_def_level: 3,
        max_rep_level: 1,
        // Evaluated before `leaf` is moved into `array` below.
        logical_nulls: leaf.logical_nulls(),
        array: leaf,
    };
    assert_eq!(&computed[0], &want);
}
2771
#[test]
fn test_all_null_struct() {
    // Struct<Int32> in which every struct slot is null.
    //   Schema: a (struct, nullable) -> c (int32, nullable)
    //   Data:   [null, null, null, null]
    //
    // Expected: max_def = 2, uniform def level 0, no non-null leaf indices.
    let leaf: ArrayRef = Arc::new(Int32Array::from(vec![None::<i32>; 4]));
    let child_field = Arc::new(Field::new("c", DataType::Int32, true));
    // An all-zero validity byte marks every struct slot as null.
    let null_struct = StructArray::from((
        vec![(child_field, leaf.clone())],
        Buffer::from([0b00000000]),
    ));
    let struct_field = Field::new("a", null_struct.data_type().clone(), true);
    let struct_ref: ArrayRef = Arc::new(null_struct);

    let computed = calculate_array_levels(&struct_ref, &struct_field).unwrap();
    assert_eq!(computed.len(), 1);

    let want = ArrayLevels {
        def_levels: LevelData::Uniform { value: 0, count: 4 },
        rep_levels: LevelData::Absent,
        non_null_indices: vec![],
        max_def_level: 2,
        max_rep_level: 0,
        array: leaf,
        logical_nulls: Some(NullBuffer::new_null(4)),
    };
    assert_eq!(&computed[0], &want);
}
2801
#[test]
fn test_all_null_nested_struct() {
    // Struct<Struct<Int32>> in which the outer struct is entirely null.
    //   Schema: a (struct, nullable) -> b (struct, nullable) -> c (int32, nullable)
    //   Data:   [null, null, null]
    //
    // Expected: max_def = 3 and uniform def level 0.
    let leaf: ArrayRef = Arc::new(Int32Array::from(vec![None::<i32>; 3]));
    let leaf_field = Arc::new(Field::new("c", DataType::Int32, true));

    // Inner struct `b`: every slot null (all-zero validity byte).
    let inner = StructArray::from((
        vec![(leaf_field, leaf.clone())],
        Buffer::from([0b00000000]),
    ));
    let inner_field = Arc::new(Field::new("b", inner.data_type().clone(), true));

    // Outer struct `a`: every slot null as well.
    let outer = StructArray::from((
        vec![(inner_field, Arc::new(inner) as ArrayRef)],
        Buffer::from([0b00000000]),
    ));
    let outer_field = Field::new("a", outer.data_type().clone(), true);
    let outer_ref: ArrayRef = Arc::new(outer);

    let computed = calculate_array_levels(&outer_ref, &outer_field).unwrap();
    assert_eq!(computed.len(), 1);

    let want = ArrayLevels {
        def_levels: LevelData::Uniform { value: 0, count: 3 },
        rep_levels: LevelData::Absent,
        non_null_indices: vec![],
        max_def_level: 3,
        max_rep_level: 0,
        array: leaf,
        logical_nulls: Some(NullBuffer::new_null(3)),
    };
    assert_eq!(&computed[0], &want);
}
2835
#[test]
fn test_all_null_struct_multiple_children() {
    // Struct with two leaf children, entirely null.
    //   Schema: a (struct, nullable) -> { c1 (int32, nullable), c2 (int32, nullable) }
    //   Data:   [null, null]
    //
    // Each leaf column is expected to get uniform def level 0.
    let first: ArrayRef = Arc::new(Int32Array::from(vec![None::<i32>; 2]));
    let second: ArrayRef = Arc::new(Int32Array::from(vec![None::<i32>; 2]));
    let first_field = Arc::new(Field::new("c1", DataType::Int32, true));
    let second_field = Arc::new(Field::new("c2", DataType::Int32, true));
    // An all-zero validity byte marks every struct slot as null.
    let null_struct = StructArray::from((
        vec![
            (first_field, first.clone()),
            (second_field, second.clone()),
        ],
        Buffer::from([0b00000000]),
    ));
    let struct_field = Field::new("a", null_struct.data_type().clone(), true);
    let struct_ref: ArrayRef = Arc::new(null_struct);

    let computed = calculate_array_levels(&struct_ref, &struct_field).unwrap();
    assert_eq!(computed.len(), 2);

    // One set of levels per leaf, in child order.
    for (i, (actual, leaf)) in computed.iter().zip([first, second]).enumerate() {
        let want = ArrayLevels {
            def_levels: LevelData::Uniform { value: 0, count: 2 },
            rep_levels: LevelData::Absent,
            non_null_indices: vec![],
            max_def_level: 2,
            max_rep_level: 0,
            array: leaf,
            logical_nulls: Some(NullBuffer::new_null(2)),
        };
        assert_eq!(actual, &want, "leaf {i} mismatch");
    }
}
2870}