Skip to main content

parquet/arrow/arrow_writer/
levels.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Parquet definition and repetition levels
19//!
20//! Contains the algorithm for computing definition and repetition levels.
21//! The algorithm works by tracking the slots of an array that should
22//! ultimately be populated when writing to Parquet.
23//! Parquet achieves nesting through definition levels and repetition levels \[1\].
//! Definition levels specify how many optional fields in the path for the column
//! are defined.
26//! Repetition levels specify at what repeated field (list) in the path a column
27//! is defined.
28//!
29//! In a nested data structure such as `a.b.c`, one can see levels as defining
30//! whether a record is defined at `a`, `a.b`, or `a.b.c`.
//! Optional fields are nullable fields, thus if all 3 fields
//! are nullable and there are no lists, the maximum definition level is 3.
33//!
34//! The algorithm in this module computes the necessary information to enable
35//! the writer to keep track of which columns are at which levels, and to extract
36//! the correct values at the correct slots from Arrow arrays.
37//!
38//! It works by walking a record batch's arrays, keeping track of what values
39//! are non-null, their positions and computing what their levels are.
40//!
41//! \[1\] [parquet-format#nested-encoding](https://github.com/apache/parquet-format#nested-encoding)
42
43use crate::column::chunker::CdcChunk;
44use crate::column::writer::LevelDataRef;
45use crate::errors::{ParquetError, Result};
46use arrow_array::cast::AsArray;
47use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
48use arrow_buffer::bit_iterator::BitIndexIterator;
49use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
50use arrow_schema::{DataType, Field};
51use std::ops::Range;
52use std::sync::Arc;
53
54/// Performs a depth-first scan of the children of `array`, constructing [`ArrayLevels`]
55/// for each leaf column encountered
56pub(crate) fn calculate_array_levels(array: &ArrayRef, field: &Field) -> Result<Vec<ArrayLevels>> {
57    let mut builder = LevelInfoBuilder::try_new(field, Default::default(), array)?;
58    builder.write(0..array.len());
59    Ok(builder.finish())
60}
61
62/// Returns true if the DataType can be represented as a primitive parquet column,
63/// i.e. a leaf array with no children
64fn is_leaf(data_type: &DataType) -> bool {
65    matches!(
66        data_type,
67        DataType::Null
68            | DataType::Boolean
69            | DataType::Int8
70            | DataType::Int16
71            | DataType::Int32
72            | DataType::Int64
73            | DataType::UInt8
74            | DataType::UInt16
75            | DataType::UInt32
76            | DataType::UInt64
77            | DataType::Float16
78            | DataType::Float32
79            | DataType::Float64
80            | DataType::Utf8
81            | DataType::Utf8View
82            | DataType::LargeUtf8
83            | DataType::Timestamp(_, _)
84            | DataType::Date32
85            | DataType::Date64
86            | DataType::Time32(_)
87            | DataType::Time64(_)
88            | DataType::Duration(_)
89            | DataType::Interval(_)
90            | DataType::Binary
91            | DataType::LargeBinary
92            | DataType::BinaryView
93            | DataType::Decimal32(_, _)
94            | DataType::Decimal64(_, _)
95            | DataType::Decimal128(_, _)
96            | DataType::Decimal256(_, _)
97            | DataType::FixedSizeBinary(_)
98    )
99}
100
/// The definition and repetition level of an array within a potentially nested hierarchy
///
/// The root array starts from the default (all-zero) context;
/// [`LevelInfoBuilder::try_new`] derives the context handed to each nested child
/// from the context of its parent.
#[derive(Debug, Default, Clone, Copy)]
struct LevelContext {
    /// The current repetition level
    ///
    /// Incremented by one for every enclosing list-like array (list, large list,
    /// fixed size list, list view, large list view or map).
    rep_level: i16,
    /// The current definition level
    ///
    /// Incremented by one for every enclosing nullable struct, by one for every
    /// enclosing list-like array, and by one more when that list-like array is nullable.
    def_level: i16,
}
109
/// A helper to construct [`ArrayLevels`] from a potentially nested [`Field`]
///
/// The builder mirrors the nesting of the array it was created from: each nested
/// variant stores the builder(s) of its children together with the [`LevelContext`]
/// computed for it, plus the offsets / sizes / validity needed to walk its rows.
#[derive(Debug)]
enum LevelInfoBuilder {
    /// A primitive, leaf array
    Primitive(ArrayLevels),
    /// A list array
    ///
    /// Map arrays are also represented by this variant, with the map entries
    /// acting as the child values.
    List(
        Box<LevelInfoBuilder>, // Child Values
        LevelContext,          // Context
        OffsetBuffer<i32>,     // Offsets
        Option<NullBuffer>,    // Nulls
    ),
    /// A large list array
    LargeList(
        Box<LevelInfoBuilder>, // Child Values
        LevelContext,          // Context
        OffsetBuffer<i64>,     // Offsets
        Option<NullBuffer>,    // Nulls
    ),
    /// A fixed size list array
    FixedSizeList(
        Box<LevelInfoBuilder>, // Values
        LevelContext,          // Context
        usize,                 // List Size
        Option<NullBuffer>,    // Nulls
    ),
    /// A list view array
    ListView(
        Box<LevelInfoBuilder>, // Child Values
        LevelContext,          // Context
        ScalarBuffer<i32>,     // Offsets
        ScalarBuffer<i32>,     // Sizes
        Option<NullBuffer>,    // Nulls
    ),
    /// A large list view array
    LargeListView(
        Box<LevelInfoBuilder>, // Child Values
        LevelContext,          // Context
        ScalarBuffer<i64>,     // Offsets
        ScalarBuffer<i64>,     // Sizes
        Option<NullBuffer>,    // Nulls
    ),
    /// A struct array: one builder per child column, the struct's context and its validity
    Struct(Vec<LevelInfoBuilder>, LevelContext, Option<NullBuffer>),
}
155
/// Minimum sub-range length before the bulk-fill fast path in `write_leaf`
/// becomes profitable for null-heavy leaf columns. Below this, per-call
/// slice + popcount overhead regresses list/struct paths that call
/// `write_leaf` many times with tiny ranges. Picked via threshold sweep;
/// see <https://github.com/apache/arrow-rs/pull/9967> for the rationale.
///
/// The length check is only one half of the gate: `write_leaf` additionally
/// requires that at least half of the slots in the null buffer are null.
const BULK_FILL_MIN_LEN: usize = 64;
162
163impl LevelInfoBuilder {
164    /// Create a new [`LevelInfoBuilder`] for the given [`Field`] and parent [`LevelContext`]
165    fn try_new(field: &Field, parent_ctx: LevelContext, array: &ArrayRef) -> Result<Self> {
166        if !Self::types_compatible(field.data_type(), array.data_type()) {
167            return Err(arrow_err!(format!(
168                "Incompatible type. Field '{}' has type {}, array has type {}",
169                field.name(),
170                field.data_type(),
171                array.data_type(),
172            )));
173        }
174
175        let is_nullable = field.is_nullable();
176
177        match array.data_type() {
178            d if is_leaf(d) => {
179                let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
180                Ok(Self::Primitive(levels))
181            }
182            DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => {
183                let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
184                Ok(Self::Primitive(levels))
185            }
186            DataType::Struct(children) => {
187                let array = array.as_struct();
188                let def_level = match is_nullable {
189                    true => parent_ctx.def_level + 1,
190                    false => parent_ctx.def_level,
191                };
192
193                let ctx = LevelContext {
194                    rep_level: parent_ctx.rep_level,
195                    def_level,
196                };
197
198                let children = children
199                    .iter()
200                    .zip(array.columns())
201                    .map(|(f, a)| Self::try_new(f, ctx, a))
202                    .collect::<Result<_>>()?;
203
204                Ok(Self::Struct(children, ctx, array.nulls().cloned()))
205            }
206            DataType::List(child)
207            | DataType::LargeList(child)
208            | DataType::Map(child, _)
209            | DataType::FixedSizeList(child, _)
210            | DataType::ListView(child)
211            | DataType::LargeListView(child) => {
212                let def_level = match is_nullable {
213                    true => parent_ctx.def_level + 2,
214                    false => parent_ctx.def_level + 1,
215                };
216
217                let ctx = LevelContext {
218                    rep_level: parent_ctx.rep_level + 1,
219                    def_level,
220                };
221
222                Ok(match field.data_type() {
223                    DataType::List(_) => {
224                        let list = array.as_list();
225                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
226                        let offsets = list.offsets().clone();
227                        Self::List(Box::new(child), ctx, offsets, list.nulls().cloned())
228                    }
229                    DataType::LargeList(_) => {
230                        let list = array.as_list();
231                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
232                        let offsets = list.offsets().clone();
233                        let nulls = list.nulls().cloned();
234                        Self::LargeList(Box::new(child), ctx, offsets, nulls)
235                    }
236                    DataType::Map(_, _) => {
237                        let map = array.as_map();
238                        let entries = Arc::new(map.entries().clone()) as ArrayRef;
239                        let child = Self::try_new(child.as_ref(), ctx, &entries)?;
240                        let offsets = map.offsets().clone();
241                        Self::List(Box::new(child), ctx, offsets, map.nulls().cloned())
242                    }
243                    DataType::FixedSizeList(_, size) => {
244                        let list = array.as_fixed_size_list();
245                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
246                        let nulls = list.nulls().cloned();
247                        Self::FixedSizeList(Box::new(child), ctx, *size as _, nulls)
248                    }
249                    DataType::ListView(_) => {
250                        let list = array.as_list_view();
251                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
252                        let offsets = list.offsets().clone();
253                        let sizes = list.sizes().clone();
254                        let nulls = list.nulls().cloned();
255                        Self::ListView(Box::new(child), ctx, offsets, sizes, nulls)
256                    }
257                    DataType::LargeListView(_) => {
258                        let list = array.as_list_view();
259                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
260                        let offsets = list.offsets().clone();
261                        let sizes = list.sizes().clone();
262                        let nulls = list.nulls().cloned();
263                        Self::LargeListView(Box::new(child), ctx, offsets, sizes, nulls)
264                    }
265                    _ => unreachable!(),
266                })
267            }
268            d => Err(nyi_err!("Datatype {} is not yet supported", d)),
269        }
270    }
271
272    /// Finish this [`LevelInfoBuilder`] returning the [`ArrayLevels`] for the leaf columns
273    /// as enumerated by a depth-first search
274    fn finish(self) -> Vec<ArrayLevels> {
275        match self {
276            LevelInfoBuilder::Primitive(v) => vec![v],
277            LevelInfoBuilder::List(v, _, _, _)
278            | LevelInfoBuilder::LargeList(v, _, _, _)
279            | LevelInfoBuilder::FixedSizeList(v, _, _, _)
280            | LevelInfoBuilder::ListView(v, _, _, _, _)
281            | LevelInfoBuilder::LargeListView(v, _, _, _, _) => v.finish(),
282            LevelInfoBuilder::Struct(v, _, _) => v.into_iter().flat_map(|l| l.finish()).collect(),
283        }
284    }
285
286    /// Given an `array`, write the level data for the elements in `range`
287    fn write(&mut self, range: Range<usize>) {
288        match self {
289            LevelInfoBuilder::Primitive(info) => Self::write_leaf(info, range),
290            LevelInfoBuilder::List(child, ctx, offsets, nulls) => {
291                Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
292            }
293            LevelInfoBuilder::LargeList(child, ctx, offsets, nulls) => {
294                Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
295            }
296            LevelInfoBuilder::FixedSizeList(child, ctx, size, nulls) => {
297                Self::write_fixed_size_list(child, ctx, *size, nulls.as_ref(), range)
298            }
299            LevelInfoBuilder::ListView(child, ctx, offsets, sizes, nulls) => {
300                Self::write_list_view(child, ctx, offsets, sizes, nulls.as_ref(), range)
301            }
302            LevelInfoBuilder::LargeListView(child, ctx, offsets, sizes, nulls) => {
303                Self::write_list_view(child, ctx, offsets, sizes, nulls.as_ref(), range)
304            }
305            LevelInfoBuilder::Struct(children, ctx, nulls) => {
306                Self::write_struct(children, ctx, nulls.as_ref(), range)
307            }
308        }
309    }
310
311    /// Write `range` elements from ListArray `array`
312    ///
313    /// Note: MapArrays are `ListArray<i32>` under the hood and so are dispatched to this method
314    fn write_list<O: OffsetSizeTrait>(
315        child: &mut LevelInfoBuilder,
316        ctx: &LevelContext,
317        offsets: &[O],
318        nulls: Option<&NullBuffer>,
319        range: Range<usize>,
320    ) {
321        // Fast path: entire list array is null; emit bulk null rep/def levels
322        if nulls.is_some_and(|nulls| nulls.null_count() == nulls.len()) {
323            let count = range.end - range.start;
324            child.visit_leaves(|leaf| {
325                leaf.extend_uniform_levels(ctx.def_level - 2, ctx.rep_level - 1, count);
326            });
327            return;
328        }
329
330        let offsets = &offsets[range.start..range.end + 1];
331
332        let write_non_null_slice =
333            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
334                child.write(start_idx..end_idx);
335                child.visit_leaves(|leaf| {
336                    let rep_levels = leaf.rep_levels.materialize_mut().unwrap();
337                    let mut rev = rep_levels.iter_mut().rev();
338                    let mut remaining = end_idx - start_idx;
339
340                    loop {
341                        let next = rev.next().unwrap();
342                        if *next > ctx.rep_level {
343                            // Nested element - ignore
344                            continue;
345                        }
346
347                        remaining -= 1;
348                        if remaining == 0 {
349                            *next = ctx.rep_level - 1;
350                            break;
351                        }
352                    }
353                })
354            };
355
356        // In a list column, each row falls into one of three categories:
357        // - "null": the list slot is absent (!is_valid), encoded at def_level - 2
358        // - "empty": the list slot is present but has zero elements
359        //   (offsets[i] == offsets[i+1]), encoded at def_level - 1
360        // - non-empty: the list slot has child values, which are recursed into
361        //
362        // Consecutive runs of null or empty rows are batched and written together.
363        let write_null_run = |child: &mut LevelInfoBuilder, count: usize| {
364            if count > 0 {
365                child.visit_leaves(|leaf| {
366                    leaf.append_rep_level_run(ctx.rep_level - 1, count);
367                    leaf.append_def_level_run(ctx.def_level - 2, count);
368                });
369            }
370        };
371
372        let write_empty_run = |child: &mut LevelInfoBuilder, count: usize| {
373            if count > 0 {
374                child.visit_leaves(|leaf| {
375                    leaf.append_rep_level_run(ctx.rep_level - 1, count);
376                    leaf.append_def_level_run(ctx.def_level - 1, count);
377                });
378            }
379        };
380
381        match nulls {
382            Some(nulls) => {
383                let null_offset = range.start;
384                let mut pending_nulls: usize = 0;
385                let mut pending_empties: usize = 0;
386
387                // TODO: Faster bitmask iteration (#1757)
388                for (idx, w) in offsets.windows(2).enumerate() {
389                    let is_valid = nulls.is_valid(idx + null_offset);
390                    let start_idx = w[0].as_usize();
391                    let end_idx = w[1].as_usize();
392
393                    if !is_valid {
394                        write_empty_run(child, pending_empties);
395                        pending_empties = 0;
396                        pending_nulls += 1;
397                    } else if start_idx == end_idx {
398                        write_null_run(child, pending_nulls);
399                        pending_nulls = 0;
400                        pending_empties += 1;
401                    } else {
402                        write_null_run(child, pending_nulls);
403                        pending_nulls = 0;
404                        write_empty_run(child, pending_empties);
405                        pending_empties = 0;
406                        write_non_null_slice(child, start_idx, end_idx);
407                    }
408                }
409                write_null_run(child, pending_nulls);
410                write_empty_run(child, pending_empties);
411            }
412            None => {
413                let mut pending_empties: usize = 0;
414                for w in offsets.windows(2) {
415                    let start_idx = w[0].as_usize();
416                    let end_idx = w[1].as_usize();
417                    if start_idx == end_idx {
418                        pending_empties += 1;
419                    } else {
420                        write_empty_run(child, pending_empties);
421                        pending_empties = 0;
422                        write_non_null_slice(child, start_idx, end_idx);
423                    }
424                }
425                write_empty_run(child, pending_empties);
426            }
427        }
428    }
429
430    /// Write `range` elements from ListViewArray `array`
431    fn write_list_view<O: OffsetSizeTrait>(
432        child: &mut LevelInfoBuilder,
433        ctx: &LevelContext,
434        offsets: &[O],
435        sizes: &[O],
436        nulls: Option<&NullBuffer>,
437        range: Range<usize>,
438    ) {
439        let offsets = &offsets[range.start..range.end];
440        let sizes = &sizes[range.start..range.end];
441
442        let write_non_null_slice =
443            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
444                child.write(start_idx..end_idx);
445                child.visit_leaves(|leaf| {
446                    let rep_levels = leaf.rep_levels.materialize_mut().unwrap();
447                    let mut rev = rep_levels.iter_mut().rev();
448                    let mut remaining = end_idx - start_idx;
449
450                    loop {
451                        let next = rev.next().unwrap();
452                        if *next > ctx.rep_level {
453                            // Nested element - ignore
454                            continue;
455                        }
456
457                        remaining -= 1;
458                        if remaining == 0 {
459                            *next = ctx.rep_level - 1;
460                            break;
461                        }
462                    }
463                })
464            };
465
466        let write_empty_slice = |child: &mut LevelInfoBuilder| {
467            child.visit_leaves(|leaf| {
468                leaf.append_rep_level_run(ctx.rep_level - 1, 1);
469                leaf.append_def_level_run(ctx.def_level - 1, 1);
470            })
471        };
472
473        let write_null_slice = |child: &mut LevelInfoBuilder| {
474            child.visit_leaves(|leaf| {
475                leaf.append_rep_level_run(ctx.rep_level - 1, 1);
476                leaf.append_def_level_run(ctx.def_level - 2, 1);
477            })
478        };
479
480        match nulls {
481            Some(nulls) => {
482                let null_offset = range.start;
483                // TODO: Faster bitmask iteration (#1757)
484                for (idx, (offset, size)) in offsets.iter().zip(sizes.iter()).enumerate() {
485                    let is_valid = nulls.is_valid(idx + null_offset);
486                    let start_idx = offset.as_usize();
487                    let size = size.as_usize();
488                    let end_idx = start_idx + size;
489                    if !is_valid {
490                        write_null_slice(child)
491                    } else if size == 0 {
492                        write_empty_slice(child)
493                    } else {
494                        write_non_null_slice(child, start_idx, end_idx)
495                    }
496                }
497            }
498            None => {
499                for (offset, size) in offsets.iter().zip(sizes.iter()) {
500                    let start_idx = offset.as_usize();
501                    let size = size.as_usize();
502                    let end_idx = start_idx + size;
503                    if size == 0 {
504                        write_empty_slice(child)
505                    } else {
506                        write_non_null_slice(child, start_idx, end_idx)
507                    }
508                }
509            }
510        }
511    }
512
513    /// Write `range` elements from StructArray `array`
514    fn write_struct(
515        children: &mut [LevelInfoBuilder],
516        ctx: &LevelContext,
517        nulls: Option<&NullBuffer>,
518        range: Range<usize>,
519    ) {
520        let write_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
521            let len = range.end - range.start;
522            for child in children {
523                child.visit_leaves(|info| {
524                    info.extend_uniform_levels(ctx.def_level - 1, ctx.rep_level, len);
525                })
526            }
527        };
528
529        // Fast path: entire struct array is null; emit bulk null def/rep levels
530        if nulls.is_some_and(|nulls| nulls.null_count() == nulls.len()) {
531            write_null(children, range);
532            return;
533        }
534
535        let write_non_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
536            for child in children {
537                child.write(range.clone())
538            }
539        };
540
541        match nulls {
542            Some(validity) => {
543                let mut last_non_null_idx = None;
544                let mut last_null_idx = None;
545
546                // TODO: Faster bitmask iteration (#1757)
547                for i in range.clone() {
548                    match validity.is_valid(i) {
549                        true => {
550                            if let Some(last_idx) = last_null_idx.take() {
551                                write_null(children, last_idx..i)
552                            }
553                            last_non_null_idx.get_or_insert(i);
554                        }
555                        false => {
556                            if let Some(last_idx) = last_non_null_idx.take() {
557                                write_non_null(children, last_idx..i)
558                            }
559                            last_null_idx.get_or_insert(i);
560                        }
561                    }
562                }
563
564                if let Some(last_idx) = last_null_idx.take() {
565                    write_null(children, last_idx..range.end)
566                }
567
568                if let Some(last_idx) = last_non_null_idx.take() {
569                    write_non_null(children, last_idx..range.end)
570                }
571            }
572            None => write_non_null(children, range),
573        }
574    }
575
576    /// Write `range` elements from FixedSizeListArray with child data `values` and null bitmap `nulls`.
577    fn write_fixed_size_list(
578        child: &mut LevelInfoBuilder,
579        ctx: &LevelContext,
580        fixed_size: usize,
581        nulls: Option<&NullBuffer>,
582        range: Range<usize>,
583    ) {
584        // Fast path: entire fixed-size list array is null
585        if nulls.is_some_and(|nulls| nulls.null_count() == nulls.len()) {
586            let count = range.end - range.start;
587            child.visit_leaves(|leaf| {
588                leaf.extend_uniform_levels(ctx.def_level - 2, ctx.rep_level - 1, count);
589            });
590            return;
591        }
592
593        let write_non_null = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
594            let values_start = start_idx * fixed_size;
595            let values_end = end_idx * fixed_size;
596            child.write(values_start..values_end);
597
598            child.visit_leaves(|leaf| {
599                let rep_levels = leaf.rep_levels.materialize_mut().unwrap();
600
601                let row_indices = (0..fixed_size)
602                    .rev()
603                    .cycle()
604                    .take(values_end - values_start);
605
606                // Step backward over the child rep levels and mark the start of each list
607                rep_levels
608                    .iter_mut()
609                    .rev()
610                    // Filter out reps from nested children
611                    .filter(|&&mut r| r == ctx.rep_level)
612                    .zip(row_indices)
613                    .for_each(|(r, idx)| {
614                        if idx == 0 {
615                            *r = ctx.rep_level - 1;
616                        }
617                    });
618            })
619        };
620
621        // If list size is 0, ignore values and just write rep/def levels.
622        let write_empty = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
623            let len = end_idx - start_idx;
624            child.visit_leaves(|leaf| {
625                leaf.append_rep_level_run(ctx.rep_level - 1, len);
626                leaf.append_def_level_run(ctx.def_level - 1, len);
627            })
628        };
629
630        let write_rows = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
631            if fixed_size > 0 {
632                write_non_null(child, start_idx, end_idx)
633            } else {
634                write_empty(child, start_idx, end_idx)
635            }
636        };
637
638        match nulls {
639            Some(nulls) => {
640                let mut start_idx = None;
641                for idx in range.clone() {
642                    if nulls.is_valid(idx) {
643                        // Start a run of valid rows if not already inside of one
644                        start_idx.get_or_insert(idx);
645                    } else {
646                        // Write out any pending valid rows
647                        if let Some(start) = start_idx.take() {
648                            write_rows(child, start, idx);
649                        }
650                        // Add null row
651                        child.visit_leaves(|leaf| {
652                            leaf.append_rep_level_run(ctx.rep_level - 1, 1);
653                            leaf.append_def_level_run(ctx.def_level - 2, 1);
654                        })
655                    }
656                }
657                // Write out any remaining valid rows
658                if let Some(start) = start_idx.take() {
659                    write_rows(child, start, range.end);
660                }
661            }
662            // If all rows are valid then write the whole array
663            None => write_rows(child, range.start, range.end),
664        }
665    }
666
    /// Write a primitive array, as defined by [`is_leaf`]
    ///
    /// Appends to `info` the levels for the rows in `range`, and records in
    /// `info.non_null_indices` the indices (relative to the whole array) of the rows in
    /// `range` that hold a value:
    /// - if definition levels are absent, every row of `range` is recorded as non-null
    ///   and no definition level is written
    /// - otherwise a valid row gets `max_def_level` and a null row `max_def_level - 1`
    /// - if repetition levels are present, every row gets `max_rep_level`
    ///
    /// # Panics
    ///
    /// Panics if definition levels are present, the array has logical nulls and
    /// `range.end` exceeds the length of the null buffer.
    fn write_leaf(info: &mut ArrayLevels, range: Range<usize>) {
        let len = range.end - range.start;

        // Fast path: entire leaf array is null
        // Nothing is added to `non_null_indices` in this case, and the repetition
        // level is handed to `extend_uniform_levels` instead of being appended below.
        if let Some(nulls) = &info.logical_nulls {
            if !matches!(info.def_levels, LevelData::Absent) && nulls.null_count() == nulls.len() {
                info.extend_uniform_levels(info.max_def_level - 1, info.max_rep_level, len);
                return;
            }
        }

        if matches!(info.def_levels, LevelData::Absent) {
            // No definition levels are tracked: treat every row as holding a value
            info.non_null_indices.extend(range.clone());
        } else {
            let max_def_level = info.max_def_level;
            match &info.logical_nulls {
                Some(nulls) => {
                    // Also establishes the bound relied upon by the unchecked read below
                    assert!(range.end <= nulls.len());
                    // Bulk-fill is profitable only on null-heavy ranges long enough to
                    // amortize the slice/popcount cost; see `BULK_FILL_MIN_LEN` and the
                    // PR description for the threshold sweep. The gate uses the cached
                    // buffer-wide `null_count` (O(1)) to stay cheap on the cold path.
                    if len >= BULK_FILL_MIN_LEN && nulls.null_count() * 2 >= nulls.len() {
                        let range_nulls = nulls.slice(range.start, len);
                        let valid_in_range = len - range_nulls.null_count();
                        let null_def_level = max_def_level - 1;
                        let buf = info
                            .def_levels
                            .materialize_mut()
                            .expect("definition levels present");
                        // Fill the whole range with the null level first, then
                        // overwrite only the slots that hold a value
                        let base = buf.len();
                        buf.resize(base + len, null_def_level);
                        for i in range_nulls.valid_indices() {
                            buf[base + i] = max_def_level;
                        }
                        info.non_null_indices.reserve(valid_in_range);
                        // `valid_indices` are relative to the slice; shift them back
                        // so that they index into the whole array
                        info.non_null_indices
                            .extend(range_nulls.valid_indices().map(|i| i + range.start));
                    } else {
                        let bits = nulls.inner();
                        info.def_levels.extend_from_iter(range.clone().map(|i| {
                            // SAFETY: range.end was asserted to be in bounds earlier
                            let valid = unsafe { bits.value_unchecked(i) };
                            // Subtracts one from the maximum level exactly when the slot is null
                            max_def_level - (!valid as i16)
                        }));
                        info.non_null_indices.reserve(len);
                        // The iterator yields positions relative to its starting bit;
                        // shift them back so that they index into the whole array
                        info.non_null_indices.extend(
                            BitIndexIterator::new(bits.inner(), bits.offset() + range.start, len)
                                .map(|i| i + range.start),
                        );
                    }
                }
                None => {
                    // No logical nulls: every row holds a value
                    info.append_def_level_run(max_def_level, len);
                    info.non_null_indices.reserve(len);
                    info.non_null_indices.extend(range.clone());
                }
            }
        }

        if !matches!(info.rep_levels, LevelData::Absent) {
            info.append_rep_level_run(info.max_rep_level, len);
        }
    }
