Skip to main content

parquet/arrow/arrow_writer/
levels.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Parquet definition and repetition levels
19//!
20//! Contains the algorithm for computing definition and repetition levels.
21//! The algorithm works by tracking the slots of an array that should
22//! ultimately be populated when writing to Parquet.
23//! Parquet achieves nesting through definition levels and repetition levels \[1\].
//! Definition levels specify how many optional fields in the path for the column
//! are defined.
26//! Repetition levels specify at what repeated field (list) in the path a column
27//! is defined.
28//!
29//! In a nested data structure such as `a.b.c`, one can see levels as defining
30//! whether a record is defined at `a`, `a.b`, or `a.b.c`.
31//! Optional fields are nullable fields, thus if all 3 fields
32//! are nullable, the maximum definition could be = 3 if there are no lists.
33//!
34//! The algorithm in this module computes the necessary information to enable
35//! the writer to keep track of which columns are at which levels, and to extract
36//! the correct values at the correct slots from Arrow arrays.
37//!
38//! It works by walking a record batch's arrays, keeping track of what values
39//! are non-null, their positions and computing what their levels are.
40//!
41//! \[1\] [parquet-format#nested-encoding](https://github.com/apache/parquet-format#nested-encoding)
42
43use crate::column::chunker::CdcChunk;
44use crate::errors::{ParquetError, Result};
45use arrow_array::cast::AsArray;
46use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
47use arrow_buffer::bit_iterator::BitIndexIterator;
48use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
49use arrow_schema::{DataType, Field};
50use std::ops::Range;
51use std::sync::Arc;
52
53/// Performs a depth-first scan of the children of `array`, constructing [`ArrayLevels`]
54/// for each leaf column encountered
55pub(crate) fn calculate_array_levels(array: &ArrayRef, field: &Field) -> Result<Vec<ArrayLevels>> {
56    let mut builder = LevelInfoBuilder::try_new(field, Default::default(), array)?;
57    builder.write(0..array.len());
58    Ok(builder.finish())
59}
60
61/// Returns true if the DataType can be represented as a primitive parquet column,
62/// i.e. a leaf array with no children
63fn is_leaf(data_type: &DataType) -> bool {
64    matches!(
65        data_type,
66        DataType::Null
67            | DataType::Boolean
68            | DataType::Int8
69            | DataType::Int16
70            | DataType::Int32
71            | DataType::Int64
72            | DataType::UInt8
73            | DataType::UInt16
74            | DataType::UInt32
75            | DataType::UInt64
76            | DataType::Float16
77            | DataType::Float32
78            | DataType::Float64
79            | DataType::Utf8
80            | DataType::Utf8View
81            | DataType::LargeUtf8
82            | DataType::Timestamp(_, _)
83            | DataType::Date32
84            | DataType::Date64
85            | DataType::Time32(_)
86            | DataType::Time64(_)
87            | DataType::Duration(_)
88            | DataType::Interval(_)
89            | DataType::Binary
90            | DataType::LargeBinary
91            | DataType::BinaryView
92            | DataType::Decimal32(_, _)
93            | DataType::Decimal64(_, _)
94            | DataType::Decimal128(_, _)
95            | DataType::Decimal256(_, _)
96            | DataType::FixedSizeBinary(_)
97    )
98}
99
/// Repetition and definition level reached at some point of a (possibly nested)
/// array hierarchy.
///
/// The default value (both levels `0`) is what [`calculate_array_levels`] uses for
/// the root of the hierarchy.
#[derive(Debug, Default, Clone, Copy)]
struct LevelContext {
    /// Repetition level at this point of the hierarchy
    rep_level: i16,
    /// Definition level at this point of the hierarchy
    def_level: i16,
}
108
109/// A helper to construct [`ArrayLevels`] from a potentially nested [`Field`]
110#[derive(Debug)]
111enum LevelInfoBuilder {
112    /// A primitive, leaf array
113    Primitive(ArrayLevels),
114    /// A list array
115    List(
116        Box<LevelInfoBuilder>, // Child Values
117        LevelContext,          // Context
118        OffsetBuffer<i32>,     // Offsets
119        Option<NullBuffer>,    // Nulls
120    ),
121    /// A large list array
122    LargeList(
123        Box<LevelInfoBuilder>, // Child Values
124        LevelContext,          // Context
125        OffsetBuffer<i64>,     // Offsets
126        Option<NullBuffer>,    // Nulls
127    ),
128    /// A fixed size list array
129    FixedSizeList(
130        Box<LevelInfoBuilder>, // Values
131        LevelContext,          // Context
132        usize,                 // List Size
133        Option<NullBuffer>,    // Nulls
134    ),
135    /// A list view array
136    ListView(
137        Box<LevelInfoBuilder>, // Child Values
138        LevelContext,          // Context
139        ScalarBuffer<i32>,     // Offsets
140        ScalarBuffer<i32>,     // Sizes
141        Option<NullBuffer>,    // Nulls
142    ),
143    /// A large list view array
144    LargeListView(
145        Box<LevelInfoBuilder>, // Child Values
146        LevelContext,          // Context
147        ScalarBuffer<i64>,     // Offsets
148        ScalarBuffer<i64>,     // Sizes
149        Option<NullBuffer>,    // Nulls
150    ),
151    /// A struct array
152    Struct(Vec<LevelInfoBuilder>, LevelContext, Option<NullBuffer>),
153}
154
155impl LevelInfoBuilder {
156    /// Create a new [`LevelInfoBuilder`] for the given [`Field`] and parent [`LevelContext`]
157    fn try_new(field: &Field, parent_ctx: LevelContext, array: &ArrayRef) -> Result<Self> {
158        if !Self::types_compatible(field.data_type(), array.data_type()) {
159            return Err(arrow_err!(format!(
160                "Incompatible type. Field '{}' has type {}, array has type {}",
161                field.name(),
162                field.data_type(),
163                array.data_type(),
164            )));
165        }
166
167        let is_nullable = field.is_nullable();
168
169        match array.data_type() {
170            d if is_leaf(d) => {
171                let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
172                Ok(Self::Primitive(levels))
173            }
174            DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => {
175                let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
176                Ok(Self::Primitive(levels))
177            }
178            DataType::Struct(children) => {
179                let array = array.as_struct();
180                let def_level = match is_nullable {
181                    true => parent_ctx.def_level + 1,
182                    false => parent_ctx.def_level,
183                };
184
185                let ctx = LevelContext {
186                    rep_level: parent_ctx.rep_level,
187                    def_level,
188                };
189
190                let children = children
191                    .iter()
192                    .zip(array.columns())
193                    .map(|(f, a)| Self::try_new(f, ctx, a))
194                    .collect::<Result<_>>()?;
195
196                Ok(Self::Struct(children, ctx, array.nulls().cloned()))
197            }
198            DataType::List(child)
199            | DataType::LargeList(child)
200            | DataType::Map(child, _)
201            | DataType::FixedSizeList(child, _)
202            | DataType::ListView(child)
203            | DataType::LargeListView(child) => {
204                let def_level = match is_nullable {
205                    true => parent_ctx.def_level + 2,
206                    false => parent_ctx.def_level + 1,
207                };
208
209                let ctx = LevelContext {
210                    rep_level: parent_ctx.rep_level + 1,
211                    def_level,
212                };
213
214                Ok(match field.data_type() {
215                    DataType::List(_) => {
216                        let list = array.as_list();
217                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
218                        let offsets = list.offsets().clone();
219                        Self::List(Box::new(child), ctx, offsets, list.nulls().cloned())
220                    }
221                    DataType::LargeList(_) => {
222                        let list = array.as_list();
223                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
224                        let offsets = list.offsets().clone();
225                        let nulls = list.nulls().cloned();
226                        Self::LargeList(Box::new(child), ctx, offsets, nulls)
227                    }
228                    DataType::Map(_, _) => {
229                        let map = array.as_map();
230                        let entries = Arc::new(map.entries().clone()) as ArrayRef;
231                        let child = Self::try_new(child.as_ref(), ctx, &entries)?;
232                        let offsets = map.offsets().clone();
233                        Self::List(Box::new(child), ctx, offsets, map.nulls().cloned())
234                    }
235                    DataType::FixedSizeList(_, size) => {
236                        let list = array.as_fixed_size_list();
237                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
238                        let nulls = list.nulls().cloned();
239                        Self::FixedSizeList(Box::new(child), ctx, *size as _, nulls)
240                    }
241                    DataType::ListView(_) => {
242                        let list = array.as_list_view();
243                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
244                        let offsets = list.offsets().clone();
245                        let sizes = list.sizes().clone();
246                        let nulls = list.nulls().cloned();
247                        Self::ListView(Box::new(child), ctx, offsets, sizes, nulls)
248                    }
249                    DataType::LargeListView(_) => {
250                        let list = array.as_list_view();
251                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
252                        let offsets = list.offsets().clone();
253                        let sizes = list.sizes().clone();
254                        let nulls = list.nulls().cloned();
255                        Self::LargeListView(Box::new(child), ctx, offsets, sizes, nulls)
256                    }
257                    _ => unreachable!(),
258                })
259            }
260            d => Err(nyi_err!("Datatype {} is not yet supported", d)),
261        }
262    }
263
264    /// Finish this [`LevelInfoBuilder`] returning the [`ArrayLevels`] for the leaf columns
265    /// as enumerated by a depth-first search
266    fn finish(self) -> Vec<ArrayLevels> {
267        match self {
268            LevelInfoBuilder::Primitive(v) => vec![v],
269            LevelInfoBuilder::List(v, _, _, _)
270            | LevelInfoBuilder::LargeList(v, _, _, _)
271            | LevelInfoBuilder::FixedSizeList(v, _, _, _)
272            | LevelInfoBuilder::ListView(v, _, _, _, _)
273            | LevelInfoBuilder::LargeListView(v, _, _, _, _) => v.finish(),
274            LevelInfoBuilder::Struct(v, _, _) => v.into_iter().flat_map(|l| l.finish()).collect(),
275        }
276    }
277
278    /// Given an `array`, write the level data for the elements in `range`
279    fn write(&mut self, range: Range<usize>) {
280        match self {
281            LevelInfoBuilder::Primitive(info) => Self::write_leaf(info, range),
282            LevelInfoBuilder::List(child, ctx, offsets, nulls) => {
283                Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
284            }
285            LevelInfoBuilder::LargeList(child, ctx, offsets, nulls) => {
286                Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
287            }
288            LevelInfoBuilder::FixedSizeList(child, ctx, size, nulls) => {
289                Self::write_fixed_size_list(child, ctx, *size, nulls.as_ref(), range)
290            }
291            LevelInfoBuilder::ListView(child, ctx, offsets, sizes, nulls) => {
292                Self::write_list_view(child, ctx, offsets, sizes, nulls.as_ref(), range)
293            }
294            LevelInfoBuilder::LargeListView(child, ctx, offsets, sizes, nulls) => {
295                Self::write_list_view(child, ctx, offsets, sizes, nulls.as_ref(), range)
296            }
297            LevelInfoBuilder::Struct(children, ctx, nulls) => {
298                Self::write_struct(children, ctx, nulls.as_ref(), range)
299            }
300        }
301    }
302
303    /// Write `range` elements from ListArray `array`
304    ///
305    /// Note: MapArrays are `ListArray<i32>` under the hood and so are dispatched to this method
306    fn write_list<O: OffsetSizeTrait>(
307        child: &mut LevelInfoBuilder,
308        ctx: &LevelContext,
309        offsets: &[O],
310        nulls: Option<&NullBuffer>,
311        range: Range<usize>,
312    ) {
313        let offsets = &offsets[range.start..range.end + 1];
314
315        let write_non_null_slice =
316            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
317                child.write(start_idx..end_idx);
318                child.visit_leaves(|leaf| {
319                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
320                    let mut rev = rep_levels.iter_mut().rev();
321                    let mut remaining = end_idx - start_idx;
322
323                    loop {
324                        let next = rev.next().unwrap();
325                        if *next > ctx.rep_level {
326                            // Nested element - ignore
327                            continue;
328                        }
329
330                        remaining -= 1;
331                        if remaining == 0 {
332                            *next = ctx.rep_level - 1;
333                            break;
334                        }
335                    }
336                })
337            };
338
339        let write_empty_slice = |child: &mut LevelInfoBuilder| {
340            child.visit_leaves(|leaf| {
341                let rep_levels = leaf.rep_levels.as_mut().unwrap();
342                rep_levels.push(ctx.rep_level - 1);
343                let def_levels = leaf.def_levels.as_mut().unwrap();
344                def_levels.push(ctx.def_level - 1);
345            })
346        };
347
348        let write_null_slice = |child: &mut LevelInfoBuilder| {
349            child.visit_leaves(|leaf| {
350                let rep_levels = leaf.rep_levels.as_mut().unwrap();
351                rep_levels.push(ctx.rep_level - 1);
352                let def_levels = leaf.def_levels.as_mut().unwrap();
353                def_levels.push(ctx.def_level - 2);
354            })
355        };
356
357        match nulls {
358            Some(nulls) => {
359                let null_offset = range.start;
360                // TODO: Faster bitmask iteration (#1757)
361                for (idx, w) in offsets.windows(2).enumerate() {
362                    let is_valid = nulls.is_valid(idx + null_offset);
363                    let start_idx = w[0].as_usize();
364                    let end_idx = w[1].as_usize();
365                    if !is_valid {
366                        write_null_slice(child)
367                    } else if start_idx == end_idx {
368                        write_empty_slice(child)
369                    } else {
370                        write_non_null_slice(child, start_idx, end_idx)
371                    }
372                }
373            }
374            None => {
375                for w in offsets.windows(2) {
376                    let start_idx = w[0].as_usize();
377                    let end_idx = w[1].as_usize();
378                    if start_idx == end_idx {
379                        write_empty_slice(child)
380                    } else {
381                        write_non_null_slice(child, start_idx, end_idx)
382                    }
383                }
384            }
385        }
386    }
387
388    /// Write `range` elements from ListViewArray `array`
389    fn write_list_view<O: OffsetSizeTrait>(
390        child: &mut LevelInfoBuilder,
391        ctx: &LevelContext,
392        offsets: &[O],
393        sizes: &[O],
394        nulls: Option<&NullBuffer>,
395        range: Range<usize>,
396    ) {
397        let offsets = &offsets[range.start..range.end];
398        let sizes = &sizes[range.start..range.end];
399
400        let write_non_null_slice =
401            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
402                child.write(start_idx..end_idx);
403                child.visit_leaves(|leaf| {
404                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
405                    let mut rev = rep_levels.iter_mut().rev();
406                    let mut remaining = end_idx - start_idx;
407
408                    loop {
409                        let next = rev.next().unwrap();
410                        if *next > ctx.rep_level {
411                            // Nested element - ignore
412                            continue;
413                        }
414
415                        remaining -= 1;
416                        if remaining == 0 {
417                            *next = ctx.rep_level - 1;
418                            break;
419                        }
420                    }
421                })
422            };
423
424        let write_empty_slice = |child: &mut LevelInfoBuilder| {
425            child.visit_leaves(|leaf| {
426                let rep_levels = leaf.rep_levels.as_mut().unwrap();
427                rep_levels.push(ctx.rep_level - 1);
428                let def_levels = leaf.def_levels.as_mut().unwrap();
429                def_levels.push(ctx.def_level - 1);
430            })
431        };
432
433        let write_null_slice = |child: &mut LevelInfoBuilder| {
434            child.visit_leaves(|leaf| {
435                let rep_levels = leaf.rep_levels.as_mut().unwrap();
436                rep_levels.push(ctx.rep_level - 1);
437                let def_levels = leaf.def_levels.as_mut().unwrap();
438                def_levels.push(ctx.def_level - 2);
439            })
440        };
441
442        match nulls {
443            Some(nulls) => {
444                let null_offset = range.start;
445                // TODO: Faster bitmask iteration (#1757)
446                for (idx, (offset, size)) in offsets.iter().zip(sizes.iter()).enumerate() {
447                    let is_valid = nulls.is_valid(idx + null_offset);
448                    let start_idx = offset.as_usize();
449                    let size = size.as_usize();
450                    let end_idx = start_idx + size;
451                    if !is_valid {
452                        write_null_slice(child)
453                    } else if size == 0 {
454                        write_empty_slice(child)
455                    } else {
456                        write_non_null_slice(child, start_idx, end_idx)
457                    }
458                }
459            }
460            None => {
461                for (offset, size) in offsets.iter().zip(sizes.iter()) {
462                    let start_idx = offset.as_usize();
463                    let size = size.as_usize();
464                    let end_idx = start_idx + size;
465                    if size == 0 {
466                        write_empty_slice(child)
467                    } else {
468                        write_non_null_slice(child, start_idx, end_idx)
469                    }
470                }
471            }
472        }
473    }
474
475    /// Write `range` elements from StructArray `array`
476    fn write_struct(
477        children: &mut [LevelInfoBuilder],
478        ctx: &LevelContext,
479        nulls: Option<&NullBuffer>,
480        range: Range<usize>,
481    ) {
482        let write_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
483            for child in children {
484                child.visit_leaves(|info| {
485                    let len = range.end - range.start;
486
487                    let def_levels = info.def_levels.as_mut().unwrap();
488                    def_levels.extend(std::iter::repeat_n(ctx.def_level - 1, len));
489
490                    if let Some(rep_levels) = info.rep_levels.as_mut() {
491                        rep_levels.extend(std::iter::repeat_n(ctx.rep_level, len));
492                    }
493                })
494            }
495        };
496
497        let write_non_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
498            for child in children {
499                child.write(range.clone())
500            }
501        };
502
503        match nulls {
504            Some(validity) => {
505                let mut last_non_null_idx = None;
506                let mut last_null_idx = None;
507
508                // TODO: Faster bitmask iteration (#1757)
509                for i in range.clone() {
510                    match validity.is_valid(i) {
511                        true => {
512                            if let Some(last_idx) = last_null_idx.take() {
513                                write_null(children, last_idx..i)
514                            }
515                            last_non_null_idx.get_or_insert(i);
516                        }
517                        false => {
518                            if let Some(last_idx) = last_non_null_idx.take() {
519                                write_non_null(children, last_idx..i)
520                            }
521                            last_null_idx.get_or_insert(i);
522                        }
523                    }
524                }
525
526                if let Some(last_idx) = last_null_idx.take() {
527                    write_null(children, last_idx..range.end)
528                }
529
530                if let Some(last_idx) = last_non_null_idx.take() {
531                    write_non_null(children, last_idx..range.end)
532                }
533            }
534            None => write_non_null(children, range),
535        }
536    }
537
538    /// Write `range` elements from FixedSizeListArray with child data `values` and null bitmap `nulls`.
539    fn write_fixed_size_list(
540        child: &mut LevelInfoBuilder,
541        ctx: &LevelContext,
542        fixed_size: usize,
543        nulls: Option<&NullBuffer>,
544        range: Range<usize>,
545    ) {
546        let write_non_null = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
547            let values_start = start_idx * fixed_size;
548            let values_end = end_idx * fixed_size;
549            child.write(values_start..values_end);
550
551            child.visit_leaves(|leaf| {
552                let rep_levels = leaf.rep_levels.as_mut().unwrap();
553
554                let row_indices = (0..fixed_size)
555                    .rev()
556                    .cycle()
557                    .take(values_end - values_start);
558
559                // Step backward over the child rep levels and mark the start of each list
560                rep_levels
561                    .iter_mut()
562                    .rev()
563                    // Filter out reps from nested children
564                    .filter(|&&mut r| r == ctx.rep_level)
565                    .zip(row_indices)
566                    .for_each(|(r, idx)| {
567                        if idx == 0 {
568                            *r = ctx.rep_level - 1;
569                        }
570                    });
571            })
572        };
573
574        // If list size is 0, ignore values and just write rep/def levels.
575        let write_empty = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
576            let len = end_idx - start_idx;
577            child.visit_leaves(|leaf| {
578                let rep_levels = leaf.rep_levels.as_mut().unwrap();
579                rep_levels.extend(std::iter::repeat_n(ctx.rep_level - 1, len));
580                let def_levels = leaf.def_levels.as_mut().unwrap();
581                def_levels.extend(std::iter::repeat_n(ctx.def_level - 1, len));
582            })
583        };
584
585        let write_rows = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
586            if fixed_size > 0 {
587                write_non_null(child, start_idx, end_idx)
588            } else {
589                write_empty(child, start_idx, end_idx)
590            }
591        };
592
593        match nulls {
594            Some(nulls) => {
595                let mut start_idx = None;
596                for idx in range.clone() {
597                    if nulls.is_valid(idx) {
598                        // Start a run of valid rows if not already inside of one
599                        start_idx.get_or_insert(idx);
600                    } else {
601                        // Write out any pending valid rows
602                        if let Some(start) = start_idx.take() {
603                            write_rows(child, start, idx);
604                        }
605                        // Add null row
606                        child.visit_leaves(|leaf| {
607                            let rep_levels = leaf.rep_levels.as_mut().unwrap();
608                            rep_levels.push(ctx.rep_level - 1);
609                            let def_levels = leaf.def_levels.as_mut().unwrap();
610                            def_levels.push(ctx.def_level - 2);
611                        })
612                    }
613                }
614                // Write out any remaining valid rows
615                if let Some(start) = start_idx.take() {
616                    write_rows(child, start, range.end);
617                }
618            }
619            // If all rows are valid then write the whole array
620            None => write_rows(child, range.start, range.end),
621        }
622    }
623
624    /// Write a primitive array, as defined by [`is_leaf`]
625    fn write_leaf(info: &mut ArrayLevels, range: Range<usize>) {
626        let len = range.end - range.start;
627
628        match &mut info.def_levels {
629            Some(def_levels) => {
630                def_levels.reserve(len);
631                info.non_null_indices.reserve(len);
632
633                match &info.logical_nulls {
634                    Some(nulls) => {
635                        assert!(range.end <= nulls.len());
636                        let nulls = nulls.inner();
637                        def_levels.extend(range.clone().map(|i| {
638                            // Safety: range.end was asserted to be in bounds earlier
639                            let valid = unsafe { nulls.value_unchecked(i) };
640                            info.max_def_level - (!valid as i16)
641                        }));
642                        info.non_null_indices.extend(
643                            BitIndexIterator::new(nulls.inner(), nulls.offset() + range.start, len)
644                                .map(|i| i + range.start),
645                        );
646                    }
647                    None => {
648                        let iter = std::iter::repeat_n(info.max_def_level, len);
649                        def_levels.extend(iter);
650                        info.non_null_indices.extend(range);
651                    }
652                }
653            }
654            None => info.non_null_indices.extend(range),
655        }
656
657        if let Some(rep_levels) = &mut info.rep_levels {
658            rep_levels.extend(std::iter::repeat_n(info.max_rep_level, len))
659        }
660    }
661
662    /// Visits all children of this node in depth first order
663    fn visit_leaves(&mut self, visit: impl Fn(&mut ArrayLevels) + Copy) {
664        match self {
665            LevelInfoBuilder::Primitive(info) => visit(info),
666            LevelInfoBuilder::List(c, _, _, _)
667            | LevelInfoBuilder::LargeList(c, _, _, _)
668            | LevelInfoBuilder::FixedSizeList(c, _, _, _)
669            | LevelInfoBuilder::ListView(c, _, _, _, _)
670            | LevelInfoBuilder::LargeListView(c, _, _, _, _) => c.visit_leaves(visit),
671            LevelInfoBuilder::Struct(children, _, _) => {
672                for c in children {
673                    c.visit_leaves(visit)
674                }
675            }
676        }
677    }
678
679    /// Determine if the fields are compatible for purposes of constructing `LevelBuilderInfo`.
680    ///
681    /// Fields are compatible if they're the same type. Otherwise if one of them is a dictionary
682    /// and the other is a native array, the dictionary values must have the same type as the
683    /// native array
684    fn types_compatible(a: &DataType, b: &DataType) -> bool {
685        // if the Arrow data types are equal, the types are deemed compatible
686        if a.equals_datatype(b) {
687            return true;
688        }
689
690        // get the values out of the dictionaries
691        let (a, b) = match (a, b) {
692            (DataType::Dictionary(_, va), DataType::Dictionary(_, vb)) => {
693                (va.as_ref(), vb.as_ref())
694            }
695            (DataType::Dictionary(_, v), b) => (v.as_ref(), b),
696            (a, DataType::Dictionary(_, v)) => (a, v.as_ref()),
697            _ => (a, b),
698        };
699
700        // now that we've got the values from one/both dictionaries, if the values
701        // have the same Arrow data type, they're compatible
702        if a == b {
703            return true;
704        }
705
706        // here we have different Arrow data types, but if the array contains the same type of data
707        // then we consider the type compatible
708        match a {
709            // String, StringView and LargeString are compatible
710            DataType::Utf8 => matches!(b, DataType::LargeUtf8 | DataType::Utf8View),
711            DataType::Utf8View => matches!(b, DataType::LargeUtf8 | DataType::Utf8),
712            DataType::LargeUtf8 => matches!(b, DataType::Utf8 | DataType::Utf8View),
713
714            // Binary, BinaryView and LargeBinary are compatible
715            DataType::Binary => matches!(b, DataType::LargeBinary | DataType::BinaryView),
716            DataType::BinaryView => matches!(b, DataType::LargeBinary | DataType::Binary),
717            DataType::LargeBinary => matches!(b, DataType::Binary | DataType::BinaryView),
718
719            // otherwise we have incompatible types
720            _ => false,
721        }
722    }
723}
724
725/// The data necessary to write a primitive Arrow array to parquet, taking into account
726/// any non-primitive parents it may have in the arrow representation
727#[derive(Debug, Clone)]
728pub(crate) struct ArrayLevels {
729    /// Array's definition levels
730    ///
731    /// Present if `max_def_level != 0`
732    def_levels: Option<Vec<i16>>,
733
734    /// Array's optional repetition levels
735    ///
736    /// Present if `max_rep_level != 0`
737    rep_levels: Option<Vec<i16>>,
738
739    /// The corresponding array identifying non-null slices of data
740    /// from the primitive array
741    non_null_indices: Vec<usize>,
742
743    /// The maximum definition level for this leaf column
744    max_def_level: i16,
745
746    /// The maximum repetition for this leaf column
747    max_rep_level: i16,
748
749    /// The arrow array
750    array: ArrayRef,
751
752    /// cached logical nulls of the array.
753    logical_nulls: Option<NullBuffer>,
754}
755
756impl PartialEq for ArrayLevels {
757    fn eq(&self, other: &Self) -> bool {
758        self.def_levels == other.def_levels
759            && self.rep_levels == other.rep_levels
760            && self.non_null_indices == other.non_null_indices
761            && self.max_def_level == other.max_def_level
762            && self.max_rep_level == other.max_rep_level
763            && self.array.as_ref() == other.array.as_ref()
764            && self.logical_nulls.as_ref() == other.logical_nulls.as_ref()
765    }
766}
767impl Eq for ArrayLevels {}
768
769impl ArrayLevels {
770    fn new(ctx: LevelContext, is_nullable: bool, array: ArrayRef) -> Self {
771        let max_rep_level = ctx.rep_level;
772        let max_def_level = match is_nullable {
773            true => ctx.def_level + 1,
774            false => ctx.def_level,
775        };
776
777        let logical_nulls = array.logical_nulls();
778
779        Self {
780            def_levels: (max_def_level != 0).then(Vec::new),
781            rep_levels: (max_rep_level != 0).then(Vec::new),
782            non_null_indices: vec![],
783            max_def_level,
784            max_rep_level,
785            array,
786            logical_nulls,
787        }
788    }
789
790    pub fn array(&self) -> &ArrayRef {
791        &self.array
792    }
793
794    pub fn def_levels(&self) -> Option<&[i16]> {
795        self.def_levels.as_deref()
796    }
797
798    pub fn rep_levels(&self) -> Option<&[i16]> {
799        self.rep_levels.as_deref()
800    }
801
802    pub fn non_null_indices(&self) -> &[usize] {
803        &self.non_null_indices
804    }
805
806    /// Create a sliced view of this `ArrayLevels` for a CDC chunk.
807    ///
808    /// The chunk's `value_offset`/`num_values` select the relevant slice of
809    /// `non_null_indices`. The array is sliced to the range covered by
810    /// those indices, and they are shifted to be relative to the slice.
811    pub(crate) fn slice_for_chunk(&self, chunk: &CdcChunk) -> Self {
812        let def_levels = self.def_levels.as_ref().map(|levels| {
813            levels[chunk.level_offset..chunk.level_offset + chunk.num_levels].to_vec()
814        });
815        let rep_levels = self.rep_levels.as_ref().map(|levels| {
816            levels[chunk.level_offset..chunk.level_offset + chunk.num_levels].to_vec()
817        });
818
819        // Select the non-null indices for this chunk.
820        let nni = &self.non_null_indices[chunk.value_offset..chunk.value_offset + chunk.num_values];
821        // Compute the array range spanned by the non-null indices.
822        // When nni is empty (all-null chunk), start=0, end=0 → zero-length
823        // array slice; write_batch_internal will process only the def/rep
824        // levels and write no values.
825        let start = nni.first().copied().unwrap_or(0);
826        let end = nni.last().map_or(0, |&i| i + 1);
827        // Shift indices to be relative to the sliced array.
828        let non_null_indices = nni.iter().map(|&idx| idx - start).collect();
829        // Slice the array to the computed range.
830        let array = self.array.slice(start, end - start);
831        let logical_nulls = array.logical_nulls();
832
833        Self {
834            def_levels,
835            rep_levels,
836            non_null_indices,
837            max_def_level: self.max_def_level,
838            max_rep_level: self.max_rep_level,
839            array,
840            logical_nulls,
841        }
842    }
843}
844
845#[cfg(test)]
846mod tests {
847    use super::*;
848    use crate::column::chunker::CdcChunk;
849
850    use arrow_array::builder::*;
851    use arrow_array::types::Int32Type;
852    use arrow_array::*;
853    use arrow_buffer::{Buffer, ToByteSlice};
854    use arrow_cast::display::array_value_to_string;
855    use arrow_data::{ArrayData, ArrayDataBuilder};
856    use arrow_schema::{Fields, Schema};
857
#[test]
fn test_calculate_array_levels_twitter_example() {
    // Doubly nested, fully non-nullable list modelled on the example at
    // https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html
    //
    // Two records holding ten leaf values in total:
    //   [[a, b, c], [d, e, f, g]], [[h], [i, j]]
    let values = Int32Array::from_iter(0..10);

    let item_field = Field::new_list_field(DataType::Int32, false);
    let l2_type = DataType::List(Arc::new(item_field));
    let l2_field = Field::new("l2", l2_type.clone(), false);
    let l1_type = DataType::List(Arc::new(l2_field));
    let l1_field = Field::new("l1", l1_type.clone(), false);

    // Built from raw `ArrayData` because `from_iter_primitive` always infers nullable.
    let l2_data = ArrayDataBuilder::new(l2_type)
        .len(4)
        .add_buffer(Buffer::from_iter([0_i32, 3, 7, 8, 10]))
        .add_child_data(values.to_data())
        .build()
        .unwrap();
    let l1_array = make_array(
        ArrayDataBuilder::new(l1_type)
            .len(2)
            .add_buffer(Buffer::from_iter([0_i32, 2, 4]))
            .add_child_data(l2_data)
            .build()
            .unwrap(),
    );

    let actual = calculate_array_levels(&l1_array, &l1_field).unwrap();
    assert_eq!(actual.len(), 1);

    let expected = ArrayLevels {
        def_levels: Some(vec![2; 10]),
        rep_levels: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
        non_null_indices: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
        max_def_level: 2,
        max_rep_level: 2,
        array: Arc::new(values),
        logical_nulls: None,
    };
    assert_eq!(&actual[0], &expected);
}
903
#[test]
fn test_calculate_one_level_1() {
    // Non-nullable primitive column: no level buffers are expected and
    // every slot is reported as a non-null value.
    let field = Field::new_list_field(DataType::Int32, false);
    let values: ArrayRef = Arc::new(Int32Array::from_iter(0..10));

    let actual = calculate_array_levels(&values, &field).unwrap();
    assert_eq!(actual.len(), 1);

    let expected = ArrayLevels {
        def_levels: None,
        rep_levels: None,
        non_null_indices: (0..10).collect(),
        max_def_level: 0,
        max_rep_level: 0,
        array: values,
        logical_nulls: None,
    };
    assert_eq!(&actual[0], &expected);
}
924
#[test]
fn test_calculate_one_level_2() {
    // Nullable primitive column: definition level 1 marks a value, 0 marks a
    // null, and only the non-null slots appear in `non_null_indices`.
    let field = Field::new_list_field(DataType::Int32, true);
    let values: ArrayRef = Arc::new(Int32Array::from_iter([
        Some(0),
        None,
        Some(0),
        Some(0),
        None,
    ]));

    let actual = calculate_array_levels(&values, &field).unwrap();
    assert_eq!(actual.len(), 1);

    let expected_nulls = values.logical_nulls();
    let expected = ArrayLevels {
        def_levels: Some(vec![1, 0, 1, 1, 0]),
        rep_levels: None,
        non_null_indices: vec![0, 2, 3],
        max_def_level: 1,
        max_rep_level: 0,
        array: values,
        logical_nulls: expected_nulls,
    };
    assert_eq!(&actual[0], &expected);
}
952
#[test]
fn test_calculate_array_levels_1() {
    let item_field = Field::new_list_field(DataType::Int32, false);
    let list_type = DataType::List(Arc::new(item_field));

    // Case 1: every list is defined and holds one value (e.g. batch<list<_>>)
    //   [[0], [1], [2], [3], [4]]
    let values = Int32Array::from_iter(0..5);
    // Built from raw `ArrayData` because `from_iter_primitive` always infers nullable.
    let list_array = make_array(
        ArrayDataBuilder::new(list_type.clone())
            .len(5)
            .add_buffer(Buffer::from_iter(0_i32..6))
            .add_child_data(values.to_data())
            .build()
            .unwrap(),
    );
    let required_field = Field::new("list", list_type.clone(), false);

    let actual = calculate_array_levels(&list_array, &required_field).unwrap();
    assert_eq!(actual.len(), 1);

    let expected = ArrayLevels {
        def_levels: Some(vec![1; 5]),
        rep_levels: Some(vec![0; 5]),
        non_null_indices: (0..5).collect(),
        max_def_level: 1,
        max_rep_level: 1,
        array: Arc::new(values),
        logical_nulls: None,
    };
    assert_eq!(&actual[0], &expected);

    // Case 2: a nullable list whose second row is null
    //   [[0, 0], NULL, [2, 2], [3, 3, 3, 3], [4, 4, 4]]
    // There are no nulls on the root (batch), so all leaf values are defined.
    // Expected repetition levels per row:
    //   0: 0, 1
    //   1: 0
    //   2: 0, 1
    //   3: 0, 1, 1, 1
    //   4: 0, 1, 1
    let values = Int32Array::from_iter([0, 0, 2, 2, 3, 3, 3, 3, 4, 4, 4]);
    let list_array = make_array(
        ArrayDataBuilder::new(list_type.clone())
            .len(5)
            .add_buffer(Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]))
            .add_child_data(values.to_data())
            .null_bit_buffer(Some(Buffer::from([0b00011101])))
            .build()
            .unwrap(),
    );
    let nullable_field = Field::new("list", list_type, true);

    let actual = calculate_array_levels(&list_array, &nullable_field).unwrap();
    assert_eq!(actual.len(), 1);

    let expected = ArrayLevels {
        def_levels: Some(vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2]),
        rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
        non_null_indices: (0..11).collect(),
        max_def_level: 2,
        max_rep_level: 1,
        array: Arc::new(values),
        logical_nulls: None,
    };
    assert_eq!(&actual[0], &expected);
}
1021
#[test]
fn test_calculate_array_levels_2() {
    // Case 1: some values are masked by a null parent.
    //
    // Emulates an array of the form <struct<list<?>>> with rows:
    // - 0: [0, 1], but null because of the struct
    // - 1: []
    // - 2: [2, 3], but null because of the struct
    // - 3: [4, 5, 6, 7]
    // - 4: [8, 9, 10]
    //
    // Leaf values hidden behind a null parent still occupy slots in the child
    // array, so they must be accounted for while indexing: here [0, 1] and
    // [2, 3] have to be skipped.
    let values = Int32Array::from_iter(0..11);
    let list_type = DataType::List(Arc::new(Field::new("leaf", DataType::Int32, false)));
    let list_array = make_array(
        ArrayData::builder(list_type.clone())
            .len(5)
            .add_child_data(values.to_data())
            .add_buffer(Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]))
            .build()
            .unwrap(),
    );
    let list_field = Arc::new(Field::new("list", list_type, true));

    let struct_validity = Buffer::from([0b00011010]);
    let struct_array: ArrayRef = Arc::new(StructArray::from((
        vec![(list_field, list_array)],
        struct_validity,
    )));
    let struct_field = Field::new("struct", struct_array.data_type().clone(), true);

    let actual = calculate_array_levels(&struct_array, &struct_field).unwrap();
    assert_eq!(actual.len(), 1);

    let expected = ArrayLevels {
        def_levels: Some(vec![0, 2, 0, 3, 3, 3, 3, 3, 3, 3]),
        rep_levels: Some(vec![0, 0, 0, 0, 1, 1, 1, 0, 1, 1]),
        non_null_indices: (4..11).collect(),
        max_def_level: 3,
        max_rep_level: 1,
        array: Arc::new(values),
        logical_nulls: None,
    };
    assert_eq!(&actual[0], &expected);

    // Case 2: nested lists.
    //
    // 0: [[100, 101], [102, 103]]
    // 1: []
    // 2: [[104, 105], [106, 107]]
    // 3: [[108, 109], [110, 111], [112, 113], [114, 115]]
    // 4: [[116, 117], [118, 119], [120, 121]]
    let values = Int32Array::from_iter(100..122);
    let inner_type = DataType::List(Arc::new(Field::new("leaf", DataType::Int32, true)));
    let inner_data = ArrayData::builder(inner_type.clone())
        .len(11)
        .add_child_data(values.to_data())
        .add_buffer(Buffer::from_iter([
            0_i32, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22,
        ]))
        .build()
        .unwrap();

    let outer_type = DataType::List(Arc::new(Field::new("l1", inner_type, true)));
    let outer_array = make_array(
        ArrayData::builder(outer_type)
            .len(5)
            .add_child_data(inner_data)
            .add_buffer(Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]))
            .build()
            .unwrap(),
    );
    let outer_field = Field::new("l2", outer_array.data_type().clone(), true);

    let actual = calculate_array_levels(&outer_array, &outer_field).unwrap();
    assert_eq!(actual.len(), 1);

    let expected = ArrayLevels {
        def_levels: Some(vec![
            5, 5, 5, 5, 1, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
        ]),
        rep_levels: Some(vec![
            0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2,
        ]),
        non_null_indices: (0..22).collect(),
        max_def_level: 5,
        max_rep_level: 2,
        array: Arc::new(values),
        logical_nulls: None,
    };
    assert_eq!(&actual[0], &expected);
}
1123
#[test]
fn test_calculate_array_levels_nested_list() {
    let list_type = DataType::List(Arc::new(Field::new("leaf", DataType::Int32, false)));

    // Case 1: a required list where every row is defined (e.g. batch<list<_>>)
    // and holds a single value:
    // 0: [a]
    // 1: [a]
    // 2: [a]
    // 3: [a]
    let values = Int32Array::from_iter([0; 4]);
    let list_array = make_array(
        ArrayData::builder(list_type.clone())
            .len(4)
            .add_buffer(Buffer::from_iter(0_i32..5))
            .add_child_data(values.to_data())
            .build()
            .unwrap(),
    );
    let required_field = Field::new("list", list_type.clone(), false);

    let actual = calculate_array_levels(&list_array, &required_field).unwrap();
    assert_eq!(actual.len(), 1);

    let expected = ArrayLevels {
        def_levels: Some(vec![1; 4]),
        rep_levels: Some(vec![0; 4]),
        non_null_indices: (0..4).collect(),
        max_def_level: 1,
        max_rep_level: 1,
        array: Arc::new(values),
        logical_nulls: None,
    };
    assert_eq!(&actual[0], &expected);

    // Case 2: a nullable list wrapped in a nullable struct that carries no
    // validity buffer. The list offsets and validity mask give:
    // 0: null
    // 1: [0, 1, 2]
    // 2: [3, 4]
    // 3: [5, 6]
    let values = Int32Array::from_iter(0..8);
    let list_array = make_array(
        ArrayData::builder(list_type.clone())
            .len(4)
            .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
            .null_bit_buffer(Some(Buffer::from([0b00001110])))
            .add_child_data(values.to_data())
            .build()
            .unwrap(),
    );
    let nullable_field = Arc::new(Field::new("list", list_type, true));

    let struct_array: ArrayRef = Arc::new(StructArray::from(vec![(nullable_field, list_array)]));
    let struct_field = Field::new("struct", struct_array.data_type().clone(), true);

    let actual = calculate_array_levels(&struct_array, &struct_field).unwrap();
    assert_eq!(actual.len(), 1);

    let expected = ArrayLevels {
        def_levels: Some(vec![1, 3, 3, 3, 3, 3, 3, 3]),
        rep_levels: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]),
        non_null_indices: (0..7).collect(),
        max_def_level: 3,
        max_rep_level: 1,
        array: Arc::new(values),
        logical_nulls: None,
    };
    assert_eq!(&actual[0], &expected);

    // Case 3: nested lists, schema <struct<list<list<primitive>>>>.
    // The struct validity mask marks all four rows valid; the outer list is
    // null in row 0. Writing the outer list of each row in JSON syntax:
    // 0: null
    // 1: [ [201], [202, 203], [] ]
    // 2: [ [204, 205, 206], [207, 208, 209, 210] ]
    // 3: [ [], [211, 212, 213, 214, 215] ]
    let values = Int32Array::from_iter(201..216);
    let inner_type = DataType::List(Arc::new(Field::new("leaf", DataType::Int32, false)));
    let inner_data = ArrayData::builder(inner_type.clone())
        .len(7)
        .add_buffer(Buffer::from_iter([0_i32, 1, 3, 3, 6, 10, 10, 15]))
        .add_child_data(values.to_data())
        .build()
        .unwrap();

    let outer_type = DataType::List(Arc::new(Field::new("l1", inner_type, true)));
    let outer_array = make_array(
        ArrayData::builder(outer_type.clone())
            .len(4)
            .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
            .null_bit_buffer(Some(Buffer::from([0b00001110])))
            .add_child_data(inner_data)
            .build()
            .unwrap(),
    );
    let outer_field = Arc::new(Field::new("list_2", outer_type, true));

    let struct_array = StructArray::from((
        vec![(outer_field, outer_array)],
        Buffer::from([0b00001111]),
    ));
    let struct_field = Field::new("struct", struct_array.data_type().clone(), true);
    let struct_array: ArrayRef = Arc::new(struct_array);

    let actual = calculate_array_levels(&struct_array, &struct_field).unwrap();
    assert_eq!(actual.len(), 1);

    let expected = ArrayLevels {
        def_levels: Some(vec![1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5, 5, 5, 5]),
        rep_levels: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2]),
        non_null_indices: (0..15).collect(),
        max_def_level: 5,
        max_rep_level: 2,
        array: Arc::new(values),
        logical_nulls: None,
    };
    assert_eq!(&actual[0], &expected);
}
1242
#[test]
fn test_calculate_nested_struct_levels() {
    // Nullable leaf `c` inside nullable struct `b` inside nullable struct `a`:
    //   0: {a: {b: {c: 1}}}
    //   1: {a: {b: {c: null}}}
    //   2: {a: {b: {c: 3}}}
    //   3: {a: {b: null}}
    //   4: {a: null}
    //   5: {a: {b: {c: 6}}}
    let c_values: ArrayRef = Arc::new(Int32Array::from_iter([
        Some(1),
        None,
        Some(3),
        None,
        Some(5),
        Some(6),
    ]));
    let c_field = Arc::new(Field::new("c", DataType::Int32, true));

    // `b` is null in row 3.
    let b_struct = StructArray::from((
        vec![(c_field, c_values.clone())],
        Buffer::from([0b00110111]),
    ));
    let b_field = Arc::new(Field::new("b", b_struct.data_type().clone(), true));

    // `a` is null in row 4.
    let a_struct = StructArray::from((
        vec![(b_field, Arc::new(b_struct) as ArrayRef)],
        Buffer::from([0b00101111]),
    ));
    let a_field = Field::new("a", a_struct.data_type().clone(), true);
    let a_array: ArrayRef = Arc::new(a_struct);

    let actual = calculate_array_levels(&a_array, &a_field).unwrap();
    assert_eq!(actual.len(), 1);

    let expected_nulls = c_values.logical_nulls();
    let expected = ArrayLevels {
        def_levels: Some(vec![3, 2, 3, 1, 0, 3]),
        rep_levels: None,
        non_null_indices: vec![0, 2, 5],
        max_def_level: 3,
        max_rep_level: 0,
        array: c_values,
        logical_nulls: expected_nulls,
    };
    assert_eq!(&actual[0], &expected);
}
1283
#[test]
fn list_single_column() {
    // Level-generation counterpart of the equivalent arrow_writer test.
    //
    // Nullable list of nullable Int32; the validity mask marks row 2 as null:
    //   [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
    let values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    let list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
    let list_data = ArrayData::builder(list_type.clone())
        .len(5)
        .add_buffer(Buffer::from_iter([0_i32, 1, 3, 3, 6, 10]))
        .null_bit_buffer(Some(Buffer::from([0b00011011])))
        .add_child_data(values.to_data())
        .build()
        .unwrap();
    assert_eq!(list_data.null_count(), 1);

    let list_array = ListArray::from(list_data);
    let list_field = Field::new_list_field(list_type, true);

    // Only rows 2..4 are written: the null row followed by [4, 5, 6].
    let mut builder = levels(&list_field, list_array);
    builder.write(2..4);
    let actual = builder.finish();
    assert_eq!(actual.len(), 1);

    let expected = ArrayLevels {
        def_levels: Some(vec![0, 3, 3, 3]),
        rep_levels: Some(vec![0, 0, 1, 1]),
        non_null_indices: vec![3, 4, 5],
        max_def_level: 3,
        max_rep_level: 1,
        array: Arc::new(values),
        logical_nulls: None,
    };
    assert_eq!(&actual[0], &expected);
}
1323
#[test]
fn mixed_struct_list() {
    // Level-generation counterpart of the equivalent arrow_writer_complex test.
    //
    // Schema:
    //   a: Int32 (required)
    //   b: Int32 (nullable)
    //   c: Struct (nullable) {
    //        d: Float64 (nullable),
    //        e: Struct (nullable) {
    //             f: Float32 (nullable),
    //             g: List<Int16 (required)> (required),
    //           },
    //      }
    let field_d = Arc::new(Field::new("d", DataType::Float64, true));
    let field_f = Arc::new(Field::new("f", DataType::Float32, true));
    let field_g = Arc::new(Field::new(
        "g",
        DataType::List(Arc::new(Field::new("items", DataType::Int16, false))),
        false,
    ));
    let field_e = Arc::new(Field::new(
        "e",
        DataType::Struct(vec![field_f.clone(), field_g.clone()].into()),
        true,
    ));
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, true),
        Field::new(
            "c",
            DataType::Struct(vec![field_d.clone(), field_e.clone()].into()),
            true, // https://github.com/apache/arrow-rs/issues/245
        ),
    ]);

    // Leaf data for the primitive columns.
    let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
    let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
    let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);

    // `g` holds [[1], [2, 3], <empty>, [4, 5, 6], [7, 8, 9, 10]]. No validity
    // buffer is attached, so the third entry is an empty list, not a null.
    let g_values = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    let g_offsets = Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
    let g_data = ArrayData::builder(field_g.data_type().clone())
        .len(5)
        .add_buffer(g_offsets)
        .add_child_data(g_values.into_data())
        .build()
        .unwrap();
    let g = ListArray::from(g_data);

    // Assemble the nested structs from the inside out.
    let e = StructArray::from(vec![
        (field_f, Arc::new(f.clone()) as ArrayRef),
        (field_g, Arc::new(g) as ArrayRef),
    ]);
    let c = StructArray::from(vec![
        (field_d, Arc::new(d.clone()) as ArrayRef),
        (field_e, Arc::new(e) as ArrayRef),
    ]);

    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(a.clone()), Arc::new(b.clone()), Arc::new(c)],
    )
    .unwrap();

    // Gather the levels of every leaf column, in schema order.
    let batch_schema = batch.schema();
    let mut levels = vec![];
    for (array, field) in batch.columns().iter().zip(batch_schema.fields()) {
        levels.extend(calculate_array_levels(array, field).unwrap());
    }
    // Five leaves: a, b, d, f and g. Only the first four are checked below.
    assert_eq!(levels.len(), 5);

    // "a": required primitive.
    let expected_a = ArrayLevels {
        def_levels: None,
        rep_levels: None,
        non_null_indices: vec![0, 1, 2, 3, 4],
        max_def_level: 0,
        max_rep_level: 0,
        array: Arc::new(a),
        logical_nulls: None,
    };
    assert_eq!(&levels[0], &expected_a);

    // "b": nullable primitive.
    let b_nulls = b.logical_nulls();
    let expected_b = ArrayLevels {
        def_levels: Some(vec![1, 0, 0, 1, 1]),
        rep_levels: None,
        non_null_indices: vec![0, 3, 4],
        max_def_level: 1,
        max_rep_level: 0,
        array: Arc::new(b),
        logical_nulls: b_nulls,
    };
    assert_eq!(&levels[1], &expected_b);

    // "d": nullable primitive inside the nullable struct "c".
    let d_nulls = d.logical_nulls();
    let expected_d = ArrayLevels {
        def_levels: Some(vec![1, 1, 1, 2, 1]),
        rep_levels: None,
        non_null_indices: vec![3],
        max_def_level: 2,
        max_rep_level: 0,
        array: Arc::new(d),
        logical_nulls: d_nulls,
    };
    assert_eq!(&levels[2], &expected_d);

    // "f": nullable primitive inside "e", which is itself inside "c".
    let f_nulls = f.logical_nulls();
    let expected_f = ArrayLevels {
        def_levels: Some(vec![3, 2, 3, 2, 3]),
        rep_levels: None,
        non_null_indices: vec![0, 2, 4],
        max_def_level: 3,
        max_rep_level: 0,
        array: Arc::new(f),
        logical_nulls: f_nulls,
    };
    assert_eq!(&levels[3], &expected_f);
}
1461
#[test]
fn test_null_vs_nonnull_struct() {
    // Builds a one-column batch holding a struct with a single nullable Int32
    // child and returns the levels computed for that column. Only the
    // nullability of the struct field itself varies between calls.
    let levels_for = |struct_nullable: bool| {
        let offset_field = Arc::new(Field::new("offset", DataType::Int32, true));
        let schema = Schema::new(vec![Field::new(
            "some_nested_object",
            DataType::Struct(vec![offset_field.clone()].into()),
            struct_nullable,
        )]);

        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let some_nested_object =
            StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();

        let batch_schema = batch.schema();
        calculate_array_levels(batch.column(0), batch_schema.field(0)).unwrap()
    };

    let required_struct_levels = levels_for(false);
    let nullable_struct_levels = levels_for(true);

    // Identical data, but the struct's nullability must show up in the levels.
    assert_ne!(
        nullable_struct_levels, required_struct_levels,
        "Levels should not be equal, to reflect the difference in struct nullness"
    );
}
1512
#[test]
fn test_map_array() {
    // The JSON reader is only used as a compact way to build the map column.
    let json_content = r#"
        {"stocks":{"long": "$AAA", "short": "$BBB"}}
        {"stocks":{"long": "$CCC", "short": null}}
        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
        "#;

    // Map<Utf8, Utf8?>: required keys, nullable values.
    let key_field = Field::new("key", DataType::Utf8, false);
    let value_field = Field::new("value", DataType::Utf8, true);
    let entries_type = DataType::Struct(Fields::from(vec![key_field, value_field]));
    let entries_field = Arc::new(Field::new("entries", entries_type, false));
    // The map column itself is not nullable, so the keys have max level = 1
    let stocks_field = Field::new("stocks", DataType::Map(entries_field, false), false);

    let schema = Arc::new(Schema::new(vec![stocks_field]));
    let mut reader = arrow::json::ReaderBuilder::new(schema)
        .with_batch_size(64)
        .build(std::io::Cursor::new(json_content))
        .unwrap();
    let batch = reader.next().unwrap().unwrap();

    // Collect the leaf levels of every column in the batch.
    let batch_schema = batch.schema();
    let mut levels = vec![];
    for (array, field) in batch.columns().iter().zip(batch_schema.fields()) {
        levels.extend(calculate_array_levels(array, field).unwrap());
    }
    // One leaf for the keys and one for the values.
    assert_eq!(levels.len(), 2);

    let map = batch.column(0).as_map();
    // 2 + 2 + 3 entries across the three rows.
    let entry_rep_levels = vec![0, 1, 0, 1, 0, 1, 1];

    // Key leaf: every entry is defined.
    let expected_keys = ArrayLevels {
        def_levels: Some(vec![1; 7]),
        rep_levels: Some(entry_rep_levels.clone()),
        non_null_indices: (0..7).collect(),
        max_def_level: 1,
        max_rep_level: 1,
        array: map.keys().clone(),
        logical_nulls: map.keys().logical_nulls(),
    };
    assert_eq!(&levels[0], &expected_keys);

    // Value leaf: the two JSON nulls sit one definition level below the max.
    let expected_values = ArrayLevels {
        def_levels: Some(vec![2, 2, 2, 1, 2, 1, 2]),
        rep_levels: Some(entry_rep_levels),
        non_null_indices: vec![0, 1, 2, 4, 6],
        max_def_level: 2,
        max_rep_level: 1,
        array: map.values().clone(),
        logical_nulls: map.values().logical_nulls(),
    };
    assert_eq!(&levels[1], &expected_values);
}
1584
#[test]
fn test_list_of_struct() {
    // Appends one struct element `{a}` (or a null struct when `is_valid` is false)
    // to the list's child builder.
    fn push_struct(values: &mut StructBuilder, a: Option<i32>, is_valid: bool) {
        let a_builder = values.field_builder::<Int32Builder>(0).unwrap();
        match a {
            Some(v) => a_builder.append_value(v),
            None => a_builder.append_null(),
        }
        values.append(is_valid);
    }

    // Schema: nullable List<nullable Struct<a: nullable Int32>>
    let int_field = Field::new("a", DataType::Int32, true);
    let fields = Fields::from([Arc::new(int_field)]);
    let item_field = Field::new_list_field(DataType::Struct(fields.clone()), true);
    let list_field = Field::new("list", DataType::List(Arc::new(item_field)), true);

    let int_builder = Int32Builder::with_capacity(10);
    let struct_builder = StructBuilder::new(fields, vec![Box::new(int_builder)]);
    let mut list_builder = ListBuilder::new(struct_builder);

    // Rows: [{a: 1}], [], null, [null, null], [{a: null}], [{a: 2}]

    // [{a: 1}]
    push_struct(list_builder.values(), Some(1), true);
    list_builder.append(true);

    // []
    list_builder.append(true);

    // null
    list_builder.append(false);

    // [null, null]
    push_struct(list_builder.values(), None, false);
    push_struct(list_builder.values(), None, false);
    list_builder.append(true);

    // [{a: null}]
    push_struct(list_builder.values(), None, true);
    list_builder.append(true);

    // [{a: 2}]
    push_struct(list_builder.values(), Some(2), true);
    list_builder.append(true);

    let array = Arc::new(list_builder.finish());

    // Only the five struct elements appended above occupy child slots.
    let values = array.values().as_struct().column(0).clone();
    let values_len = values.len();
    assert_eq!(values_len, 5);

    let schema = Arc::new(Schema::new(vec![list_field]));
    let rb = RecordBatch::try_new(schema, vec![array]).unwrap();

    let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();

    let expected_level = ArrayLevels {
        def_levels: Some(vec![4, 1, 0, 2, 2, 3, 4]),
        rep_levels: Some(vec![0, 0, 0, 0, 1, 0, 0]),
        non_null_indices: vec![0, 4],
        max_def_level: 4,
        max_rep_level: 1,
        // `logical_nulls` is listed before `array` so it is read before `values` moves.
        logical_nulls: values.logical_nulls(),
        array: values,
    };

    assert_eq!(&levels[0], &expected_level);
}
1672
#[test]
fn test_struct_mask_list() {
    // The validity mask of a struct array hides non-empty slices of its child ListArray.
    let list = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
        Some(vec![Some(1), Some(2)]),
        Some(vec![None]),
        Some(vec![]),
        Some(vec![Some(3), None]), // hidden by the struct's null mask
        Some(vec![Some(4), Some(5)]),
        None, // hidden by the struct's null mask
        None,
    ]);
    let leaf = list.values().clone();

    // The expectations below rely on null lists occupying no child slots.
    assert_eq!(list.values().len(), 7);

    let list_field = Arc::new(Field::new("list", list.data_type().clone(), true));
    let list_ref: ArrayRef = Arc::new(list);
    // Bits 3 and 5 are unset: those struct rows are null.
    let struct_validity = Buffer::from([0b01010111]);
    let struct_array = StructArray::from((vec![(list_field, list_ref)], struct_validity));

    let struct_field = Field::new("struct", struct_array.data_type().clone(), true);
    let struct_ref: ArrayRef = Arc::new(struct_array);
    let computed = calculate_array_levels(&struct_ref, &struct_field).unwrap();

    assert_eq!(computed.len(), 1);

    let expected_level = ArrayLevels {
        def_levels: Some(vec![4, 4, 3, 2, 0, 4, 4, 0, 1]),
        rep_levels: Some(vec![0, 1, 0, 0, 0, 0, 1, 0, 0]),
        non_null_indices: vec![0, 1, 5, 6],
        max_def_level: 4,
        max_rep_level: 1,
        // `logical_nulls` is listed before `array` so it is read before `leaf` moves.
        logical_nulls: leaf.logical_nulls(),
        array: leaf,
    };

    assert_eq!(&computed[0], &expected_level);
}
1714
#[test]
fn test_list_mask_struct() {
    // Both the outer list's and the struct's validity masks hide non-null
    // elements of their children.

    let inner_list = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
        Some(vec![None]), // hidden by the outer list's null mask
        Some(vec![]),     // hidden by the outer list's null mask
        Some(vec![Some(3), None]),
        Some(vec![Some(4), Some(5), None, Some(6)]), // hidden by the struct's null mask
        None,
        None,
    ]);
    let inner_list_leaf = inner_list.values().clone();
    let inner_list: ArrayRef = Arc::new(inner_list);

    let integers: ArrayRef = Arc::new(Int32Array::from_iter(vec![
        Some(1), // hidden by the outer list's null mask
        Some(2), // hidden by the outer list's null mask
        None,
        Some(4), // hidden by the struct's null mask
        Some(5),
        None,
    ]));
    let integers_leaf = integers.clone();

    let list_child = Arc::new(Field::new("list", inner_list.data_type().clone(), true));
    let integers_child = Arc::new(Field::new("integers", integers.data_type().clone(), true));

    // Struct row 3 is null.
    let struct_validity = Buffer::from([0b00110111]);
    let struct_array: ArrayRef = Arc::new(StructArray::from((
        vec![(list_child, inner_list), (integers_child, integers)],
        struct_validity,
    )));

    let list_type = DataType::List(Arc::new(Field::new(
        "struct",
        struct_array.data_type().clone(),
        true,
    )));

    // Outer list rows 0 and 1 are null; row 1 still spans struct rows 0..2.
    let outer_offsets = Buffer::from_iter([0_i32, 0, 2, 2, 3, 5, 5]);
    let outer_validity = Buffer::from([0b00111100]);

    let data = ArrayDataBuilder::new(list_type.clone())
        .len(6)
        .null_bit_buffer(Some(outer_validity))
        .add_buffer(outer_offsets)
        .add_child_data(struct_array.into_data())
        .build()
        .unwrap();

    let list = make_array(data);
    let list_field = Field::new("col", list_type, true);

    // Sanity-check the logical content of the array before computing levels.
    let expected: Vec<String> = [
        "",
        "",
        "[]",
        "[{list: [3, ], integers: }]",
        "[, {list: , integers: 5}]",
        "[]",
    ]
    .iter()
    .map(|row| row.to_string())
    .collect();

    let actual: Vec<_> = (0..6)
        .map(|row| array_value_to_string(&list, row).unwrap())
        .collect();
    assert_eq!(actual, expected);

    let computed = calculate_array_levels(&list, &list_field).unwrap();

    // One leaf per struct child: `list` items and `integers`.
    assert_eq!(computed.len(), 2);

    let expected_list_leaf = ArrayLevels {
        def_levels: Some(vec![0, 0, 1, 6, 5, 2, 3, 1]),
        rep_levels: Some(vec![0, 0, 0, 0, 2, 0, 1, 0]),
        non_null_indices: vec![1],
        max_def_level: 6,
        max_rep_level: 2,
        // `logical_nulls` is listed before `array` so it is read before the leaf moves.
        logical_nulls: inner_list_leaf.logical_nulls(),
        array: inner_list_leaf,
    };

    assert_eq!(&computed[0], &expected_list_leaf);

    let expected_integers_leaf = ArrayLevels {
        def_levels: Some(vec![0, 0, 1, 3, 2, 4, 1]),
        rep_levels: Some(vec![0, 0, 0, 0, 0, 1, 0]),
        non_null_indices: vec![4],
        max_def_level: 4,
        max_rep_level: 1,
        logical_nulls: integers_leaf.logical_nulls(),
        array: integers_leaf,
    };

    assert_eq!(&computed[1], &expected_integers_leaf);
}
1814
#[test]
fn test_fixed_size_list() {
    // Logical content: [[1, 2], null, null, [7, 8], null]
    // Null rows still append two child values each.
    let rows = [
        ([1, 2], true),
        ([3, 4], false),
        ([5, 6], false),
        ([7, 8], true),
        ([9, 10], false),
    ];
    let mut list_builder = FixedSizeListBuilder::new(Int32Builder::new(), 2);
    for (pair, is_valid) in rows.iter() {
        list_builder.values().append_slice(pair);
        list_builder.append(*is_valid);
    }
    let list = list_builder.finish();
    let leaf = list.values().clone();

    let item_field = Field::new_list_field(list.data_type().clone(), true);
    let mut level_builder = levels(&item_field, list);
    // Only rows 1..4 are written: null, null, [7, 8]
    level_builder.write(1..4);
    let computed = level_builder.finish();

    assert_eq!(computed.len(), 1);

    let expected_level = ArrayLevels {
        def_levels: Some(vec![0, 0, 3, 3]),
        rep_levels: Some(vec![0, 0, 0, 1]),
        non_null_indices: vec![6, 7],
        max_def_level: 3,
        max_rep_level: 1,
        // `logical_nulls` is listed before `array` so it is read before `leaf` moves.
        logical_nulls: leaf.logical_nulls(),
        array: leaf,
    };
    assert_eq!(&computed[0], &expected_level);
}
1853
#[test]
fn test_fixed_size_list_of_struct() {
    // Appends one struct element `{a, b}` to the child builder; the struct is
    // null when `is_valid` is false. `b` is non-nullable, so it always gets a value.
    fn push_struct(values: &mut StructBuilder, a: Option<i32>, b: i64, is_valid: bool) {
        let a_builder = values.field_builder::<Int32Builder>(0).unwrap();
        match a {
            Some(v) => a_builder.append_value(v),
            None => a_builder.append_null(),
        }
        values
            .field_builder::<Int64Builder>(1)
            .unwrap()
            .append_value(b);
        values.append(is_valid);
    }

    // Schema: nullable FixedSizeList(2) of nullable Struct<a: Int32?, b: Int64>
    let field_a = Field::new("a", DataType::Int32, true);
    let field_b = Field::new("b", DataType::Int64, false);
    let fields = Fields::from([Arc::new(field_a), Arc::new(field_b)]);
    let item_field = Field::new_list_field(DataType::Struct(fields.clone()), true);
    let list_field = Field::new(
        "list",
        DataType::FixedSizeList(Arc::new(item_field), 2),
        true,
    );

    let builder_a = Int32Builder::with_capacity(10);
    let builder_b = Int64Builder::with_capacity(10);
    let struct_builder =
        StructBuilder::new(fields, vec![Box::new(builder_a), Box::new(builder_b)]);
    let mut list_builder = FixedSizeListBuilder::new(struct_builder, 2);

    // [
    //   [{a: 1, b: 2}, null],
    //   null,
    //   [null, null],
    //   [{a: null, b: 3}, {a: 2, b: 4}]
    // ]

    // [{a: 1, b: 2}, null]
    push_struct(list_builder.values(), Some(1), 2, true);
    push_struct(list_builder.values(), None, 0, false);
    list_builder.append(true);

    // null (two placeholder child slots are still appended)
    push_struct(list_builder.values(), None, 0, false);
    push_struct(list_builder.values(), None, 0, false);
    list_builder.append(false);

    // [null, null]
    push_struct(list_builder.values(), None, 0, false);
    push_struct(list_builder.values(), None, 0, false);
    list_builder.append(true);

    // [{a: null, b: 3}, {a: 2, b: 4}]
    push_struct(list_builder.values(), None, 3, true);
    push_struct(list_builder.values(), Some(2), 4, true);
    list_builder.append(true);

    let array = Arc::new(list_builder.finish());

    assert_eq!(array.values().len(), 8);
    assert_eq!(array.len(), 4);

    let struct_values = array.values().as_struct();
    let values_a = struct_values.column(0).clone();
    let values_b = struct_values.column(1).clone();

    let schema = Arc::new(Schema::new(vec![list_field]));
    let rb = RecordBatch::try_new(schema, vec![array]).unwrap();

    let computed = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();

    // [[{a: 1}, null], null, [null, null], [{a: null}, {a: 2}]]
    let expected_a = ArrayLevels {
        def_levels: Some(vec![4, 2, 0, 2, 2, 3, 4]),
        rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
        non_null_indices: vec![0, 7],
        max_def_level: 4,
        max_rep_level: 1,
        // `logical_nulls` is listed before `array` so it is read before the leaf moves.
        logical_nulls: values_a.logical_nulls(),
        array: values_a,
    };
    // [[{b: 2}, null], null, [null, null], [{b: 3}, {b: 4}]]
    let expected_b = ArrayLevels {
        def_levels: Some(vec![3, 2, 0, 2, 2, 3, 3]),
        rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
        non_null_indices: vec![0, 6, 7],
        max_def_level: 3,
        max_rep_level: 1,
        logical_nulls: values_b.logical_nulls(),
        array: values_b,
    };

    assert_eq!(&computed[0], &expected_a);
    assert_eq!(&computed[1], &expected_b);
}
2018
#[test]
fn test_fixed_size_list_empty() {
    // Zero-width fixed-size list with rows: valid, null, valid.
    let mut list_builder = FixedSizeListBuilder::new(Int32Builder::new(), 0);
    for is_valid in [true, false, true].iter() {
        list_builder.append(*is_valid);
    }
    let list = list_builder.finish();
    let leaf = list.values().clone();

    let item_field = Field::new_list_field(list.data_type().clone(), true);
    let mut level_builder = levels(&item_field, list);
    level_builder.write(0..3);
    let computed = level_builder.finish();

    assert_eq!(computed.len(), 1);

    // No row holds any element, so no leaf value is ever selected.
    let expected_level = ArrayLevels {
        def_levels: Some(vec![1, 0, 1]),
        rep_levels: Some(vec![0, 0, 0]),
        non_null_indices: vec![],
        max_def_level: 3,
        max_rep_level: 1,
        // `logical_nulls` is listed before `array` so it is read before `leaf` moves.
        logical_nulls: leaf.logical_nulls(),
        array: leaf,
    };
    assert_eq!(&computed[0], &expected_level);
}
2049
#[test]
fn test_fixed_size_list_of_var_lists() {
    // Logical content:
    // [[[1, null, 3], null], [[4], []], [[5, 6], [null, null]], null]
    let mut outer = FixedSizeListBuilder::new(ListBuilder::new(Int32Builder::new()), 2);

    // [[1, null, 3], null]
    let inner = outer.values();
    inner.append_value([Some(1), None, Some(3)]);
    inner.append_null();
    outer.append(true);

    // [[4], []]
    let inner = outer.values();
    inner.append_value([Some(4)]);
    inner.append_value([]);
    outer.append(true);

    // [[5, 6], [null, null]]
    let inner = outer.values();
    inner.append_value([Some(5), Some(6)]);
    inner.append_value([None, None]);
    outer.append(true);

    // null (two null inner lists are appended as placeholders)
    let inner = outer.values();
    inner.append_null();
    inner.append_null();
    outer.append(false);

    let list = outer.finish();
    let leaf = list.values().as_list::<i32>().values().clone();

    let item_field = Field::new_list_field(list.data_type().clone(), true);
    let mut level_builder = levels(&item_field, list);
    level_builder.write(0..4);
    let computed = level_builder.finish();

    let expected_level = ArrayLevels {
        def_levels: Some(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]),
        rep_levels: Some(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]),
        non_null_indices: vec![0, 2, 3, 4, 5],
        max_def_level: 5,
        max_rep_level: 2,
        // `logical_nulls` is listed before `array` so it is read before `leaf` moves.
        logical_nulls: leaf.logical_nulls(),
        array: leaf,
    };

    assert_eq!(computed[0], expected_level);
}
2087
#[test]
fn test_null_dictionary_values() {
    // A dictionary slot is logically null when either its key is null or the
    // dictionary value the key points at is null; both cases appear below.

    // Dictionary values: [1, NULL, 3, 4]
    let values = Int32Array::new(
        vec![1, 2, 3, 4].into(),
        Some(NullBuffer::from(vec![true, false, true, true])),
    );
    // Keys: [1, NULL, 2, 0] (the 54 sits in a null slot and is never dereferenced)
    let keys = Int32Array::new(
        vec![1, 54, 2, 0].into(),
        Some(NullBuffer::from(vec![true, false, true, true])),
    );
    // Logical content: [NULL, NULL, 3, 1]
    //   slot 0: key 1 -> values[1], which is null
    //   slot 1: null key
    //   slot 2: key 2 -> 3
    //   slot 3: key 0 -> 1
    let dict = DictionaryArray::new(keys, Arc::new(values));

    let item_field = Field::new_list_field(dict.data_type().clone(), true);

    let mut builder = levels(&item_field, dict.clone());
    builder.write(0..4);
    let levels = builder.finish();

    // Only slots 2 and 3 are defined, matching the logical nulls of the dictionary.
    let logical_nulls = dict.logical_nulls();
    let expected_level = ArrayLevels {
        def_levels: Some(vec![0, 0, 1, 1]),
        rep_levels: None,
        non_null_indices: vec![2, 3],
        max_def_level: 1,
        max_rep_level: 0,
        array: Arc::new(dict),
        logical_nulls,
    };
    assert_eq!(levels[0], expected_level);
}
2119
#[test]
fn mismatched_types() {
    // An Int32 array paired with a Float64 field must be rejected up front.
    let int_array: ArrayRef = Arc::new(Int32Array::from_iter(0..10));
    let float_field = Field::new_list_field(DataType::Float64, false);

    let result = LevelInfoBuilder::try_new(&float_field, Default::default(), &int_array);
    let message = result.unwrap_err().to_string();

    assert_eq!(
        message,
        "Arrow: Incompatible type. Field 'item' has type Float64, array has type Int32",
    );
}
2134
2135    fn levels<T: Array + 'static>(field: &Field, array: T) -> LevelInfoBuilder {
2136        let v = Arc::new(array) as ArrayRef;
2137        LevelInfoBuilder::try_new(field, Default::default(), &v).unwrap()
2138    }
2139
#[test]
fn test_slice_for_chunk_flat() {
    // Case 1: required field (max_def_level=0, no def/rep levels stored).
    // Array has 6 values; all are non-null so non_null_indices covers every position.
    // value_offset=2, num_values=3 → non_null_indices[2..5] = [2,3,4].
    // The array is sliced down to 3 elements and the indices are rebased to
    // start at 0.
    let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]));
    let logical_nulls = array.logical_nulls();
    let levels = ArrayLevels {
        def_levels: None,
        rep_levels: None,
        non_null_indices: vec![0, 1, 2, 3, 4, 5],
        max_def_level: 0,
        max_rep_level: 0,
        array,
        logical_nulls,
    };
    let sliced = levels.slice_for_chunk(&CdcChunk {
        level_offset: 0,
        num_levels: 0,
        value_offset: 2,
        num_values: 3,
    });
    assert!(sliced.def_levels.is_none());
    assert!(sliced.rep_levels.is_none());
    assert_eq!(sliced.non_null_indices, vec![0, 1, 2]); // [2,3,4] rebased by -2
    assert_eq!(sliced.array.len(), 3);

    // Case 2: optional field (max_def_level=1, def levels present, no rep levels).
    // Array: [Some(1), None, Some(3), None, Some(5), Some(6)]
    // non_null_indices: [0, 2, 4, 5]
    // value_offset=1, num_values=1 → non_null_indices[1..2] = [2].
    // The array is sliced here as well: only the single element selected by that
    // index remains and the index is rebased to 0. The def levels are sliced
    // separately via level_offset/num_levels → def_levels[1..4].
    let array: ArrayRef = Arc::new(Int32Array::from(vec![
        Some(1),
        None,
        Some(3),
        None,
        Some(5),
        Some(6),
    ]));
    let logical_nulls = array.logical_nulls();
    let levels = ArrayLevels {
        def_levels: Some(vec![1, 0, 1, 0, 1, 1]),
        rep_levels: None,
        non_null_indices: vec![0, 2, 4, 5],
        max_def_level: 1,
        max_rep_level: 0,
        array,
        logical_nulls,
    };
    let sliced = levels.slice_for_chunk(&CdcChunk {
        level_offset: 1,
        num_levels: 3,
        value_offset: 1,
        num_values: 1,
    });
    assert_eq!(sliced.def_levels, Some(vec![0, 1, 0]));
    assert!(sliced.rep_levels.is_none());
    assert_eq!(sliced.non_null_indices, vec![0]); // [2] rebased by -2 (first selected index)
    assert_eq!(sliced.array.len(), 1);
}
2202
#[test]
fn test_slice_for_chunk_nested_with_nulls() {
    // Regression test for https://github.com/apache/arrow-rs/issues/9637
    //
    // Models a List<Int32?> in which null list entries own non-empty child
    // ranges. The Arrow spec allows this ("a null value may correspond to a
    // non-empty segment in the child array"), and it leaves gaps in the leaf
    // array that have no corresponding levels.
    //
    // Layout (5 rows, 2 of them null lists with non-empty child ranges):
    //   row 0: [1]       → leaf[0]
    //   row 1: null list → owns leaf[1..3] (gap of 2)
    //   row 2: [2, null] → leaf[3], leaf[4]=null element
    //   row 3: null list → owns leaf[5..8] (gap of 3)
    //   row 4: [4, 5]   → leaf[8], leaf[9]
    //
    // def_levels: [3,  0,  3, 2,  0,  3, 3]
    // rep_levels: [0,  0,  0, 1,  0,  0, 1]
    // non_null_indices: [0, 3, 8, 9]
    //   gaps in array: 0→3 (skip 1,2), 3→8 (skip 5,6,7)
    let leaf: ArrayRef = Arc::new(Int32Array::from(vec![
        Some(1), // 0: row 0
        None,    // 1: gap (null list row 1)
        None,    // 2: gap (null list row 1)
        Some(2), // 3: row 2
        None,    // 4: row 2, null element
        None,    // 5: gap (null list row 3)
        None,    // 6: gap (null list row 3)
        None,    // 7: gap (null list row 3)
        Some(4), // 8: row 4
        Some(5), // 9: row 4
    ]));
    let leaf_nulls = leaf.logical_nulls();
    let levels = ArrayLevels {
        def_levels: Some(vec![3, 0, 3, 2, 0, 3, 3]),
        rep_levels: Some(vec![0, 0, 0, 1, 0, 0, 1]),
        non_null_indices: vec![0, 3, 8, 9],
        max_def_level: 3,
        max_rep_level: 1,
        array: leaf,
        logical_nulls: leaf_nulls,
    };

    // Slices `levels` for the chunk described by the four offsets/counts.
    let slice = |level_offset, num_levels, value_offset, num_values| {
        levels.slice_for_chunk(&CdcChunk {
            level_offset,
            num_levels,
            value_offset,
            num_values,
        })
    };

    // Chunk 0: rows 0-1, nni=[0] → array sliced to [0..1]
    let chunk0 = slice(0, 2, 0, 1);
    assert_eq!(chunk0.non_null_indices, vec![0]);
    assert_eq!(chunk0.array.len(), 1);

    // Chunk 1: rows 2-3, nni=[3] → array sliced to [3..4]
    let chunk1 = slice(2, 3, 1, 1);
    assert_eq!(chunk1.non_null_indices, vec![0]);
    assert_eq!(chunk1.array.len(), 1);

    // Chunk 2: row 4, nni=[8, 9] → array sliced to [8..10]
    let chunk2 = slice(5, 2, 2, 2);
    assert_eq!(chunk2.non_null_indices, vec![0, 1]);
    assert_eq!(chunk2.array.len(), 2);
}
2276
#[test]
fn test_slice_for_chunk_all_null() {
    // A chunk with num_values=0 selects no non-null indices and must produce
    // a zero-length array.
    let source: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, None, Some(4)]));
    let source_nulls = source.logical_nulls();
    let levels = ArrayLevels {
        def_levels: Some(vec![1, 0, 0, 1]),
        rep_levels: None,
        non_null_indices: vec![0, 3],
        max_def_level: 1,
        max_rep_level: 0,
        array: source,
        logical_nulls: source_nulls,
    };

    // The chunk spans only the two null rows (levels 1..3) and holds no values.
    let all_null_chunk = CdcChunk {
        level_offset: 1,
        num_levels: 2,
        value_offset: 1,
        num_values: 0,
    };
    let sliced = levels.slice_for_chunk(&all_null_chunk);

    assert_eq!(sliced.def_levels, Some(vec![0, 0]));
    assert_eq!(sliced.non_null_indices, Vec::<usize>::new());
    assert_eq!(sliced.array.len(), 0);
}
2302}