732
733    /// Visits all children of this node in depth first order
734    fn visit_leaves(&mut self, visit: impl Fn(&mut ArrayLevels) + Copy) {
735        match self {
736            LevelInfoBuilder::Primitive(info) => visit(info),
737            LevelInfoBuilder::List(c, _, _, _)
738            | LevelInfoBuilder::LargeList(c, _, _, _)
739            | LevelInfoBuilder::FixedSizeList(c, _, _, _)
740            | LevelInfoBuilder::ListView(c, _, _, _, _)
741            | LevelInfoBuilder::LargeListView(c, _, _, _, _) => c.visit_leaves(visit),
742            LevelInfoBuilder::Struct(children, _, _) => {
743                for c in children {
744                    c.visit_leaves(visit)
745                }
746            }
747        }
748    }
749
750    /// Determine if the fields are compatible for purposes of constructing `LevelBuilderInfo`.
751    ///
752    /// Fields are compatible if they're the same type. Otherwise if one of them is a dictionary
753    /// and the other is a native array, the dictionary values must have the same type as the
754    /// native array
755    fn types_compatible(a: &DataType, b: &DataType) -> bool {
756        // if the Arrow data types are equal, the types are deemed compatible
757        if a.equals_datatype(b) {
758            return true;
759        }
760
761        // get the values out of the dictionaries
762        let (a, b) = match (a, b) {
763            (DataType::Dictionary(_, va), DataType::Dictionary(_, vb)) => {
764                (va.as_ref(), vb.as_ref())
765            }
766            (DataType::Dictionary(_, v), b) => (v.as_ref(), b),
767            (a, DataType::Dictionary(_, v)) => (a, v.as_ref()),
768            _ => (a, b),
769        };
770
771        // now that we've got the values from one/both dictionaries, if the values
772        // have the same Arrow data type, they're compatible
773        if a == b {
774            return true;
775        }
776
777        // here we have different Arrow data types, but if the array contains the same type of data
778        // then we consider the type compatible
779        match a {
780            // String, StringView and LargeString are compatible
781            DataType::Utf8 => matches!(b, DataType::LargeUtf8 | DataType::Utf8View),
782            DataType::Utf8View => matches!(b, DataType::LargeUtf8 | DataType::Utf8),
783            DataType::LargeUtf8 => matches!(b, DataType::Utf8 | DataType::Utf8View),
784
785            // Binary, BinaryView and LargeBinary are compatible
786            DataType::Binary => matches!(b, DataType::LargeBinary | DataType::BinaryView),
787            DataType::BinaryView => matches!(b, DataType::LargeBinary | DataType::Binary),
788            DataType::LargeBinary => matches!(b, DataType::Binary | DataType::BinaryView),
789
790            // otherwise we have incompatible types
791            _ => false,
792        }
793    }
794}
795
/// Storage for one stream of definition or repetition levels.
///
/// The same logical sequence of levels can be held in more than one physical
/// form; equality compares the logical sequence, not the representation.
#[derive(Debug, Clone)]
pub(crate) enum LevelData {
    /// No physical level stream exists.
    Absent,
    /// One explicit level per entry.
    Materialized(Vec<i16>),
    /// `count` consecutive entries that all share `value`, stored without a buffer.
    Uniform { value: i16, count: usize },
}

// Compare logical level contents rather than physical representation, so a
// uniform run compares equal to the equivalent materialized buffer.
impl PartialEq for LevelData {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (Self::Absent, Self::Absent) => true,
            // `Absent` means "no stream" and is distinct from an empty stream.
            (Self::Absent, _) | (_, Self::Absent) => false,
            (Self::Materialized(a), Self::Materialized(b)) => a == b,
            (Self::Uniform { value, count }, Self::Materialized(levels))
            | (Self::Materialized(levels), Self::Uniform { value, count }) => {
                levels.len() == *count && levels.iter().all(|level| level == value)
            }
            (
                Self::Uniform {
                    value: v1,
                    count: n1,
                },
                Self::Uniform {
                    value: v2,
                    count: n2,
                },
            ) => {
                // Two empty runs hold the same (empty) sequence whatever their
                // nominal value. Without this, both would equal an empty
                // materialized buffer but not each other, breaking the
                // transitivity that `Eq` promises.
                n1 == n2 && (*n1 == 0 || v1 == v2)
            }
        }
    }
}

impl Eq for LevelData {}
832
833impl LevelData {
834    fn new(present: bool) -> Self {
835        match present {
836            true => Self::Materialized(Vec::new()),
837            false => Self::Absent,
838        }
839    }
840
841    pub(crate) fn as_ref(&self) -> LevelDataRef<'_> {
842        match self {
843            Self::Absent => LevelDataRef::Absent,
844            Self::Materialized(values) => LevelDataRef::Materialized(values),
845            Self::Uniform { value, count } => LevelDataRef::Uniform {
846                value: *value,
847                count: *count,
848            },
849        }
850    }
851
852    pub(crate) fn slice(&self, offset: usize, len: usize) -> Self {
853        match self {
854            Self::Absent => Self::Absent,
855            Self::Materialized(values) => Self::Materialized(values[offset..offset + len].to_vec()),
856            Self::Uniform { value, .. } => Self::Uniform {
857                value: *value,
858                count: len,
859            },
860        }
861    }
862
863    fn append_run(&mut self, value: i16, count: usize) {
864        if count == 0 {
865            return;
866        }
867
868        match self {
869            // No physical level stream exists for this schema. Higher-level
870            // traversal may still append implicit levels, so this remains a no-op.
871            Self::Absent => {}
872            // Start compact: the first appended run can be represented without
873            // allocating a level buffer.
874            Self::Materialized(values) if values.is_empty() => {
875                *self = Self::Uniform { value, count };
876            }
877            // Already materialized, so preserve the buffer representation and append.
878            Self::Materialized(values) => values.extend(std::iter::repeat_n(value, count)),
879            // Preserve the compact representation while the appended run has
880            // the same value.
881            Self::Uniform {
882                value: uniform_value,
883                count: uniform_count,
884            } if *uniform_value == value => {
885                *uniform_count += count;
886            }
887            // A different value breaks the uniform representation. Materialize
888            // the existing run, then append the new run to the buffer.
889            Self::Uniform { .. } => {
890                let values = self.materialize_mut().unwrap();
891                values.extend(std::iter::repeat_n(value, count));
892            }
893        }
894    }
895
896    fn extend_from_iter<I>(&mut self, iter: I)
897    where
898        I: IntoIterator<Item = i16>,
899    {
900        if let Some(values) = self.materialize_mut() {
901            values.extend(iter);
902        }
903    }
904
905    /// Convert a uniform run into a materialized buffer if needed, then return
906    /// the mutable level buffer. Returns `None` when no physical level stream exists.
907    fn materialize_mut(&mut self) -> Option<&mut Vec<i16>> {
908        match self {
909            Self::Absent => None,
910            Self::Materialized(values) => Some(values),
911            Self::Uniform { value, count } => {
912                let values = vec![*value; *count];
913                *self = Self::Materialized(values);
914                match self {
915                    Self::Materialized(values) => Some(values),
916                    _ => unreachable!(),
917                }
918            }
919        }
920    }
921}
922
923#[derive(Debug, Clone)]
924pub(crate) struct ArrayLevels {
925    /// Array's definition levels
926    ///
927    /// Present if `max_def_level != 0`
928    def_levels: LevelData,
929
930    /// Array's optional repetition levels
931    ///
932    /// Present if `max_rep_level != 0`
933    rep_levels: LevelData,
934
935    /// The corresponding array identifying non-null slices of data
936    /// from the primitive array
937    non_null_indices: Vec<usize>,
938
939    /// The maximum definition level for this leaf column
940    max_def_level: i16,
941
942    /// The maximum repetition for this leaf column
943    max_rep_level: i16,
944
945    /// The arrow array
946    array: ArrayRef,
947
948    /// cached logical nulls of the array.
949    logical_nulls: Option<NullBuffer>,
950}
951
952impl PartialEq for ArrayLevels {
953    fn eq(&self, other: &Self) -> bool {
954        self.def_levels == other.def_levels
955            && self.rep_levels == other.rep_levels
956            && self.non_null_indices == other.non_null_indices
957            && self.max_def_level == other.max_def_level
958            && self.max_rep_level == other.max_rep_level
959            && self.array.as_ref() == other.array.as_ref()
960            && self.logical_nulls.as_ref() == other.logical_nulls.as_ref()
961    }
962}
963impl Eq for ArrayLevels {}
964
965impl ArrayLevels {
966    fn new(ctx: LevelContext, is_nullable: bool, array: ArrayRef) -> Self {
967        let max_rep_level = ctx.rep_level;
968        let max_def_level = match is_nullable {
969            true => ctx.def_level + 1,
970            false => ctx.def_level,
971        };
972
973        let logical_nulls = array.logical_nulls();
974
975        Self {
976            def_levels: LevelData::new(max_def_level != 0),
977            rep_levels: LevelData::new(max_rep_level != 0),
978            non_null_indices: vec![],
979            max_def_level,
980            max_rep_level,
981            array,
982            logical_nulls,
983        }
984    }
985
986    pub fn array(&self) -> &ArrayRef {
987        &self.array
988    }
989
990    pub(crate) fn def_level_data(&self) -> &LevelData {
991        &self.def_levels
992    }
993
994    pub(crate) fn rep_level_data(&self) -> &LevelData {
995        &self.rep_levels
996    }
997
998    pub fn non_null_indices(&self) -> &[usize] {
999        &self.non_null_indices
1000    }
1001
1002    /// Create a sliced view of this `ArrayLevels` for a CDC chunk.
1003    ///
1004    /// The chunk's `value_offset`/`num_values` select the relevant slice of
1005    /// `non_null_indices`. The array is sliced to the range covered by
1006    /// those indices, and they are shifted to be relative to the slice.
1007    pub(crate) fn slice_for_chunk(&self, chunk: &CdcChunk) -> Self {
1008        let def_levels = self.def_levels.slice(chunk.level_offset, chunk.num_levels);
1009        let rep_levels = self.rep_levels.slice(chunk.level_offset, chunk.num_levels);
1010
1011        // Select the non-null indices for this chunk.
1012        let nni = &self.non_null_indices[chunk.value_offset..chunk.value_offset + chunk.num_values];
1013        // Compute the array range spanned by the non-null indices.
1014        // When nni is empty (all-null chunk), start=0, end=0 → zero-length
1015        // array slice; write_batch_internal will process only the def/rep
1016        // levels and write no values.
1017        let start = nni.first().copied().unwrap_or(0);
1018        let end = nni.last().map_or(0, |&i| i + 1);
1019        // Shift indices to be relative to the sliced array.
1020        let non_null_indices = nni.iter().map(|&idx| idx - start).collect();
1021        // Slice the array to the computed range.
1022        let array = self.array.slice(start, end - start);
1023        let logical_nulls = array.logical_nulls();
1024
1025        Self {
1026            def_levels,
1027            rep_levels,
1028            non_null_indices,
1029            max_def_level: self.max_def_level,
1030            max_rep_level: self.max_rep_level,
1031            array,
1032            logical_nulls,
1033        }
1034    }
1035
1036    /// Bulk-emit `count` uniform def/rep levels.
1037    fn extend_uniform_levels(&mut self, def_val: i16, rep_val: i16, count: usize) {
1038        self.def_levels.append_run(def_val, count);
1039        self.rep_levels.append_run(rep_val, count);
1040    }
1041
1042    fn append_def_level_run(&mut self, value: i16, count: usize) {
1043        self.def_levels.append_run(value, count);
1044    }
1045
1046    fn append_rep_level_run(&mut self, value: i16, count: usize) {
1047        self.rep_levels.append_run(value, count);
1048    }
1049}
1050
1051#[cfg(test)]
1052mod tests {
1053    use super::*;
1054    use crate::column::chunker::CdcChunk;
1055
1056    use arrow_array::builder::*;
1057    use arrow_array::types::Int32Type;
1058    use arrow_array::*;
1059    use arrow_buffer::{Buffer, ToByteSlice};
1060    use arrow_cast::display::array_value_to_string;
1061    use arrow_data::{ArrayData, ArrayDataBuilder};
1062    use arrow_schema::{Fields, Schema};
1063
#[test]
fn test_calculate_array_levels_twitter_example() {
    // Two records of doubly nested, required lists, modelled on the example at
    // https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html
    //
    //   [[a, b, c], [d, e, f, g]], [[h], [i, j]]
    let values = Int32Array::from_iter(0..10);

    let item_field = Field::new_list_field(DataType::Int32, false);
    let l2_type = DataType::List(Arc::new(item_field));
    let l2_field = Field::new("l2", l2_type.clone(), false);
    let l1_type = DataType::List(Arc::new(l2_field));
    let l1_field = Field::new("l1", l1_type.clone(), false);

    // Assembled from raw array data: `from_iter_primitive` always infers nullable.
    let l2_data = ArrayDataBuilder::new(l2_type)
        .len(4)
        .add_buffer(Buffer::from_iter([0_i32, 3, 7, 8, 10]))
        .add_child_data(values.to_data())
        .build()
        .unwrap();
    let l1_data = ArrayDataBuilder::new(l1_type)
        .len(2)
        .add_buffer(Buffer::from_iter([0_i32, 2, 4]))
        .add_child_data(l2_data)
        .build()
        .unwrap();
    let l1_array = make_array(l1_data);

    let actual = calculate_array_levels(&l1_array, &l1_field).unwrap();
    assert_eq!(actual.len(), 1);

    let expected = ArrayLevels {
        def_levels: LevelData::Materialized(vec![2; 10]),
        rep_levels: LevelData::Materialized(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
        non_null_indices: (0..10).collect(),
        max_def_level: 2,
        max_rep_level: 2,
        array: Arc::new(values),
        logical_nulls: None,
    };
    assert_eq!(&actual[0], &expected);
}
1109
#[test]
fn test_calculate_one_level_1() {
    // A required primitive column: no level streams, every index is non-null.
    let field = Field::new_list_field(DataType::Int32, false);
    let array: ArrayRef = Arc::new(Int32Array::from_iter(0..10));

    let actual = calculate_array_levels(&array, &field).unwrap();
    assert_eq!(actual.len(), 1);

    let expected = ArrayLevels {
        def_levels: LevelData::Absent,
        rep_levels: LevelData::Absent,
        non_null_indices: (0..10).collect(),
        max_def_level: 0,
        max_rep_level: 0,
        array,
        logical_nulls: None,
    };
    assert_eq!(&actual[0], &expected);
}
1130
#[test]
fn test_calculate_one_level_2() {
    // A nullable primitive column: nulls get definition level 0, values level 1.
    let field = Field::new_list_field(DataType::Int32, true);
    let slots = [Some(0), None, Some(0), Some(0), None];
    let array: ArrayRef = Arc::new(Int32Array::from_iter(slots));

    let actual = calculate_array_levels(&array, &field).unwrap();
    assert_eq!(actual.len(), 1);

    let expected = ArrayLevels {
        def_levels: LevelData::Materialized(vec![1, 0, 1, 1, 0]),
        rep_levels: LevelData::Absent,
        non_null_indices: vec![0, 2, 3],
        max_def_level: 1,
        max_rep_level: 0,
        logical_nulls: array.logical_nulls(),
        array,
    };
    assert_eq!(&actual[0], &expected);
}
1158
#[test]
fn test_calculate_array_levels_1() {
    let item_field = Field::new_list_field(DataType::Int32, false);
    let list_type = DataType::List(Arc::new(item_field));

    // Case 1: a required list where every slot is defined and holds one value.
    //   [[0], [1], [2], [3], [4]]
    let values = Int32Array::from_iter(0..5);
    // Assembled from raw array data: `from_iter_primitive` always infers nullable.
    let list_data = ArrayDataBuilder::new(list_type.clone())
        .len(5)
        .add_buffer(Buffer::from_iter(0_i32..6))
        .add_child_data(values.to_data())
        .build()
        .unwrap();
    let list_array = make_array(list_data);
    let required_field = Field::new("list", list_type.clone(), false);

    let actual = calculate_array_levels(&list_array, &required_field).unwrap();
    assert_eq!(actual.len(), 1);

    let expected = ArrayLevels {
        def_levels: LevelData::Materialized(vec![1; 5]),
        rep_levels: LevelData::Materialized(vec![0; 5]),
        non_null_indices: (0..5).collect(),
        max_def_level: 1,
        max_rep_level: 1,
        array: Arc::new(values),
        logical_nulls: None,
    };
    assert_eq!(&actual[0], &expected);

    // Case 2: a nullable list whose second slot is null.
    //   [[0, 0], NULL, [2, 2], [3, 3, 3, 3], [4, 4, 4]]
    // There are no nulls on the root (batch), so every leaf value is defined.
    // Repetition levels per slot:
    //   0: 0, 1
    //   1: 0
    //   2: 0, 1
    //   3: 0, 1, 1, 1
    //   4: 0, 1, 1
    let values = Int32Array::from_iter([0, 0, 2, 2, 3, 3, 3, 3, 4, 4, 4]);
    let list_data = ArrayDataBuilder::new(list_type.clone())
        .len(5)
        .add_buffer(Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]))
        .add_child_data(values.to_data())
        .null_bit_buffer(Some(Buffer::from([0b00011101])))
        .build()
        .unwrap();
    let list_array = make_array(list_data);
    let nullable_field = Field::new("list", list_type, true);

    let actual = calculate_array_levels(&list_array, &nullable_field).unwrap();
    assert_eq!(actual.len(), 1);

    let expected = ArrayLevels {
        def_levels: LevelData::Materialized(vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2]),
        rep_levels: LevelData::Materialized(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
        non_null_indices: (0..11).collect(),
        max_def_level: 2,
        max_rep_level: 1,
        array: Arc::new(values),
        logical_nulls: None,
    };
    assert_eq!(&actual[0], &expected);
}
1227
#[test]
fn test_calculate_array_levels_2() {
    // Case 1: a nullable struct wrapping a nullable list, i.e. <struct<list<?>>,
    // where some struct slots are null:
    // - 0: [0, 1], but null because the struct is null
    // - 1: []
    // - 2: [2, 3], but null because the struct is null
    // - 3: [4, 5, 6, 7]
    // - 4: [8, 9, 10]
    //
    // Child values hidden behind a null parent still occupy space in the child
    // array, so they must be accounted for while indexing: here [0, 1] and
    // [2, 3] have to be skipped.
    let values = Int32Array::from_iter(0..11);
    let value_field = Field::new("leaf", DataType::Int32, false);
    let list_type = DataType::List(Arc::new(value_field));

    let list_data = ArrayData::builder(list_type.clone())
        .len(5)
        .add_child_data(values.to_data())
        .add_buffer(Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]))
        .build()
        .unwrap();
    let list_array = make_array(list_data);
    let list_field = Arc::new(Field::new("list", list_type, true));

    let struct_validity = Buffer::from([0b00011010]);
    let struct_array: ArrayRef = Arc::new(StructArray::from((
        vec![(list_field, list_array)],
        struct_validity,
    )));
    let struct_field = Field::new("struct", struct_array.data_type().clone(), true);

    let actual = calculate_array_levels(&struct_array, &struct_field).unwrap();
    assert_eq!(actual.len(), 1);

    let expected = ArrayLevels {
        def_levels: LevelData::Materialized(vec![0, 2, 0, 3, 3, 3, 3, 3, 3, 3]),
        rep_levels: LevelData::Materialized(vec![0, 0, 0, 0, 1, 1, 1, 0, 1, 1]),
        non_null_indices: (4..11).collect(),
        max_def_level: 3,
        max_rep_level: 1,
        array: Arc::new(values),
        logical_nulls: None,
    };
    assert_eq!(&actual[0], &expected);

    // Case 2: nested lists.
    // 0: [[100, 101], [102, 103]]
    // 1: []
    // 2: [[104, 105], [106, 107]]
    // 3: [[108, 109], [110, 111], [112, 113], [114, 115]]
    // 4: [[116, 117], [118, 119], [120, 121]]
    let values = Int32Array::from_iter(100..122);
    let value_field = Field::new("leaf", DataType::Int32, true);

    let inner_type = DataType::List(Arc::new(value_field));
    let inner_offsets = Buffer::from_iter([0_i32, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22]);
    let inner_data = ArrayData::builder(inner_type.clone())
        .len(11)
        .add_child_data(values.to_data())
        .add_buffer(inner_offsets)
        .build()
        .unwrap();

    let inner_field = Field::new("l1", inner_type, true);
    let outer_type = DataType::List(Arc::new(inner_field));
    let outer_data = ArrayData::builder(outer_type)
        .len(5)
        .add_child_data(inner_data)
        .add_buffer(Buffer::from_iter([0, 2, 2, 4, 8, 11]))
        .build()
        .unwrap();

    let outer_array = make_array(outer_data);
    let outer_field = Field::new("l2", outer_array.data_type().clone(), true);

    let actual = calculate_array_levels(&outer_array, &outer_field).unwrap();
    assert_eq!(actual.len(), 1);

    let expected = ArrayLevels {
        def_levels: LevelData::Materialized(vec![
            5, 5, 5, 5, 1, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
        ]),
        rep_levels: LevelData::Materialized(vec![
            0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2,
        ]),
        non_null_indices: (0..22).collect(),
        max_def_level: 5,
        max_rep_level: 2,
        array: Arc::new(values),
        logical_nulls: None,
    };
    assert_eq!(&actual[0], &expected);
}
1329
#[test]
fn test_calculate_array_levels_nested_list() {
    let value_field = Field::new("leaf", DataType::Int32, false);
    let list_type = DataType::List(Arc::new(value_field));

    // Case 1: a required list in which every slot holds a single value.
    // 0: [a]
    // 1: [a]
    // 2: [a]
    // 3: [a]
    let values = Int32Array::from_iter([0; 4]);
    let list_data = ArrayData::builder(list_type.clone())
        .len(4)
        .add_buffer(Buffer::from_iter(0_i32..5))
        .add_child_data(values.to_data())
        .build()
        .unwrap();
    let list_array = make_array(list_data);
    let required_field = Field::new("list", list_type.clone(), false);

    let actual = calculate_array_levels(&list_array, &required_field).unwrap();
    assert_eq!(actual.len(), 1);

    let expected = ArrayLevels {
        def_levels: LevelData::Materialized(vec![1; 4]),
        rep_levels: LevelData::Materialized(vec![0; 4]),
        non_null_indices: (0..4).collect(),
        max_def_level: 1,
        max_rep_level: 1,
        array: Arc::new(values),
        logical_nulls: None,
    };
    assert_eq!(&actual[0], &expected);

    // Case 2: a nullable list inside a struct that has no null slots.
    // With the offsets below the list slots are:
    // 0: null
    // 1: [0, 1, 2]
    // 2: [3, 4]
    // 3: [5, 6]
    let values = Int32Array::from_iter(0..8);
    let list_data = ArrayData::builder(list_type.clone())
        .len(4)
        .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
        .null_bit_buffer(Some(Buffer::from([0b00001110])))
        .add_child_data(values.to_data())
        .build()
        .unwrap();
    let list_array = make_array(list_data);
    let nullable_field = Arc::new(Field::new("list", list_type, true));

    let struct_array: ArrayRef = Arc::new(StructArray::from(vec![(nullable_field, list_array)]));
    let struct_field = Field::new("struct", struct_array.data_type().clone(), true);

    let actual = calculate_array_levels(&struct_array, &struct_field).unwrap();
    assert_eq!(actual.len(), 1);

    let expected = ArrayLevels {
        def_levels: LevelData::Materialized(vec![1, 3, 3, 3, 3, 3, 3, 3]),
        rep_levels: LevelData::Materialized(vec![0, 0, 1, 1, 0, 1, 0, 1]),
        non_null_indices: (0..7).collect(),
        max_def_level: 3,
        max_rep_level: 1,
        array: Arc::new(values),
        logical_nulls: None,
    };
    assert_eq!(&actual[0], &expected);

    // Case 3: nested lists, schema <struct<list<list<primitive>>>>.
    // The outer list is null in slot 0; the struct itself is valid in all slots.
    // 0: null
    // 1: [ [201], [202, 203], [] ]
    // 2: [ [204, 205, 206], [207, 208, 209, 210] ]
    // 3: [ [], [211, 212, 213, 214, 215] ]
    let values = Int32Array::from_iter(201..216);
    let value_field = Field::new("leaf", DataType::Int32, false);
    let inner_type = DataType::List(Arc::new(value_field));
    let inner_data = ArrayData::builder(inner_type.clone())
        .len(7)
        .add_buffer(Buffer::from_iter([0_i32, 1, 3, 3, 6, 10, 10, 15]))
        .add_child_data(values.to_data())
        .build()
        .unwrap();

    let inner_field = Field::new("l1", inner_type, true);
    let outer_type = DataType::List(Arc::new(inner_field));
    let outer_data = ArrayData::builder(outer_type.clone())
        .len(4)
        .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
        .null_bit_buffer(Some(Buffer::from([0b00001110])))
        .add_child_data(inner_data)
        .build()
        .unwrap();

    let outer_array = make_array(outer_data);
    let outer_field = Arc::new(Field::new("list_2", outer_type, true));

    let struct_array = StructArray::from((
        vec![(outer_field, outer_array)],
        Buffer::from([0b00001111]),
    ));
    let struct_field = Field::new("struct", struct_array.data_type().clone(), true);
    let struct_array: ArrayRef = Arc::new(struct_array);

    let actual = calculate_array_levels(&struct_array, &struct_field).unwrap();
    assert_eq!(actual.len(), 1);

    let expected = ArrayLevels {
        def_levels: LevelData::Materialized(vec![
            1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5, 5, 5, 5,
        ]),
        rep_levels: LevelData::Materialized(vec![
            0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2,
        ]),
        non_null_indices: (0..15).collect(),
        max_def_level: 5,
        max_rep_level: 2,
        array: Arc::new(values),
        logical_nulls: None,
    };
    assert_eq!(&actual[0], &expected);
}
1452
#[test]
fn test_calculate_nested_struct_levels() {
    // Nullable int `c` inside nullable struct `b` inside nullable struct `a`:
    //  - {a: {b: {c: 1}}}
    //  - {a: {b: {c: null}}}
    //  - {a: {b: {c: 3}}}
    //  - {a: {b: null}}
    //  - {a: null}
    //  - {a: {b: {c: 6}}}
    let c_values = Int32Array::from_iter([Some(1), None, Some(3), None, Some(5), Some(6)]);
    let c_array: ArrayRef = Arc::new(c_values);
    let c_field = Arc::new(Field::new("c", DataType::Int32, true));

    // `b` is null in row 3.
    let b_struct = StructArray::from((
        vec![(c_field, c_array.clone())],
        Buffer::from([0b00110111]),
    ));
    let b_field = Arc::new(Field::new("b", b_struct.data_type().clone(), true));

    // `a` is null in row 4.
    let a_struct = StructArray::from((
        vec![(b_field, Arc::new(b_struct) as ArrayRef)],
        Buffer::from([0b00101111]),
    ));
    let a_field = Field::new("a", a_struct.data_type().clone(), true);
    let a_array: ArrayRef = Arc::new(a_struct);

    let actual = calculate_array_levels(&a_array, &a_field).unwrap();
    assert_eq!(actual.len(), 1);

    let expected = ArrayLevels {
        def_levels: LevelData::Materialized(vec![3, 2, 3, 1, 0, 3]),
        rep_levels: LevelData::Absent,
        non_null_indices: vec![0, 2, 5],
        max_def_level: 3,
        max_rep_level: 0,
        logical_nulls: c_array.logical_nulls(),
        array: c_array,
    };
    assert_eq!(&actual[0], &expected);
}
1493
#[test]
fn list_single_column() {
    // Level generation for the equivalent arrow_writer test: a nullable list of
    // nullable ints, of which only rows 2..4 are written.
    let values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    let list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));

    let offsets = arrow::buffer::Buffer::from_iter([0_i32, 1, 3, 3, 6, 10]);
    let validity = Buffer::from([0b00011011]);
    let list_data = ArrayData::builder(list_type.clone())
        .len(5)
        .add_buffer(offsets)
        .null_bit_buffer(Some(validity))
        .add_child_data(values.to_data())
        .build()
        .unwrap();

    // Exactly one list slot is null.
    assert_eq!(list_data.null_count(), 1);

    let list_array = ListArray::from(list_data);
    let list_field = Field::new_list_field(list_type, true);

    let mut builder = levels(&list_field, list_array);
    builder.write(2..4);
    let written = builder.finish();

    assert_eq!(written.len(), 1);

    let expected = ArrayLevels {
        def_levels: LevelData::Materialized(vec![0, 3, 3, 3]),
        rep_levels: LevelData::Materialized(vec![0, 0, 1, 1]),
        non_null_indices: vec![3, 4, 5],
        max_def_level: 3,
        max_rep_level: 1,
        array: Arc::new(values),
        logical_nulls: None,
    };
    assert_eq!(&written[0], &expected);
}
1533
1534    #[test]
1535    fn mixed_struct_list() {
1536        // this tests the level generation from the equivalent arrow_writer_complex test
1537
1538        // define schema
1539        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1540        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1541        let struct_field_g = Arc::new(Field::new(
1542            "g",
1543            DataType::List(Arc::new(Field::new("items", DataType::Int16, false))),
1544            false,
1545        ));
1546        let struct_field_e = Arc::new(Field::new(
1547            "e",
1548            DataType::Struct(vec![struct_field_f.clone(), struct_field_g.clone()].into()),
1549            true,
1550        ));
1551        let schema = Schema::new(vec![
1552            Field::new("a", DataType::Int32, false),
1553            Field::new("b", DataType::Int32, true),
1554            Field::new(
1555                "c",
1556                DataType::Struct(vec![struct_field_d.clone(), struct_field_e.clone()].into()),
1557                true, // https://github.com/apache/arrow-rs/issues/245
1558            ),
1559        ]);
1560
1561        // create some data
1562        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1563        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1564        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1565        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1566
1567        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1568
1569        // Construct a buffer for value offsets, for the nested array:
1570        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
1571        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1572
1573        // Construct a list array from the above two
1574        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1575            .len(5)
1576            .add_buffer(g_value_offsets)
1577            .add_child_data(g_value.into_data())
1578            .build()
1579            .unwrap();
1580        let g = ListArray::from(g_list_data);
1581
1582        let e = StructArray::from(vec![
1583            (struct_field_f, Arc::new(f.clone()) as ArrayRef),
1584            (struct_field_g, Arc::new(g) as ArrayRef),
1585        ]);
1586
1587        let c = StructArray::from(vec![
1588            (struct_field_d, Arc::new(d.clone()) as ArrayRef),
1589            (struct_field_e, Arc::new(e) as ArrayRef),
1590        ]);
1591
1592        // build a record batch
1593        let batch = RecordBatch::try_new(
1594            Arc::new(schema),
1595            vec![Arc::new(a.clone()), Arc::new(b.clone()), Arc::new(c)],
1596        )
1597        .unwrap();
1598
1599        //////////////////////////////////////////////
1600        // calculate the list's level
1601        let mut levels = vec![];
1602        batch
1603            .columns()
1604            .iter()
1605            .zip(batch.schema().fields())
1606            .for_each(|(array, field)| {
1607                let mut array_levels = calculate_array_levels(array, field).unwrap();
1608                levels.append(&mut array_levels);
1609            });
1610        assert_eq!(levels.len(), 5);
1611
1612        // test "a" levels
1613        let list_level = &levels[0];
1614
1615        let expected_level = ArrayLevels {
1616            def_levels: LevelData::Absent,
1617            rep_levels: LevelData::Absent,
1618            non_null_indices: vec![0, 1, 2, 3, 4],
1619            max_def_level: 0,
1620            max_rep_level: 0,
1621            array: Arc::new(a),
1622            logical_nulls: None,
1623        };
1624        assert_eq!(list_level, &expected_level);
1625
1626        // test "b" levels
1627        let list_level = levels.get(1).unwrap();
1628
1629        let b_logical_nulls = b.logical_nulls();
1630        let expected_level = ArrayLevels {
1631            def_levels: LevelData::Materialized(vec![1, 0, 0, 1, 1]),
1632            rep_levels: LevelData::Absent,
1633            non_null_indices: vec![0, 3, 4],
1634            max_def_level: 1,
1635            max_rep_level: 0,
1636            array: Arc::new(b),
1637            logical_nulls: b_logical_nulls,
1638        };
1639        assert_eq!(list_level, &expected_level);
1640
1641        // test "d" levels
1642        let list_level = levels.get(2).unwrap();
1643
1644        let d_logical_nulls = d.logical_nulls();
1645        let expected_level = ArrayLevels {
1646            def_levels: LevelData::Materialized(vec![1, 1, 1, 2, 1]),
1647            rep_levels: LevelData::Absent,
1648            non_null_indices: vec![3],
1649            max_def_level: 2,
1650            max_rep_level: 0,
1651            array: Arc::new(d),
1652            logical_nulls: d_logical_nulls,
1653        };
1654        assert_eq!(list_level, &expected_level);
1655
1656        // test "f" levels
1657        let list_level = levels.get(3).unwrap();
1658
1659        let f_logical_nulls = f.logical_nulls();
1660        let expected_level = ArrayLevels {
1661            def_levels: LevelData::Materialized(vec![3, 2, 3, 2, 3]),
1662            rep_levels: LevelData::Absent,
1663            non_null_indices: vec![0, 2, 4],
1664            max_def_level: 3,
1665            max_rep_level: 0,
1666            array: Arc::new(f),
1667            logical_nulls: f_logical_nulls,
1668        };
1669        assert_eq!(list_level, &expected_level);
1670    }
1671
1672    #[test]
1673    fn test_null_vs_nonnull_struct() {
1674        // define schema
1675        let offset_field = Arc::new(Field::new("offset", DataType::Int32, true));
1676        let schema = Schema::new(vec![Field::new(
1677            "some_nested_object",
1678            DataType::Struct(vec![offset_field.clone()].into()),
1679            false,
1680        )]);
1681
1682        // create some data
1683        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1684
1685        let some_nested_object =
1686            StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
1687
1688        // build a record batch
1689        let batch =
1690            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1691
1692        let struct_null_level =
1693            calculate_array_levels(batch.column(0), batch.schema().field(0)).unwrap();
1694
1695        // create second batch
1696        // define schema
1697        let offset_field = Arc::new(Field::new("offset", DataType::Int32, true));
1698        let schema = Schema::new(vec![Field::new(
1699            "some_nested_object",
1700            DataType::Struct(vec![offset_field.clone()].into()),
1701            true,
1702        )]);
1703
1704        // create some data
1705        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1706
1707        let some_nested_object =
1708            StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
1709
1710        // build a record batch
1711        let batch =
1712            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1713
1714        let struct_non_null_level =
1715            calculate_array_levels(batch.column(0), batch.schema().field(0)).unwrap();
1716
1717        // The 2 levels should not be the same
1718        if struct_non_null_level == struct_null_level {
1719            panic!("Levels should not be equal, to reflect the difference in struct nullness");
1720        }
1721    }
1722
1723    #[test]
1724    fn test_map_array() {
1725        // Note: we are using the JSON Arrow reader for brevity
1726        let json_content = r#"
1727        {"stocks":{"long": "$AAA", "short": "$BBB"}}
1728        {"stocks":{"long": "$CCC", "short": null}}
1729        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1730        "#;
1731        let entries_struct_type = DataType::Struct(Fields::from(vec![
1732            Field::new("key", DataType::Utf8, false),
1733            Field::new("value", DataType::Utf8, true),
1734        ]));
1735        let stocks_field = Field::new(
1736            "stocks",
1737            DataType::Map(
1738                Arc::new(Field::new("entries", entries_struct_type, false)),
1739                false,
1740            ),
1741            // not nullable, so the keys have max level = 1
1742            false,
1743        );
1744        let schema = Arc::new(Schema::new(vec![stocks_field]));
1745        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1746        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1747
1748        let batch = reader.next().unwrap().unwrap();
1749
1750        // calculate the map's level
1751        let mut levels = vec![];
1752        batch
1753            .columns()
1754            .iter()
1755            .zip(batch.schema().fields())
1756            .for_each(|(array, field)| {
1757                let mut array_levels = calculate_array_levels(array, field).unwrap();
1758                levels.append(&mut array_levels);
1759            });
1760        assert_eq!(levels.len(), 2);
1761
1762        let map = batch.column(0).as_map();
1763        let map_keys_logical_nulls = map.keys().logical_nulls();
1764
1765        // test key levels
1766        let list_level = &levels[0];
1767
1768        let expected_level = ArrayLevels {
1769            def_levels: LevelData::Materialized(vec![1; 7]),
1770            rep_levels: LevelData::Materialized(vec![0, 1, 0, 1, 0, 1, 1]),
1771            non_null_indices: vec![0, 1, 2, 3, 4, 5, 6],
1772            max_def_level: 1,
1773            max_rep_level: 1,
1774            array: map.keys().clone(),
1775            logical_nulls: map_keys_logical_nulls,
1776        };
1777        assert_eq!(list_level, &expected_level);
1778
1779        // test values levels
1780        let list_level = levels.get(1).unwrap();
1781        let map_values_logical_nulls = map.values().logical_nulls();
1782
1783        let expected_level = ArrayLevels {
1784            def_levels: LevelData::Materialized(vec![2, 2, 2, 1, 2, 1, 2]),
1785            rep_levels: LevelData::Materialized(vec![0, 1, 0, 1, 0, 1, 1]),
1786            non_null_indices: vec![0, 1, 2, 4, 6],
1787            max_def_level: 2,
1788            max_rep_level: 1,
1789            array: map.values().clone(),
1790            logical_nulls: map_values_logical_nulls,
1791        };
1792        assert_eq!(list_level, &expected_level);
1793    }
1794
1795    #[test]
1796    fn test_list_of_struct() {
1797        // define schema
1798        let int_field = Field::new("a", DataType::Int32, true);
1799        let fields = Fields::from([Arc::new(int_field)]);
1800        let item_field = Field::new_list_field(DataType::Struct(fields.clone()), true);
1801        let list_field = Field::new("list", DataType::List(Arc::new(item_field)), true);
1802
1803        let int_builder = Int32Builder::with_capacity(10);
1804        let struct_builder = StructBuilder::new(fields, vec![Box::new(int_builder)]);
1805        let mut list_builder = ListBuilder::new(struct_builder);
1806
1807        // [{a: 1}], [], null, [null, null], [{a: null}], [{a: 2}]
1808        //
1809        // [{a: 1}]
1810        let values = list_builder.values();
1811        values
1812            .field_builder::<Int32Builder>(0)
1813            .unwrap()
1814            .append_value(1);
1815        values.append(true);
1816        list_builder.append(true);
1817
1818        // []
1819        list_builder.append(true);
1820
1821        // null
1822        list_builder.append(false);
1823
1824        // [null, null]
1825        let values = list_builder.values();
1826        values
1827            .field_builder::<Int32Builder>(0)
1828            .unwrap()
1829            .append_null();
1830        values.append(false);
1831        values
1832            .field_builder::<Int32Builder>(0)
1833            .unwrap()
1834            .append_null();
1835        values.append(false);
1836        list_builder.append(true);
1837
1838        // [{a: null}]
1839        let values = list_builder.values();
1840        values
1841            .field_builder::<Int32Builder>(0)
1842            .unwrap()
1843            .append_null();
1844        values.append(true);
1845        list_builder.append(true);
1846
1847        // [{a: 2}]
1848        let values = list_builder.values();
1849        values
1850            .field_builder::<Int32Builder>(0)
1851            .unwrap()
1852            .append_value(2);
1853        values.append(true);
1854        list_builder.append(true);
1855
1856        let array = Arc::new(list_builder.finish());
1857
1858        let values = array.values().as_struct().column(0).clone();
1859        let values_len = values.len();
1860        assert_eq!(values_len, 5);
1861
1862        let schema = Arc::new(Schema::new(vec![list_field]));
1863
1864        let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
1865
1866        let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
1867        let list_level = &levels[0];
1868
1869        let logical_nulls = values.logical_nulls();
1870        let expected_level = ArrayLevels {
1871            def_levels: LevelData::Materialized(vec![4, 1, 0, 2, 2, 3, 4]),
1872            rep_levels: LevelData::Materialized(vec![0, 0, 0, 0, 1, 0, 0]),
1873            non_null_indices: vec![0, 4],
1874            max_def_level: 4,
1875            max_rep_level: 1,
1876            array: values,
1877            logical_nulls,
1878        };
1879
1880        assert_eq!(list_level, &expected_level);
1881    }
1882
1883    #[test]
1884    fn test_struct_mask_list() {
1885        // Test the null mask of a struct array masking out non-empty slices of a child ListArray
1886        let inner = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1887            Some(vec![Some(1), Some(2)]),
1888            Some(vec![None]),
1889            Some(vec![]),
1890            Some(vec![Some(3), None]), // Masked by struct array
1891            Some(vec![Some(4), Some(5)]),
1892            None, // Masked by struct array
1893            None,
1894        ]);
1895        let values = inner.values().clone();
1896
1897        // This test assumes that nulls don't take up space
1898        assert_eq!(inner.values().len(), 7);
1899
1900        let field = Arc::new(Field::new("list", inner.data_type().clone(), true));
1901        let array = Arc::new(inner) as ArrayRef;
1902        let nulls = Buffer::from([0b01010111]);
1903        let struct_a = StructArray::from((vec![(field, array)], nulls));
1904
1905        let field = Field::new("struct", struct_a.data_type().clone(), true);
1906        let array = Arc::new(struct_a) as ArrayRef;
1907        let levels = calculate_array_levels(&array, &field).unwrap();
1908
1909        assert_eq!(levels.len(), 1);
1910
1911        let logical_nulls = values.logical_nulls();
1912        let expected_level = ArrayLevels {
1913            def_levels: LevelData::Materialized(vec![4, 4, 3, 2, 0, 4, 4, 0, 1]),
1914            rep_levels: LevelData::Materialized(vec![0, 1, 0, 0, 0, 0, 1, 0, 0]),
1915            non_null_indices: vec![0, 1, 5, 6],
1916            max_def_level: 4,
1917            max_rep_level: 1,
1918            array: values,
1919            logical_nulls,
1920        };
1921
1922        assert_eq!(&levels[0], &expected_level);
1923    }
1924
1925    #[test]
1926    fn test_list_mask_struct() {
1927        // Test the null mask of a struct array and the null mask of a list array
1928        // masking out non-null elements of their children
1929
1930        let a1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1931            Some(vec![None]), // Masked by list array
1932            Some(vec![]),     // Masked by list array
1933            Some(vec![Some(3), None]),
1934            Some(vec![Some(4), Some(5), None, Some(6)]), // Masked by struct array
1935            None,
1936            None,
1937        ]);
1938        let a1_values = a1.values().clone();
1939        let a1 = Arc::new(a1) as ArrayRef;
1940
1941        let a2 = Arc::new(Int32Array::from_iter(vec![
1942            Some(1), // Masked by list array
1943            Some(2), // Masked by list array
1944            None,
1945            Some(4), // Masked by struct array
1946            Some(5),
1947            None,
1948        ])) as ArrayRef;
1949        let a2_values = a2.clone();
1950
1951        let field_a1 = Arc::new(Field::new("list", a1.data_type().clone(), true));
1952        let field_a2 = Arc::new(Field::new("integers", a2.data_type().clone(), true));
1953
1954        let nulls = Buffer::from([0b00110111]);
1955        let struct_a = Arc::new(StructArray::from((
1956            vec![(field_a1, a1), (field_a2, a2)],
1957            nulls,
1958        ))) as ArrayRef;
1959
1960        let offsets = Buffer::from_iter([0_i32, 0, 2, 2, 3, 5, 5]);
1961        let nulls = Buffer::from([0b00111100]);
1962
1963        let list_type = DataType::List(Arc::new(Field::new(
1964            "struct",
1965            struct_a.data_type().clone(),
1966            true,
1967        )));
1968
1969        let data = ArrayDataBuilder::new(list_type.clone())
1970            .len(6)
1971            .null_bit_buffer(Some(nulls))
1972            .add_buffer(offsets)
1973            .add_child_data(struct_a.into_data())
1974            .build()
1975            .unwrap();
1976
1977        let list = make_array(data);
1978        let list_field = Field::new("col", list_type, true);
1979
1980        let expected = vec![
1981            r#""#.to_string(),
1982            r#""#.to_string(),
1983            r#"[]"#.to_string(),
1984            r#"[{list: [3, ], integers: }]"#.to_string(),
1985            r#"[, {list: , integers: 5}]"#.to_string(),
1986            r#"[]"#.to_string(),
1987        ];
1988
1989        let actual: Vec<_> = (0..6)
1990            .map(|x| array_value_to_string(&list, x).unwrap())
1991            .collect();
1992        assert_eq!(actual, expected);
1993
1994        let levels = calculate_array_levels(&list, &list_field).unwrap();
1995
1996        assert_eq!(levels.len(), 2);
1997
1998        let a1_logical_nulls = a1_values.logical_nulls();
1999        let expected_level = ArrayLevels {
2000            def_levels: LevelData::Materialized(vec![0, 0, 1, 6, 5, 2, 3, 1]),
2001            rep_levels: LevelData::Materialized(vec![0, 0, 0, 0, 2, 0, 1, 0]),
2002            non_null_indices: vec![1],
2003            max_def_level: 6,
2004            max_rep_level: 2,
2005            array: a1_values,
2006            logical_nulls: a1_logical_nulls,
2007        };
2008
2009        assert_eq!(&levels[0], &expected_level);
2010
2011        let a2_logical_nulls = a2_values.logical_nulls();
2012        let expected_level = ArrayLevels {
2013            def_levels: LevelData::Materialized(vec![0, 0, 1, 3, 2, 4, 1]),
2014            rep_levels: LevelData::Materialized(vec![0, 0, 0, 0, 0, 1, 0]),
2015            non_null_indices: vec![4],
2016            max_def_level: 4,
2017            max_rep_level: 1,
2018            array: a2_values,
2019            logical_nulls: a2_logical_nulls,
2020        };
2021
2022        assert_eq!(&levels[1], &expected_level);
2023    }
2024
2025    #[test]
2026    fn test_fixed_size_list() {
2027        // [[1, 2], null, null, [7, 8], null]
2028        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 2);
2029        builder.values().append_slice(&[1, 2]);
2030        builder.append(true);
2031        builder.values().append_slice(&[3, 4]);
2032        builder.append(false);
2033        builder.values().append_slice(&[5, 6]);
2034        builder.append(false);
2035        builder.values().append_slice(&[7, 8]);
2036        builder.append(true);
2037        builder.values().append_slice(&[9, 10]);
2038        builder.append(false);
2039        let a = builder.finish();
2040        let values = a.values().clone();
2041
2042        let item_field = Field::new_list_field(a.data_type().clone(), true);
2043        let mut builder = levels(&item_field, a);
2044        builder.write(1..4);
2045        let levels = builder.finish();
2046
2047        assert_eq!(levels.len(), 1);
2048
2049        let list_level = &levels[0];
2050
2051        let logical_nulls = values.logical_nulls();
2052        let expected_level = ArrayLevels {
2053            def_levels: LevelData::Materialized(vec![0, 0, 3, 3]),
2054            rep_levels: LevelData::Materialized(vec![0, 0, 0, 1]),
2055            non_null_indices: vec![6, 7],
2056            max_def_level: 3,
2057            max_rep_level: 1,
2058            array: values,
2059            logical_nulls,
2060        };
2061        assert_eq!(list_level, &expected_level);
2062    }
2063
2064    #[test]
2065    fn test_fixed_size_list_of_struct() {
2066        // define schema
2067        let field_a = Field::new("a", DataType::Int32, true);
2068        let field_b = Field::new("b", DataType::Int64, false);
2069        let fields = Fields::from([Arc::new(field_a), Arc::new(field_b)]);
2070        let item_field = Field::new_list_field(DataType::Struct(fields.clone()), true);
2071        let list_field = Field::new(
2072            "list",
2073            DataType::FixedSizeList(Arc::new(item_field), 2),
2074            true,
2075        );
2076
2077        let builder_a = Int32Builder::with_capacity(10);
2078        let builder_b = Int64Builder::with_capacity(10);
2079        let struct_builder =
2080            StructBuilder::new(fields, vec![Box::new(builder_a), Box::new(builder_b)]);
2081        let mut list_builder = FixedSizeListBuilder::new(struct_builder, 2);
2082
2083        // [
2084        //   [{a: 1, b: 2}, null],
2085        //   null,
2086        //   [null, null],
2087        //   [{a: null, b: 3}, {a: 2, b: 4}]
2088        // ]
2089
2090        // [{a: 1, b: 2}, null]
2091        let values = list_builder.values();
2092        // {a: 1, b: 2}
2093        values
2094            .field_builder::<Int32Builder>(0)
2095            .unwrap()
2096            .append_value(1);
2097        values
2098            .field_builder::<Int64Builder>(1)
2099            .unwrap()
2100            .append_value(2);
2101        values.append(true);
2102        // null
2103        values
2104            .field_builder::<Int32Builder>(0)
2105            .unwrap()
2106            .append_null();
2107        values
2108            .field_builder::<Int64Builder>(1)
2109            .unwrap()
2110            .append_value(0);
2111        values.append(false);
2112        list_builder.append(true);
2113
2114        // null
2115        let values = list_builder.values();
2116        // null
2117        values
2118            .field_builder::<Int32Builder>(0)
2119            .unwrap()
2120            .append_null();
2121        values
2122            .field_builder::<Int64Builder>(1)
2123            .unwrap()
2124            .append_value(0);
2125        values.append(false);
2126        // null
2127        values
2128            .field_builder::<Int32Builder>(0)
2129            .unwrap()
2130            .append_null();
2131        values
2132            .field_builder::<Int64Builder>(1)
2133            .unwrap()
2134            .append_value(0);
2135        values.append(false);
2136        list_builder.append(false);
2137
2138        // [null, null]
2139        let values = list_builder.values();
2140        // null
2141        values
2142            .field_builder::<Int32Builder>(0)
2143            .unwrap()
2144            .append_null();
2145        values
2146            .field_builder::<Int64Builder>(1)
2147            .unwrap()
2148            .append_value(0);
2149        values.append(false);
2150        // null
2151        values
2152            .field_builder::<Int32Builder>(0)
2153            .unwrap()
2154            .append_null();
2155        values
2156            .field_builder::<Int64Builder>(1)
2157            .unwrap()
2158            .append_value(0);
2159        values.append(false);
2160        list_builder.append(true);
2161
2162        // [{a: null, b: 3}, {a: 2, b: 4}]
2163        let values = list_builder.values();
2164        // {a: null, b: 3}
2165        values
2166            .field_builder::<Int32Builder>(0)
2167            .unwrap()
2168            .append_null();
2169        values
2170            .field_builder::<Int64Builder>(1)
2171            .unwrap()
2172            .append_value(3);
2173        values.append(true);
2174        // {a: 2, b: 4}
2175        values
2176            .field_builder::<Int32Builder>(0)
2177            .unwrap()
2178            .append_value(2);
2179        values
2180            .field_builder::<Int64Builder>(1)
2181            .unwrap()
2182            .append_value(4);
2183        values.append(true);
2184        list_builder.append(true);
2185
2186        let array = Arc::new(list_builder.finish());
2187
2188        assert_eq!(array.values().len(), 8);
2189        assert_eq!(array.len(), 4);
2190
2191        let struct_values = array.values().as_struct();
2192        let values_a = struct_values.column(0).clone();
2193        let values_b = struct_values.column(1).clone();
2194
2195        let schema = Arc::new(Schema::new(vec![list_field]));
2196        let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
2197
2198        let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
2199        let a_levels = &levels[0];
2200        let b_levels = &levels[1];
2201
2202        // [[{a: 1}, null], null, [null, null], [{a: null}, {a: 2}]]
2203        let values_a_logical_nulls = values_a.logical_nulls();
2204        let expected_a = ArrayLevels {
2205            def_levels: LevelData::Materialized(vec![4, 2, 0, 2, 2, 3, 4]),
2206            rep_levels: LevelData::Materialized(vec![0, 1, 0, 0, 1, 0, 1]),
2207            non_null_indices: vec![0, 7],
2208            max_def_level: 4,
2209            max_rep_level: 1,
2210            array: values_a,
2211            logical_nulls: values_a_logical_nulls,
2212        };
2213        // [[{b: 2}, null], null, [null, null], [{b: 3}, {b: 4}]]
2214        let values_b_logical_nulls = values_b.logical_nulls();
2215        let expected_b = ArrayLevels {
2216            def_levels: LevelData::Materialized(vec![3, 2, 0, 2, 2, 3, 3]),
2217            rep_levels: LevelData::Materialized(vec![0, 1, 0, 0, 1, 0, 1]),
2218            non_null_indices: vec![0, 6, 7],
2219            max_def_level: 3,
2220            max_rep_level: 1,
2221            array: values_b,
2222            logical_nulls: values_b_logical_nulls,
2223        };
2224
2225        assert_eq!(a_levels, &expected_a);
2226        assert_eq!(b_levels, &expected_b);
2227    }
2228
2229    #[test]
2230    fn test_fixed_size_list_empty() {
2231        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 0);
2232        builder.append(true);
2233        builder.append(false);
2234        builder.append(true);
2235        let array = builder.finish();
2236        let values = array.values().clone();
2237
2238        let item_field = Field::new_list_field(array.data_type().clone(), true);
2239        let mut builder = levels(&item_field, array);
2240        builder.write(0..3);
2241        let levels = builder.finish();
2242
2243        assert_eq!(levels.len(), 1);
2244
2245        let list_level = &levels[0];
2246
2247        let logical_nulls = values.logical_nulls();
2248        let expected_level = ArrayLevels {
2249            def_levels: LevelData::Materialized(vec![1, 0, 1]),
2250            rep_levels: LevelData::Materialized(vec![0, 0, 0]),
2251            non_null_indices: vec![],
2252            max_def_level: 3,
2253            max_rep_level: 1,
2254            array: values,
2255            logical_nulls,
2256        };
2257        assert_eq!(list_level, &expected_level);
2258    }
2259
2260    #[test]
2261    fn test_fixed_size_list_of_var_lists() {
2262        // [[[1, null, 3], null], [[4], []], [[5, 6], [null, null]], null]
2263        let mut builder = FixedSizeListBuilder::new(ListBuilder::new(Int32Builder::new()), 2);
2264        builder.values().append_value([Some(1), None, Some(3)]);
2265        builder.values().append_null();
2266        builder.append(true);
2267        builder.values().append_value([Some(4)]);
2268        builder.values().append_value([]);
2269        builder.append(true);
2270        builder.values().append_value([Some(5), Some(6)]);
2271        builder.values().append_value([None, None]);
2272        builder.append(true);
2273        builder.values().append_null();
2274        builder.values().append_null();
2275        builder.append(false);
2276        let a = builder.finish();
2277        let values = a.values().as_list::<i32>().values().clone();
2278
2279        let item_field = Field::new_list_field(a.data_type().clone(), true);
2280        let mut builder = levels(&item_field, a);
2281        builder.write(0..4);
2282        let levels = builder.finish();
2283
2284        let logical_nulls = values.logical_nulls();
2285        let expected_level = ArrayLevels {
2286            def_levels: LevelData::Materialized(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]),
2287            rep_levels: LevelData::Materialized(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]),
2288            non_null_indices: vec![0, 2, 3, 4, 5],
2289            max_def_level: 5,
2290            max_rep_level: 2,
2291            array: values,
2292            logical_nulls,
2293        };
2294
2295        assert_eq!(levels[0], expected_level);
2296    }
2297
#[test]
fn test_null_dictionary_values() {
    // Dictionary values are [1, NULL, 3, 4]: slot 1 is masked out by the validity bitmap.
    let value_validity = NullBuffer::from(vec![true, false, true, true]);
    let dict_values = Int32Array::new(vec![1, 2, 3, 4].into(), Some(value_validity));

    // Keys are [1, NULL, 2, 0]: the masked slot physically holds 54, which is
    // larger than the number of dictionary values.
    let key_validity = NullBuffer::from(vec![true, false, true, true]);
    let dict_keys = Int32Array::new(vec![1, 54, 2, 0].into(), Some(key_validity));

    // Resolving keys against values gives the logical column [NULL, NULL, 3, 1]:
    // row 0 points at a null dictionary value and row 1 has a null key.
    let dict = DictionaryArray::new(dict_keys, Arc::new(dict_values));
    let nullable_item = Field::new_list_field(dict.data_type().clone(), true);

    let mut level_builder = levels(&nullable_item, dict.clone());
    level_builder.write(0..4);
    let computed = level_builder.finish();

    // Only rows 2 and 3 are expected to be reported as defined.
    let expected = ArrayLevels {
        def_levels: LevelData::Materialized(vec![0, 0, 1, 1]),
        rep_levels: LevelData::Absent,
        non_null_indices: vec![2, 3],
        max_def_level: 1,
        max_rep_level: 0,
        // Read the logical nulls before `dict` is moved into the `Arc` below.
        logical_nulls: dict.logical_nulls(),
        array: Arc::new(dict),
    };
    assert_eq!(computed[0], expected);
}
2329
#[test]
fn mismatched_types() {
    // Pairing an Int32 array with a Float64 field must make the builder
    // constructor fail with a descriptive error instead of succeeding.
    let float_field = Field::new_list_field(DataType::Float64, false);
    let int_array: ArrayRef = Arc::new(Int32Array::from_iter(0..10));

    let outcome = LevelInfoBuilder::try_new(&float_field, Default::default(), &int_array);
    let message = outcome.unwrap_err().to_string();

    assert_eq!(
        message,
        "Arrow: Incompatible type. Field 'item' has type Float64, array has type Int32",
    );
}
2344
2345    fn levels<T: Array + 'static>(field: &Field, array: T) -> LevelInfoBuilder {
2346        let v = Arc::new(array) as ArrayRef;
2347        LevelInfoBuilder::try_new(field, Default::default(), &v).unwrap()
2348    }
2349
#[test]
fn test_slice_for_chunk_flat() {
    // Case 1: required column. With max_def_level = 0 there are neither
    // definition nor repetition levels, and all six values are non-null, so
    // non_null_indices is simply 0..6.
    let required: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]));
    let required_levels = ArrayLevels {
        def_levels: LevelData::Absent,
        rep_levels: LevelData::Absent,
        non_null_indices: (0..6).collect(),
        max_def_level: 0,
        max_rep_level: 0,
        // Read the logical nulls before the array is moved into the struct.
        logical_nulls: required.logical_nulls(),
        array: required,
    };
    // value_offset=2, num_values=3 selects non_null_indices[2..5] = [2, 3, 4].
    let value_only_chunk = CdcChunk {
        level_offset: 0,
        num_levels: 0,
        value_offset: 2,
        num_values: 3,
    };
    let required_slice = required_levels.slice_for_chunk(&value_only_chunk);
    assert!(matches!(required_slice.def_levels, LevelData::Absent));
    assert!(matches!(required_slice.rep_levels, LevelData::Absent));
    // The selected indices come back rebased to start at 0, and the array is
    // narrowed to the three values they cover.
    assert_eq!(required_slice.non_null_indices, vec![0, 1, 2]);
    assert_eq!(required_slice.array.len(), 3);

    // Case 2: optional column. max_def_level = 1, so definition levels are
    // present while repetition levels are still absent.
    // Values: [Some(1), None, Some(3), None, Some(5), Some(6)]
    // non_null_indices: [0, 2, 4, 5]
    let optional: ArrayRef = Arc::new(Int32Array::from(vec![
        Some(1),
        None,
        Some(3),
        None,
        Some(5),
        Some(6),
    ]));
    let optional_levels = ArrayLevels {
        def_levels: LevelData::Materialized(vec![1, 0, 1, 0, 1, 1]),
        rep_levels: LevelData::Absent,
        non_null_indices: vec![0, 2, 4, 5],
        max_def_level: 1,
        max_rep_level: 0,
        logical_nulls: optional.logical_nulls(),
        array: optional,
    };
    // Levels 1..4 are [0, 1, 0]; value_offset=1, num_values=1 selects
    // non_null_indices[1..2] = [2].
    let mixed_chunk = CdcChunk {
        level_offset: 1,
        num_levels: 3,
        value_offset: 1,
        num_values: 1,
    };
    let optional_slice = optional_levels.slice_for_chunk(&mixed_chunk);
    assert_eq!(
        optional_slice.def_levels,
        LevelData::Materialized(vec![0, 1, 0])
    );
    assert!(matches!(optional_slice.rep_levels, LevelData::Absent));
    // Index 2 is rebased to 0, and the array is narrowed to that single value.
    assert_eq!(optional_slice.non_null_indices, vec![0]);
    assert_eq!(optional_slice.array.len(), 1);
}
2412
#[test]
fn test_slice_for_chunk_nested_with_nulls() {
    // Regression test for https://github.com/apache/arrow-rs/issues/9637
    //
    // Models a List<Int32?> column in which null list entries still own
    // non-empty ranges of the child array (the Arrow spec permits this: "a null
    // value may correspond to a non-empty segment in the child array"). Those
    // ranges are gaps in the leaf array that have no corresponding levels.
    //
    // Layout of the 5 rows over the 10 leaf slots:
    //   row 0: [1]        -> leaf[0]
    //   row 1: null list  -> owns leaf[1..3]   (gap of 2)
    //   row 2: [2, null]  -> leaf[3], leaf[4] is a null element
    //   row 3: null list  -> owns leaf[5..8]   (gap of 3)
    //   row 4: [4, 5]     -> leaf[8], leaf[9]
    //
    // def_levels:       [3, 0, 3, 2, 0, 3, 3]
    // rep_levels:       [0, 0, 0, 1, 0, 0, 1]
    // non_null_indices: [0, 3, 8, 9]
    let leaf: ArrayRef = Arc::new(Int32Array::from(vec![
        Some(1), // 0: row 0
        None,    // 1: gap owned by null row 1
        None,    // 2: gap owned by null row 1
        Some(2), // 3: row 2
        None,    // 4: row 2, null element
        None,    // 5: gap owned by null row 3
        None,    // 6: gap owned by null row 3
        None,    // 7: gap owned by null row 3
        Some(4), // 8: row 4
        Some(5), // 9: row 4
    ]));
    let nested = ArrayLevels {
        def_levels: LevelData::Materialized(vec![3, 0, 3, 2, 0, 3, 3]),
        rep_levels: LevelData::Materialized(vec![0, 0, 0, 1, 0, 0, 1]),
        non_null_indices: vec![0, 3, 8, 9],
        max_def_level: 3,
        max_rep_level: 1,
        // Read the logical nulls before the array is moved into the struct.
        logical_nulls: leaf.logical_nulls(),
        array: leaf,
    };

    // Each entry: (chunk, expected rebased non-null indices, expected array length).
    let cases: [(CdcChunk, Vec<usize>, usize); 3] = [
        // Rows 0-1: non-null index [0] -> one value, already at position 0.
        (
            CdcChunk {
                level_offset: 0,
                num_levels: 2,
                value_offset: 0,
                num_values: 1,
            },
            vec![0],
            1,
        ),
        // Rows 2-3: non-null index [3] -> one value, rebased to position 0.
        (
            CdcChunk {
                level_offset: 2,
                num_levels: 3,
                value_offset: 1,
                num_values: 1,
            },
            vec![0],
            1,
        ),
        // Row 4: non-null indices [8, 9] -> two values, rebased to [0, 1].
        (
            CdcChunk {
                level_offset: 5,
                num_levels: 2,
                value_offset: 2,
                num_values: 2,
            },
            vec![0, 1],
            2,
        ),
    ];

    for (chunk, expected_indices, expected_len) in cases {
        let sliced = nested.slice_for_chunk(&chunk);
        assert_eq!(sliced.non_null_indices, expected_indices);
        assert_eq!(sliced.array.len(), expected_len);
    }
}
2486
#[test]
fn test_slice_for_chunk_all_null() {
    // A chunk that covers only null rows carries levels but no values, so the
    // slice is expected to have no non-null indices and a zero-length array.
    let source: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, None, Some(4)]));
    let whole = ArrayLevels {
        def_levels: LevelData::Materialized(vec![1, 0, 0, 1]),
        rep_levels: LevelData::Absent,
        non_null_indices: vec![0, 3],
        max_def_level: 1,
        max_rep_level: 0,
        // Read the logical nulls before the array is moved into the struct.
        logical_nulls: source.logical_nulls(),
        array: source,
    };

    // Levels 1..3 are the two null rows; num_values = 0 selects no values at all.
    let nulls_only = CdcChunk {
        level_offset: 1,
        num_levels: 2,
        value_offset: 1,
        num_values: 0,
    };
    let sliced = whole.slice_for_chunk(&nulls_only);

    assert_eq!(sliced.def_levels, LevelData::Materialized(vec![0, 0]));
    assert!(sliced.non_null_indices.is_empty());
    assert_eq!(sliced.array.len(), 0);
}
2512
#[test]
fn test_all_null_list() {
    // Column: nullable List whose item is a nullable Int32.
    // Data:   [null, null, null, null] (every list slot is null)
    //
    // Expected: a single leaf with max_def = 3, max_rep = 1, uniform def/rep
    // levels of 0 for all 4 rows, and no non-null values.
    let element = Arc::new(Field::new_list_field(DataType::Int32, true));
    let null_lists = ListArray::new_null(element, 4);
    let child = null_lists.values().clone();
    let list_field = Field::new("list", null_lists.data_type().clone(), true);
    let input: ArrayRef = Arc::new(null_lists);

    let computed = calculate_array_levels(&input, &list_field).unwrap();
    assert_eq!(computed.len(), 1);

    let expected = ArrayLevels {
        def_levels: LevelData::Uniform { value: 0, count: 4 },
        rep_levels: LevelData::Uniform { value: 0, count: 4 },
        non_null_indices: vec![],
        max_def_level: 3,
        max_rep_level: 1,
        // Read the logical nulls before `child` is moved into the struct.
        logical_nulls: child.logical_nulls(),
        array: child,
    };
    assert_eq!(computed[0], expected);
}
2541
#[test]
fn test_all_null_fixed_size_list() {
    // Column: nullable FixedSizeList of size 2 whose item is a nullable Int32.
    // Data:   [null, null, null] (every list slot is null)
    //
    // Expected: a single leaf with max_def = 3, max_rep = 1, uniform def/rep
    // levels of 0 for all 3 rows, and no non-null values.
    let element = Arc::new(Field::new_list_field(DataType::Int32, true));
    let null_lists = FixedSizeListArray::new_null(element, 2, 3);
    let child = null_lists.values().clone();
    let list_field = Field::new("list", null_lists.data_type().clone(), true);
    let input: ArrayRef = Arc::new(null_lists);

    let computed = calculate_array_levels(&input, &list_field).unwrap();
    assert_eq!(computed.len(), 1);

    let expected = ArrayLevels {
        def_levels: LevelData::Uniform { value: 0, count: 3 },
        rep_levels: LevelData::Uniform { value: 0, count: 3 },
        non_null_indices: vec![],
        max_def_level: 3,
        max_rep_level: 1,
        // Read the logical nulls before `child` is moved into the struct.
        logical_nulls: child.logical_nulls(),
        array: child,
    };
    assert_eq!(computed[0], expected);
}
2570
#[test]
fn test_all_null_struct() {
    // Column: nullable struct `a` with one nullable Int32 child `c`.
    // Data:   [null, null, null, null] (every struct slot is null)
    //
    // Expected: a single leaf with max_def = 2, uniform def levels of 0 for all
    // 4 rows, no rep levels, and no non-null values.
    let leaf: ArrayRef = Arc::new(Int32Array::from(vec![None::<i32>; 4]));
    let children = vec![(
        Arc::new(Field::new("c", DataType::Int32, true)),
        leaf.clone(),
    )];
    // An all-zero validity byte marks every struct slot as null.
    let all_null = Buffer::from([0b00000000]);
    let outer = StructArray::from((children, all_null));
    let outer_field = Field::new("a", outer.data_type().clone(), true);
    let input: ArrayRef = Arc::new(outer);

    let computed = calculate_array_levels(&input, &outer_field).unwrap();
    assert_eq!(computed.len(), 1);

    let expected = ArrayLevels {
        def_levels: LevelData::Uniform { value: 0, count: 4 },
        rep_levels: LevelData::Absent,
        non_null_indices: vec![],
        max_def_level: 2,
        max_rep_level: 0,
        array: leaf,
        logical_nulls: Some(NullBuffer::new_null(4)),
    };
    assert_eq!(computed[0], expected);
}
2600
#[test]
fn test_all_null_nested_struct() {
    // Column: nullable struct `a` -> nullable struct `b` -> nullable Int32 `c`.
    // Data:   [null, null, null] (both struct layers are built entirely null)
    //
    // Expected: a single leaf with max_def = 3, uniform def levels of 0 for all
    // 3 rows, no rep levels, and no non-null values.
    let leaf: ArrayRef = Arc::new(Int32Array::from(vec![None::<i32>; 3]));

    // Inner struct `b`, wrapping the leaf; the zero byte marks every slot null.
    let inner_children = vec![(
        Arc::new(Field::new("c", DataType::Int32, true)),
        leaf.clone(),
    )];
    let inner = StructArray::from((inner_children, Buffer::from([0b00000000])));
    let inner_field = Arc::new(Field::new("b", inner.data_type().clone(), true));

    // Outer struct `a`, wrapping `b`, again with every slot null.
    let outer_children = vec![(inner_field, Arc::new(inner) as ArrayRef)];
    let outer = StructArray::from((outer_children, Buffer::from([0b00000000])));
    let outer_field = Field::new("a", outer.data_type().clone(), true);
    let input: ArrayRef = Arc::new(outer);

    let computed = calculate_array_levels(&input, &outer_field).unwrap();
    assert_eq!(computed.len(), 1);

    let expected = ArrayLevels {
        def_levels: LevelData::Uniform { value: 0, count: 3 },
        rep_levels: LevelData::Absent,
        non_null_indices: vec![],
        max_def_level: 3,
        max_rep_level: 0,
        array: leaf,
        logical_nulls: Some(NullBuffer::new_null(3)),
    };
    assert_eq!(computed[0], expected);
}
2634
#[test]
fn test_all_null_struct_multiple_children() {
    // Column: nullable struct `a` with two nullable Int32 children, `c1` and `c2`.
    // Data:   [null, null] (every struct slot is null)
    //
    // Expected: two leaves, each with max_def = 2 and uniform def levels of 0.
    let null_leaf = || Arc::new(Int32Array::from(vec![None::<i32>; 2])) as ArrayRef;
    let (c1, c2) = (null_leaf(), null_leaf());
    let children = vec![
        (Arc::new(Field::new("c1", DataType::Int32, true)), c1.clone()),
        (Arc::new(Field::new("c2", DataType::Int32, true)), c2.clone()),
    ];
    // An all-zero validity byte marks every struct slot as null.
    let outer = StructArray::from((children, Buffer::from([0b00000000])));
    let outer_field = Field::new("a", outer.data_type().clone(), true);
    let input: ArrayRef = Arc::new(outer);

    let computed = calculate_array_levels(&input, &outer_field).unwrap();
    assert_eq!(computed.len(), 2);

    // Pair each computed leaf with the child array it should wrap, in field order.
    for (i, (actual, leaf)) in computed.iter().zip([c1, c2]).enumerate() {
        let expected = ArrayLevels {
            def_levels: LevelData::Uniform { value: 0, count: 2 },
            rep_levels: LevelData::Absent,
            non_null_indices: vec![],
            max_def_level: 2,
            max_rep_level: 0,
            array: leaf,
            logical_nulls: Some(NullBuffer::new_null(2)),
        };
        assert_eq!(actual, &expected, "leaf {i} mismatch");
    }
}
2669}