parquet/arrow/arrow_writer/
levels.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Parquet definition and repetition levels
19//!
20//! Contains the algorithm for computing definition and repetition levels.
21//! The algorithm works by tracking the slots of an array that should
22//! ultimately be populated when writing to Parquet.
23//! Parquet achieves nesting through definition levels and repetition levels \[1\].
24//! Definition levels specify how many optional fields in the part for the column
25//! are defined.
26//! Repetition levels specify at what repeated field (list) in the path a column
27//! is defined.
28//!
29//! In a nested data structure such as `a.b.c`, one can see levels as defining
30//! whether a record is defined at `a`, `a.b`, or `a.b.c`.
31//! Optional fields are nullable fields, thus if all 3 fields
32//! are nullable, the maximum definition could be = 3 if there are no lists.
33//!
34//! The algorithm in this module computes the necessary information to enable
35//! the writer to keep track of which columns are at which levels, and to extract
36//! the correct values at the correct slots from Arrow arrays.
37//!
38//! It works by walking a record batch's arrays, keeping track of what values
39//! are non-null, their positions and computing what their levels are.
40//!
41//! \[1\] [parquet-format#nested-encoding](https://github.com/apache/parquet-format#nested-encoding)
42
43use crate::errors::{ParquetError, Result};
44use arrow_array::cast::AsArray;
45use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
46use arrow_buffer::bit_iterator::BitIndexIterator;
47use arrow_buffer::{NullBuffer, OffsetBuffer};
48use arrow_schema::{DataType, Field};
49use std::ops::Range;
50use std::sync::Arc;
51
52/// Performs a depth-first scan of the children of `array`, constructing [`ArrayLevels`]
53/// for each leaf column encountered
54pub(crate) fn calculate_array_levels(array: &ArrayRef, field: &Field) -> Result<Vec<ArrayLevels>> {
55    let mut builder = LevelInfoBuilder::try_new(field, Default::default(), array)?;
56    builder.write(0..array.len());
57    Ok(builder.finish())
58}
59
60/// Returns true if the DataType can be represented as a primitive parquet column,
61/// i.e. a leaf array with no children
62fn is_leaf(data_type: &DataType) -> bool {
63    matches!(
64        data_type,
65        DataType::Null
66            | DataType::Boolean
67            | DataType::Int8
68            | DataType::Int16
69            | DataType::Int32
70            | DataType::Int64
71            | DataType::UInt8
72            | DataType::UInt16
73            | DataType::UInt32
74            | DataType::UInt64
75            | DataType::Float16
76            | DataType::Float32
77            | DataType::Float64
78            | DataType::Utf8
79            | DataType::Utf8View
80            | DataType::LargeUtf8
81            | DataType::Timestamp(_, _)
82            | DataType::Date32
83            | DataType::Date64
84            | DataType::Time32(_)
85            | DataType::Time64(_)
86            | DataType::Duration(_)
87            | DataType::Interval(_)
88            | DataType::Binary
89            | DataType::LargeBinary
90            | DataType::BinaryView
91            | DataType::Decimal32(_, _)
92            | DataType::Decimal64(_, _)
93            | DataType::Decimal128(_, _)
94            | DataType::Decimal256(_, _)
95            | DataType::FixedSizeBinary(_)
96    )
97}
98
99/// The definition and repetition level of an array within a potentially nested hierarchy
100#[derive(Debug, Default, Clone, Copy)]
101struct LevelContext {
102    /// The current repetition level
103    rep_level: i16,
104    /// The current definition level
105    def_level: i16,
106}
107
108/// A helper to construct [`ArrayLevels`] from a potentially nested [`Field`]
109#[derive(Debug)]
110enum LevelInfoBuilder {
111    /// A primitive, leaf array
112    Primitive(ArrayLevels),
113    /// A list array
114    List(
115        Box<LevelInfoBuilder>, // Child Values
116        LevelContext,          // Context
117        OffsetBuffer<i32>,     // Offsets
118        Option<NullBuffer>,    // Nulls
119    ),
120    /// A large list array
121    LargeList(
122        Box<LevelInfoBuilder>, // Child Values
123        LevelContext,          // Context
124        OffsetBuffer<i64>,     // Offsets
125        Option<NullBuffer>,    // Nulls
126    ),
127    /// A fixed size list array
128    FixedSizeList(
129        Box<LevelInfoBuilder>, // Values
130        LevelContext,          // Context
131        usize,                 // List Size
132        Option<NullBuffer>,    // Nulls
133    ),
134    /// A struct array
135    Struct(Vec<LevelInfoBuilder>, LevelContext, Option<NullBuffer>),
136}
137
138impl LevelInfoBuilder {
139    /// Create a new [`LevelInfoBuilder`] for the given [`Field`] and parent [`LevelContext`]
140    fn try_new(field: &Field, parent_ctx: LevelContext, array: &ArrayRef) -> Result<Self> {
141        if !Self::types_compatible(field.data_type(), array.data_type()) {
142            return Err(arrow_err!(format!(
143                "Incompatible type. Field '{}' has type {}, array has type {}",
144                field.name(),
145                field.data_type(),
146                array.data_type(),
147            )));
148        }
149
150        let is_nullable = field.is_nullable();
151
152        match array.data_type() {
153            d if is_leaf(d) => {
154                let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
155                Ok(Self::Primitive(levels))
156            }
157            DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => {
158                let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
159                Ok(Self::Primitive(levels))
160            }
161            DataType::Struct(children) => {
162                let array = array.as_struct();
163                let def_level = match is_nullable {
164                    true => parent_ctx.def_level + 1,
165                    false => parent_ctx.def_level,
166                };
167
168                let ctx = LevelContext {
169                    rep_level: parent_ctx.rep_level,
170                    def_level,
171                };
172
173                let children = children
174                    .iter()
175                    .zip(array.columns())
176                    .map(|(f, a)| Self::try_new(f, ctx, a))
177                    .collect::<Result<_>>()?;
178
179                Ok(Self::Struct(children, ctx, array.nulls().cloned()))
180            }
181            DataType::List(child)
182            | DataType::LargeList(child)
183            | DataType::Map(child, _)
184            | DataType::FixedSizeList(child, _) => {
185                let def_level = match is_nullable {
186                    true => parent_ctx.def_level + 2,
187                    false => parent_ctx.def_level + 1,
188                };
189
190                let ctx = LevelContext {
191                    rep_level: parent_ctx.rep_level + 1,
192                    def_level,
193                };
194
195                Ok(match field.data_type() {
196                    DataType::List(_) => {
197                        let list = array.as_list();
198                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
199                        let offsets = list.offsets().clone();
200                        Self::List(Box::new(child), ctx, offsets, list.nulls().cloned())
201                    }
202                    DataType::LargeList(_) => {
203                        let list = array.as_list();
204                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
205                        let offsets = list.offsets().clone();
206                        let nulls = list.nulls().cloned();
207                        Self::LargeList(Box::new(child), ctx, offsets, nulls)
208                    }
209                    DataType::Map(_, _) => {
210                        let map = array.as_map();
211                        let entries = Arc::new(map.entries().clone()) as ArrayRef;
212                        let child = Self::try_new(child.as_ref(), ctx, &entries)?;
213                        let offsets = map.offsets().clone();
214                        Self::List(Box::new(child), ctx, offsets, map.nulls().cloned())
215                    }
216                    DataType::FixedSizeList(_, size) => {
217                        let list = array.as_fixed_size_list();
218                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
219                        let nulls = list.nulls().cloned();
220                        Self::FixedSizeList(Box::new(child), ctx, *size as _, nulls)
221                    }
222                    _ => unreachable!(),
223                })
224            }
225            d => Err(nyi_err!("Datatype {} is not yet supported", d)),
226        }
227    }
228
229    /// Finish this [`LevelInfoBuilder`] returning the [`ArrayLevels`] for the leaf columns
230    /// as enumerated by a depth-first search
231    fn finish(self) -> Vec<ArrayLevels> {
232        match self {
233            LevelInfoBuilder::Primitive(v) => vec![v],
234            LevelInfoBuilder::List(v, _, _, _)
235            | LevelInfoBuilder::LargeList(v, _, _, _)
236            | LevelInfoBuilder::FixedSizeList(v, _, _, _) => v.finish(),
237            LevelInfoBuilder::Struct(v, _, _) => v.into_iter().flat_map(|l| l.finish()).collect(),
238        }
239    }
240
241    /// Given an `array`, write the level data for the elements in `range`
242    fn write(&mut self, range: Range<usize>) {
243        match self {
244            LevelInfoBuilder::Primitive(info) => Self::write_leaf(info, range),
245            LevelInfoBuilder::List(child, ctx, offsets, nulls) => {
246                Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
247            }
248            LevelInfoBuilder::LargeList(child, ctx, offsets, nulls) => {
249                Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
250            }
251            LevelInfoBuilder::FixedSizeList(child, ctx, size, nulls) => {
252                Self::write_fixed_size_list(child, ctx, *size, nulls.as_ref(), range)
253            }
254            LevelInfoBuilder::Struct(children, ctx, nulls) => {
255                Self::write_struct(children, ctx, nulls.as_ref(), range)
256            }
257        }
258    }
259
260    /// Write `range` elements from ListArray `array`
261    ///
262    /// Note: MapArrays are `ListArray<i32>` under the hood and so are dispatched to this method
263    fn write_list<O: OffsetSizeTrait>(
264        child: &mut LevelInfoBuilder,
265        ctx: &LevelContext,
266        offsets: &[O],
267        nulls: Option<&NullBuffer>,
268        range: Range<usize>,
269    ) {
270        let offsets = &offsets[range.start..range.end + 1];
271
272        let write_non_null_slice =
273            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
274                child.write(start_idx..end_idx);
275                child.visit_leaves(|leaf| {
276                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
277                    let mut rev = rep_levels.iter_mut().rev();
278                    let mut remaining = end_idx - start_idx;
279
280                    loop {
281                        let next = rev.next().unwrap();
282                        if *next > ctx.rep_level {
283                            // Nested element - ignore
284                            continue;
285                        }
286
287                        remaining -= 1;
288                        if remaining == 0 {
289                            *next = ctx.rep_level - 1;
290                            break;
291                        }
292                    }
293                })
294            };
295
296        let write_empty_slice = |child: &mut LevelInfoBuilder| {
297            child.visit_leaves(|leaf| {
298                let rep_levels = leaf.rep_levels.as_mut().unwrap();
299                rep_levels.push(ctx.rep_level - 1);
300                let def_levels = leaf.def_levels.as_mut().unwrap();
301                def_levels.push(ctx.def_level - 1);
302            })
303        };
304
305        let write_null_slice = |child: &mut LevelInfoBuilder| {
306            child.visit_leaves(|leaf| {
307                let rep_levels = leaf.rep_levels.as_mut().unwrap();
308                rep_levels.push(ctx.rep_level - 1);
309                let def_levels = leaf.def_levels.as_mut().unwrap();
310                def_levels.push(ctx.def_level - 2);
311            })
312        };
313
314        match nulls {
315            Some(nulls) => {
316                let null_offset = range.start;
317                // TODO: Faster bitmask iteration (#1757)
318                for (idx, w) in offsets.windows(2).enumerate() {
319                    let is_valid = nulls.is_valid(idx + null_offset);
320                    let start_idx = w[0].as_usize();
321                    let end_idx = w[1].as_usize();
322                    if !is_valid {
323                        write_null_slice(child)
324                    } else if start_idx == end_idx {
325                        write_empty_slice(child)
326                    } else {
327                        write_non_null_slice(child, start_idx, end_idx)
328                    }
329                }
330            }
331            None => {
332                for w in offsets.windows(2) {
333                    let start_idx = w[0].as_usize();
334                    let end_idx = w[1].as_usize();
335                    if start_idx == end_idx {
336                        write_empty_slice(child)
337                    } else {
338                        write_non_null_slice(child, start_idx, end_idx)
339                    }
340                }
341            }
342        }
343    }
344
345    /// Write `range` elements from StructArray `array`
346    fn write_struct(
347        children: &mut [LevelInfoBuilder],
348        ctx: &LevelContext,
349        nulls: Option<&NullBuffer>,
350        range: Range<usize>,
351    ) {
352        let write_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
353            for child in children {
354                child.visit_leaves(|info| {
355                    let len = range.end - range.start;
356
357                    let def_levels = info.def_levels.as_mut().unwrap();
358                    def_levels.extend(std::iter::repeat_n(ctx.def_level - 1, len));
359
360                    if let Some(rep_levels) = info.rep_levels.as_mut() {
361                        rep_levels.extend(std::iter::repeat_n(ctx.rep_level, len));
362                    }
363                })
364            }
365        };
366
367        let write_non_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
368            for child in children {
369                child.write(range.clone())
370            }
371        };
372
373        match nulls {
374            Some(validity) => {
375                let mut last_non_null_idx = None;
376                let mut last_null_idx = None;
377
378                // TODO: Faster bitmask iteration (#1757)
379                for i in range.clone() {
380                    match validity.is_valid(i) {
381                        true => {
382                            if let Some(last_idx) = last_null_idx.take() {
383                                write_null(children, last_idx..i)
384                            }
385                            last_non_null_idx.get_or_insert(i);
386                        }
387                        false => {
388                            if let Some(last_idx) = last_non_null_idx.take() {
389                                write_non_null(children, last_idx..i)
390                            }
391                            last_null_idx.get_or_insert(i);
392                        }
393                    }
394                }
395
396                if let Some(last_idx) = last_null_idx.take() {
397                    write_null(children, last_idx..range.end)
398                }
399
400                if let Some(last_idx) = last_non_null_idx.take() {
401                    write_non_null(children, last_idx..range.end)
402                }
403            }
404            None => write_non_null(children, range),
405        }
406    }
407
408    /// Write `range` elements from FixedSizeListArray with child data `values` and null bitmap `nulls`.
409    fn write_fixed_size_list(
410        child: &mut LevelInfoBuilder,
411        ctx: &LevelContext,
412        fixed_size: usize,
413        nulls: Option<&NullBuffer>,
414        range: Range<usize>,
415    ) {
416        let write_non_null = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
417            let values_start = start_idx * fixed_size;
418            let values_end = end_idx * fixed_size;
419            child.write(values_start..values_end);
420
421            child.visit_leaves(|leaf| {
422                let rep_levels = leaf.rep_levels.as_mut().unwrap();
423
424                let row_indices = (0..fixed_size)
425                    .rev()
426                    .cycle()
427                    .take(values_end - values_start);
428
429                // Step backward over the child rep levels and mark the start of each list
430                rep_levels
431                    .iter_mut()
432                    .rev()
433                    // Filter out reps from nested children
434                    .filter(|&&mut r| r == ctx.rep_level)
435                    .zip(row_indices)
436                    .for_each(|(r, idx)| {
437                        if idx == 0 {
438                            *r = ctx.rep_level - 1;
439                        }
440                    });
441            })
442        };
443
444        // If list size is 0, ignore values and just write rep/def levels.
445        let write_empty = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
446            let len = end_idx - start_idx;
447            child.visit_leaves(|leaf| {
448                let rep_levels = leaf.rep_levels.as_mut().unwrap();
449                rep_levels.extend(std::iter::repeat_n(ctx.rep_level - 1, len));
450                let def_levels = leaf.def_levels.as_mut().unwrap();
451                def_levels.extend(std::iter::repeat_n(ctx.def_level - 1, len));
452            })
453        };
454
455        let write_rows = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
456            if fixed_size > 0 {
457                write_non_null(child, start_idx, end_idx)
458            } else {
459                write_empty(child, start_idx, end_idx)
460            }
461        };
462
463        match nulls {
464            Some(nulls) => {
465                let mut start_idx = None;
466                for idx in range.clone() {
467                    if nulls.is_valid(idx) {
468                        // Start a run of valid rows if not already inside of one
469                        start_idx.get_or_insert(idx);
470                    } else {
471                        // Write out any pending valid rows
472                        if let Some(start) = start_idx.take() {
473                            write_rows(child, start, idx);
474                        }
475                        // Add null row
476                        child.visit_leaves(|leaf| {
477                            let rep_levels = leaf.rep_levels.as_mut().unwrap();
478                            rep_levels.push(ctx.rep_level - 1);
479                            let def_levels = leaf.def_levels.as_mut().unwrap();
480                            def_levels.push(ctx.def_level - 2);
481                        })
482                    }
483                }
484                // Write out any remaining valid rows
485                if let Some(start) = start_idx.take() {
486                    write_rows(child, start, range.end);
487                }
488            }
489            // If all rows are valid then write the whole array
490            None => write_rows(child, range.start, range.end),
491        }
492    }
493
494    /// Write a primitive array, as defined by [`is_leaf`]
495    fn write_leaf(info: &mut ArrayLevels, range: Range<usize>) {
496        let len = range.end - range.start;
497
498        match &mut info.def_levels {
499            Some(def_levels) => {
500                def_levels.reserve(len);
501                info.non_null_indices.reserve(len);
502
503                match &info.logical_nulls {
504                    Some(nulls) => {
505                        assert!(range.end <= nulls.len());
506                        let nulls = nulls.inner();
507                        def_levels.extend(range.clone().map(|i| {
508                            // Safety: range.end was asserted to be in bounds earlier
509                            let valid = unsafe { nulls.value_unchecked(i) };
510                            info.max_def_level - (!valid as i16)
511                        }));
512                        info.non_null_indices.extend(
513                            BitIndexIterator::new(nulls.inner(), nulls.offset() + range.start, len)
514                                .map(|i| i + range.start),
515                        );
516                    }
517                    None => {
518                        let iter = std::iter::repeat_n(info.max_def_level, len);
519                        def_levels.extend(iter);
520                        info.non_null_indices.extend(range);
521                    }
522                }
523            }
524            None => info.non_null_indices.extend(range),
525        }
526
527        if let Some(rep_levels) = &mut info.rep_levels {
528            rep_levels.extend(std::iter::repeat_n(info.max_rep_level, len))
529        }
530    }
531
532    /// Visits all children of this node in depth first order
533    fn visit_leaves(&mut self, visit: impl Fn(&mut ArrayLevels) + Copy) {
534        match self {
535            LevelInfoBuilder::Primitive(info) => visit(info),
536            LevelInfoBuilder::List(c, _, _, _)
537            | LevelInfoBuilder::LargeList(c, _, _, _)
538            | LevelInfoBuilder::FixedSizeList(c, _, _, _) => c.visit_leaves(visit),
539            LevelInfoBuilder::Struct(children, _, _) => {
540                for c in children {
541                    c.visit_leaves(visit)
542                }
543            }
544        }
545    }
546
547    /// Determine if the fields are compatible for purposes of constructing `LevelBuilderInfo`.
548    ///
549    /// Fields are compatible if they're the same type. Otherwise if one of them is a dictionary
550    /// and the other is a native array, the dictionary values must have the same type as the
551    /// native array
552    fn types_compatible(a: &DataType, b: &DataType) -> bool {
553        if a == b {
554            return true;
555        }
556
557        match (a, b) {
558            (DataType::Dictionary(_, v), b) => v.as_ref() == b,
559            (a, DataType::Dictionary(_, v)) => a == v.as_ref(),
560            _ => false,
561        }
562    }
563}
564
565/// The data necessary to write a primitive Arrow array to parquet, taking into account
566/// any non-primitive parents it may have in the arrow representation
567#[derive(Debug, Clone)]
568pub(crate) struct ArrayLevels {
569    /// Array's definition levels
570    ///
571    /// Present if `max_def_level != 0`
572    def_levels: Option<Vec<i16>>,
573
574    /// Array's optional repetition levels
575    ///
576    /// Present if `max_rep_level != 0`
577    rep_levels: Option<Vec<i16>>,
578
579    /// The corresponding array identifying non-null slices of data
580    /// from the primitive array
581    non_null_indices: Vec<usize>,
582
583    /// The maximum definition level for this leaf column
584    max_def_level: i16,
585
586    /// The maximum repetition for this leaf column
587    max_rep_level: i16,
588
589    /// The arrow array
590    array: ArrayRef,
591
592    /// cached logical nulls of the array.
593    logical_nulls: Option<NullBuffer>,
594}
595
596impl PartialEq for ArrayLevels {
597    fn eq(&self, other: &Self) -> bool {
598        self.def_levels == other.def_levels
599            && self.rep_levels == other.rep_levels
600            && self.non_null_indices == other.non_null_indices
601            && self.max_def_level == other.max_def_level
602            && self.max_rep_level == other.max_rep_level
603            && self.array.as_ref() == other.array.as_ref()
604            && self.logical_nulls.as_ref() == other.logical_nulls.as_ref()
605    }
606}
607impl Eq for ArrayLevels {}
608
609impl ArrayLevels {
610    fn new(ctx: LevelContext, is_nullable: bool, array: ArrayRef) -> Self {
611        let max_rep_level = ctx.rep_level;
612        let max_def_level = match is_nullable {
613            true => ctx.def_level + 1,
614            false => ctx.def_level,
615        };
616
617        let logical_nulls = array.logical_nulls();
618
619        Self {
620            def_levels: (max_def_level != 0).then(Vec::new),
621            rep_levels: (max_rep_level != 0).then(Vec::new),
622            non_null_indices: vec![],
623            max_def_level,
624            max_rep_level,
625            array,
626            logical_nulls,
627        }
628    }
629
630    pub fn array(&self) -> &ArrayRef {
631        &self.array
632    }
633
634    pub fn def_levels(&self) -> Option<&[i16]> {
635        self.def_levels.as_deref()
636    }
637
638    pub fn rep_levels(&self) -> Option<&[i16]> {
639        self.rep_levels.as_deref()
640    }
641
642    pub fn non_null_indices(&self) -> &[usize] {
643        &self.non_null_indices
644    }
645}
646
647#[cfg(test)]
648mod tests {
649    use super::*;
650
651    use arrow_array::builder::*;
652    use arrow_array::types::Int32Type;
653    use arrow_array::*;
654    use arrow_buffer::{Buffer, ToByteSlice};
655    use arrow_cast::display::array_value_to_string;
656    use arrow_data::{ArrayData, ArrayDataBuilder};
657    use arrow_schema::{Fields, Schema};
658
659    #[test]
660    fn test_calculate_array_levels_twitter_example() {
661        // based on the example at https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html
662        // [[a, b, c], [d, e, f, g]], [[h], [i,j]]
663
664        let leaf_type = Field::new_list_field(DataType::Int32, false);
665        let inner_type = DataType::List(Arc::new(leaf_type));
666        let inner_field = Field::new("l2", inner_type.clone(), false);
667        let outer_type = DataType::List(Arc::new(inner_field));
668        let outer_field = Field::new("l1", outer_type.clone(), false);
669
670        let primitives = Int32Array::from_iter(0..10);
671
672        // Cannot use from_iter_primitive as always infers nullable
673        let offsets = Buffer::from_iter([0_i32, 3, 7, 8, 10]);
674        let inner_list = ArrayDataBuilder::new(inner_type)
675            .len(4)
676            .add_buffer(offsets)
677            .add_child_data(primitives.to_data())
678            .build()
679            .unwrap();
680
681        let offsets = Buffer::from_iter([0_i32, 2, 4]);
682        let outer_list = ArrayDataBuilder::new(outer_type)
683            .len(2)
684            .add_buffer(offsets)
685            .add_child_data(inner_list)
686            .build()
687            .unwrap();
688        let outer_list = make_array(outer_list);
689
690        let levels = calculate_array_levels(&outer_list, &outer_field).unwrap();
691        assert_eq!(levels.len(), 1);
692
693        let expected = ArrayLevels {
694            def_levels: Some(vec![2; 10]),
695            rep_levels: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
696            non_null_indices: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
697            max_def_level: 2,
698            max_rep_level: 2,
699            array: Arc::new(primitives),
700            logical_nulls: None,
701        };
702        assert_eq!(&levels[0], &expected);
703    }
704
705    #[test]
706    fn test_calculate_one_level_1() {
707        // This test calculates the levels for a non-null primitive array
708        let array = Arc::new(Int32Array::from_iter(0..10)) as ArrayRef;
709        let field = Field::new_list_field(DataType::Int32, false);
710
711        let levels = calculate_array_levels(&array, &field).unwrap();
712        assert_eq!(levels.len(), 1);
713
714        let expected_levels = ArrayLevels {
715            def_levels: None,
716            rep_levels: None,
717            non_null_indices: (0..10).collect(),
718            max_def_level: 0,
719            max_rep_level: 0,
720            array,
721            logical_nulls: None,
722        };
723        assert_eq!(&levels[0], &expected_levels);
724    }
725
726    #[test]
727    fn test_calculate_one_level_2() {
728        // This test calculates the levels for a nullable primitive array
729        let array = Arc::new(Int32Array::from_iter([
730            Some(0),
731            None,
732            Some(0),
733            Some(0),
734            None,
735        ])) as ArrayRef;
736        let field = Field::new_list_field(DataType::Int32, true);
737
738        let levels = calculate_array_levels(&array, &field).unwrap();
739        assert_eq!(levels.len(), 1);
740
741        let logical_nulls = array.logical_nulls();
742        let expected_levels = ArrayLevels {
743            def_levels: Some(vec![1, 0, 1, 1, 0]),
744            rep_levels: None,
745            non_null_indices: vec![0, 2, 3],
746            max_def_level: 1,
747            max_rep_level: 0,
748            array,
749            logical_nulls,
750        };
751        assert_eq!(&levels[0], &expected_levels);
752    }
753
754    #[test]
755    fn test_calculate_array_levels_1() {
756        let leaf_field = Field::new_list_field(DataType::Int32, false);
757        let list_type = DataType::List(Arc::new(leaf_field));
758
759        // if all array values are defined (e.g. batch<list<_>>)
760        // [[0], [1], [2], [3], [4]]
761
762        let leaf_array = Int32Array::from_iter(0..5);
763        // Cannot use from_iter_primitive as always infers nullable
764        let offsets = Buffer::from_iter(0_i32..6);
765        let list = ArrayDataBuilder::new(list_type.clone())
766            .len(5)
767            .add_buffer(offsets)
768            .add_child_data(leaf_array.to_data())
769            .build()
770            .unwrap();
771        let list = make_array(list);
772
773        let list_field = Field::new("list", list_type.clone(), false);
774        let levels = calculate_array_levels(&list, &list_field).unwrap();
775        assert_eq!(levels.len(), 1);
776
777        let expected_levels = ArrayLevels {
778            def_levels: Some(vec![1; 5]),
779            rep_levels: Some(vec![0; 5]),
780            non_null_indices: (0..5).collect(),
781            max_def_level: 1,
782            max_rep_level: 1,
783            array: Arc::new(leaf_array),
784            logical_nulls: None,
785        };
786        assert_eq!(&levels[0], &expected_levels);
787
788        // array: [[0, 0], NULL, [2, 2], [3, 3, 3, 3], [4, 4, 4]]
789        // all values are defined as we do not have nulls on the root (batch)
790        // repetition:
791        //   0: 0, 1
792        //   1: 0
793        //   2: 0, 1
794        //   3: 0, 1, 1, 1
795        //   4: 0, 1, 1
796        let leaf_array = Int32Array::from_iter([0, 0, 2, 2, 3, 3, 3, 3, 4, 4, 4]);
797        let offsets = Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]);
798        let list = ArrayDataBuilder::new(list_type.clone())
799            .len(5)
800            .add_buffer(offsets)
801            .add_child_data(leaf_array.to_data())
802            .null_bit_buffer(Some(Buffer::from([0b00011101])))
803            .build()
804            .unwrap();
805        let list = make_array(list);
806
807        let list_field = Field::new("list", list_type, true);
808        let levels = calculate_array_levels(&list, &list_field).unwrap();
809        assert_eq!(levels.len(), 1);
810
811        let expected_levels = ArrayLevels {
812            def_levels: Some(vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2]),
813            rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
814            non_null_indices: (0..11).collect(),
815            max_def_level: 2,
816            max_rep_level: 1,
817            array: Arc::new(leaf_array),
818            logical_nulls: None,
819        };
820        assert_eq!(&levels[0], &expected_levels);
821    }
822
823    #[test]
824    fn test_calculate_array_levels_2() {
825        // If some values are null
826        //
827        // This emulates an array in the form: <struct<list<?>>
828        // with values:
829        // - 0: [0, 1], but is null because of the struct
830        // - 1: []
831        // - 2: [2, 3], but is null because of the struct
832        // - 3: [4, 5, 6, 7]
833        // - 4: [8, 9, 10]
834        //
835        // If the first values of a list are null due to a parent, we have to still account for them
836        // while indexing, because they would affect the way the child is indexed
837        // i.e. in the above example, we have to know that [0, 1] has to be skipped
838        let leaf = Int32Array::from_iter(0..11);
839        let leaf_field = Field::new("leaf", DataType::Int32, false);
840
841        let list_type = DataType::List(Arc::new(leaf_field));
842        let list = ArrayData::builder(list_type.clone())
843            .len(5)
844            .add_child_data(leaf.to_data())
845            .add_buffer(Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]))
846            .build()
847            .unwrap();
848
849        let list = make_array(list);
850        let list_field = Arc::new(Field::new("list", list_type, true));
851
852        let struct_array =
853            StructArray::from((vec![(list_field, list)], Buffer::from([0b00011010])));
854        let array = Arc::new(struct_array) as ArrayRef;
855
856        let struct_field = Field::new("struct", array.data_type().clone(), true);
857
858        let levels = calculate_array_levels(&array, &struct_field).unwrap();
859        assert_eq!(levels.len(), 1);
860
861        let expected_levels = ArrayLevels {
862            def_levels: Some(vec![0, 2, 0, 3, 3, 3, 3, 3, 3, 3]),
863            rep_levels: Some(vec![0, 0, 0, 0, 1, 1, 1, 0, 1, 1]),
864            non_null_indices: (4..11).collect(),
865            max_def_level: 3,
866            max_rep_level: 1,
867            array: Arc::new(leaf),
868            logical_nulls: None,
869        };
870
871        assert_eq!(&levels[0], &expected_levels);
872
873        // nested lists
874
875        // 0: [[100, 101], [102, 103]]
876        // 1: []
877        // 2: [[104, 105], [106, 107]]
878        // 3: [[108, 109], [110, 111], [112, 113], [114, 115]]
879        // 4: [[116, 117], [118, 119], [120, 121]]
880
881        let leaf = Int32Array::from_iter(100..122);
882        let leaf_field = Field::new("leaf", DataType::Int32, true);
883
884        let l1_type = DataType::List(Arc::new(leaf_field));
885        let offsets = Buffer::from_iter([0_i32, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22]);
886        let l1 = ArrayData::builder(l1_type.clone())
887            .len(11)
888            .add_child_data(leaf.to_data())
889            .add_buffer(offsets)
890            .build()
891            .unwrap();
892
893        let l1_field = Field::new("l1", l1_type, true);
894        let l2_type = DataType::List(Arc::new(l1_field));
895        let l2 = ArrayData::builder(l2_type)
896            .len(5)
897            .add_child_data(l1)
898            .add_buffer(Buffer::from_iter([0, 2, 2, 4, 8, 11]))
899            .build()
900            .unwrap();
901
902        let l2 = make_array(l2);
903        let l2_field = Field::new("l2", l2.data_type().clone(), true);
904
905        let levels = calculate_array_levels(&l2, &l2_field).unwrap();
906        assert_eq!(levels.len(), 1);
907
908        let expected_levels = ArrayLevels {
909            def_levels: Some(vec![
910                5, 5, 5, 5, 1, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
911            ]),
912            rep_levels: Some(vec![
913                0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2,
914            ]),
915            non_null_indices: (0..22).collect(),
916            max_def_level: 5,
917            max_rep_level: 2,
918            array: Arc::new(leaf),
919            logical_nulls: None,
920        };
921
922        assert_eq!(&levels[0], &expected_levels);
923    }
924
925    #[test]
926    fn test_calculate_array_levels_nested_list() {
927        let leaf_field = Field::new("leaf", DataType::Int32, false);
928        let list_type = DataType::List(Arc::new(leaf_field));
929
930        // if all array values are defined (e.g. batch<list<_>>)
931        // The array at this level looks like:
932        // 0: [a]
933        // 1: [a]
934        // 2: [a]
935        // 3: [a]
936
937        let leaf = Int32Array::from_iter([0; 4]);
938        let list = ArrayData::builder(list_type.clone())
939            .len(4)
940            .add_buffer(Buffer::from_iter(0_i32..5))
941            .add_child_data(leaf.to_data())
942            .build()
943            .unwrap();
944        let list = make_array(list);
945
946        let list_field = Field::new("list", list_type.clone(), false);
947        let levels = calculate_array_levels(&list, &list_field).unwrap();
948        assert_eq!(levels.len(), 1);
949
950        let expected_levels = ArrayLevels {
951            def_levels: Some(vec![1; 4]),
952            rep_levels: Some(vec![0; 4]),
953            non_null_indices: (0..4).collect(),
954            max_def_level: 1,
955            max_rep_level: 1,
956            array: Arc::new(leaf),
957            logical_nulls: None,
958        };
959        assert_eq!(&levels[0], &expected_levels);
960
961        // 0: null
962        // 1: [1, 2, 3]
963        // 2: [4, 5]
964        // 3: [6, 7]
965        let leaf = Int32Array::from_iter(0..8);
966        let list = ArrayData::builder(list_type.clone())
967            .len(4)
968            .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
969            .null_bit_buffer(Some(Buffer::from([0b00001110])))
970            .add_child_data(leaf.to_data())
971            .build()
972            .unwrap();
973        let list = make_array(list);
974        let list_field = Arc::new(Field::new("list", list_type, true));
975
976        let struct_array = StructArray::from(vec![(list_field, list)]);
977        let array = Arc::new(struct_array) as ArrayRef;
978
979        let struct_field = Field::new("struct", array.data_type().clone(), true);
980        let levels = calculate_array_levels(&array, &struct_field).unwrap();
981        assert_eq!(levels.len(), 1);
982
983        let expected_levels = ArrayLevels {
984            def_levels: Some(vec![1, 3, 3, 3, 3, 3, 3, 3]),
985            rep_levels: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]),
986            non_null_indices: (0..7).collect(),
987            max_def_level: 3,
988            max_rep_level: 1,
989            array: Arc::new(leaf),
990            logical_nulls: None,
991        };
992        assert_eq!(&levels[0], &expected_levels);
993
994        // nested lists
995        // In a JSON syntax with the schema: <struct<list<list<primitive>>>>, this translates into:
996        // 0: {"struct": null }
997        // 1: {"struct": [ [201], [202, 203], [] ]}
998        // 2: {"struct": [ [204, 205, 206], [207, 208, 209, 210] ]}
999        // 3: {"struct": [ [], [211, 212, 213, 214, 215] ]}
1000
1001        let leaf = Int32Array::from_iter(201..216);
1002        let leaf_field = Field::new("leaf", DataType::Int32, false);
1003        let list_1_type = DataType::List(Arc::new(leaf_field));
1004        let list_1 = ArrayData::builder(list_1_type.clone())
1005            .len(7)
1006            .add_buffer(Buffer::from_iter([0_i32, 1, 3, 3, 6, 10, 10, 15]))
1007            .add_child_data(leaf.to_data())
1008            .build()
1009            .unwrap();
1010
1011        let list_1_field = Field::new("l1", list_1_type, true);
1012        let list_2_type = DataType::List(Arc::new(list_1_field));
1013        let list_2 = ArrayData::builder(list_2_type.clone())
1014            .len(4)
1015            .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
1016            .null_bit_buffer(Some(Buffer::from([0b00001110])))
1017            .add_child_data(list_1)
1018            .build()
1019            .unwrap();
1020
1021        let list_2 = make_array(list_2);
1022        let list_2_field = Arc::new(Field::new("list_2", list_2_type, true));
1023
1024        let struct_array =
1025            StructArray::from((vec![(list_2_field, list_2)], Buffer::from([0b00001111])));
1026        let struct_field = Field::new("struct", struct_array.data_type().clone(), true);
1027
1028        let array = Arc::new(struct_array) as ArrayRef;
1029        let levels = calculate_array_levels(&array, &struct_field).unwrap();
1030        assert_eq!(levels.len(), 1);
1031
1032        let expected_levels = ArrayLevels {
1033            def_levels: Some(vec![1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5, 5, 5, 5]),
1034            rep_levels: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2]),
1035            non_null_indices: (0..15).collect(),
1036            max_def_level: 5,
1037            max_rep_level: 2,
1038            array: Arc::new(leaf),
1039            logical_nulls: None,
1040        };
1041        assert_eq!(&levels[0], &expected_levels);
1042    }
1043
1044    #[test]
1045    fn test_calculate_nested_struct_levels() {
1046        // tests a <struct[a]<struct[b]<int[c]>>
1047        // array:
1048        //  - {a: {b: {c: 1}}}
1049        //  - {a: {b: {c: null}}}
1050        //  - {a: {b: {c: 3}}}
1051        //  - {a: {b: null}}
1052        //  - {a: null}}
1053        //  - {a: {b: {c: 6}}}
1054
1055        let c = Int32Array::from_iter([Some(1), None, Some(3), None, Some(5), Some(6)]);
1056        let leaf = Arc::new(c) as ArrayRef;
1057        let c_field = Arc::new(Field::new("c", DataType::Int32, true));
1058        let b = StructArray::from(((vec![(c_field, leaf.clone())]), Buffer::from([0b00110111])));
1059
1060        let b_field = Arc::new(Field::new("b", b.data_type().clone(), true));
1061        let a = StructArray::from((
1062            (vec![(b_field, Arc::new(b) as ArrayRef)]),
1063            Buffer::from([0b00101111]),
1064        ));
1065
1066        let a_field = Field::new("a", a.data_type().clone(), true);
1067        let a_array = Arc::new(a) as ArrayRef;
1068
1069        let levels = calculate_array_levels(&a_array, &a_field).unwrap();
1070        assert_eq!(levels.len(), 1);
1071
1072        let logical_nulls = leaf.logical_nulls();
1073        let expected_levels = ArrayLevels {
1074            def_levels: Some(vec![3, 2, 3, 1, 0, 3]),
1075            rep_levels: None,
1076            non_null_indices: vec![0, 2, 5],
1077            max_def_level: 3,
1078            max_rep_level: 0,
1079            array: leaf,
1080            logical_nulls,
1081        };
1082        assert_eq!(&levels[0], &expected_levels);
1083    }
1084
1085    #[test]
1086    fn list_single_column() {
1087        // this tests the level generation from the arrow_writer equivalent test
1088
1089        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1090        let a_value_offsets = arrow::buffer::Buffer::from_iter([0_i32, 1, 3, 3, 6, 10]);
1091        let a_list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
1092        let a_list_data = ArrayData::builder(a_list_type.clone())
1093            .len(5)
1094            .add_buffer(a_value_offsets)
1095            .null_bit_buffer(Some(Buffer::from([0b00011011])))
1096            .add_child_data(a_values.to_data())
1097            .build()
1098            .unwrap();
1099
1100        assert_eq!(a_list_data.null_count(), 1);
1101
1102        let a = ListArray::from(a_list_data);
1103
1104        let item_field = Field::new_list_field(a_list_type, true);
1105        let mut builder = levels(&item_field, a);
1106        builder.write(2..4);
1107        let levels = builder.finish();
1108
1109        assert_eq!(levels.len(), 1);
1110
1111        let list_level = &levels[0];
1112
1113        let expected_level = ArrayLevels {
1114            def_levels: Some(vec![0, 3, 3, 3]),
1115            rep_levels: Some(vec![0, 0, 1, 1]),
1116            non_null_indices: vec![3, 4, 5],
1117            max_def_level: 3,
1118            max_rep_level: 1,
1119            array: Arc::new(a_values),
1120            logical_nulls: None,
1121        };
1122        assert_eq!(list_level, &expected_level);
1123    }
1124
1125    #[test]
1126    fn mixed_struct_list() {
1127        // this tests the level generation from the equivalent arrow_writer_complex test
1128
1129        // define schema
1130        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1131        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1132        let struct_field_g = Arc::new(Field::new(
1133            "g",
1134            DataType::List(Arc::new(Field::new("items", DataType::Int16, false))),
1135            false,
1136        ));
1137        let struct_field_e = Arc::new(Field::new(
1138            "e",
1139            DataType::Struct(vec![struct_field_f.clone(), struct_field_g.clone()].into()),
1140            true,
1141        ));
1142        let schema = Schema::new(vec![
1143            Field::new("a", DataType::Int32, false),
1144            Field::new("b", DataType::Int32, true),
1145            Field::new(
1146                "c",
1147                DataType::Struct(vec![struct_field_d.clone(), struct_field_e.clone()].into()),
1148                true, // https://github.com/apache/arrow-rs/issues/245
1149            ),
1150        ]);
1151
1152        // create some data
1153        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1154        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1155        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1156        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1157
1158        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1159
1160        // Construct a buffer for value offsets, for the nested array:
1161        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
1162        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1163
1164        // Construct a list array from the above two
1165        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1166            .len(5)
1167            .add_buffer(g_value_offsets)
1168            .add_child_data(g_value.into_data())
1169            .build()
1170            .unwrap();
1171        let g = ListArray::from(g_list_data);
1172
1173        let e = StructArray::from(vec![
1174            (struct_field_f, Arc::new(f.clone()) as ArrayRef),
1175            (struct_field_g, Arc::new(g) as ArrayRef),
1176        ]);
1177
1178        let c = StructArray::from(vec![
1179            (struct_field_d, Arc::new(d.clone()) as ArrayRef),
1180            (struct_field_e, Arc::new(e) as ArrayRef),
1181        ]);
1182
1183        // build a record batch
1184        let batch = RecordBatch::try_new(
1185            Arc::new(schema),
1186            vec![Arc::new(a.clone()), Arc::new(b.clone()), Arc::new(c)],
1187        )
1188        .unwrap();
1189
1190        //////////////////////////////////////////////
1191        // calculate the list's level
1192        let mut levels = vec![];
1193        batch
1194            .columns()
1195            .iter()
1196            .zip(batch.schema().fields())
1197            .for_each(|(array, field)| {
1198                let mut array_levels = calculate_array_levels(array, field).unwrap();
1199                levels.append(&mut array_levels);
1200            });
1201        assert_eq!(levels.len(), 5);
1202
1203        // test "a" levels
1204        let list_level = &levels[0];
1205
1206        let expected_level = ArrayLevels {
1207            def_levels: None,
1208            rep_levels: None,
1209            non_null_indices: vec![0, 1, 2, 3, 4],
1210            max_def_level: 0,
1211            max_rep_level: 0,
1212            array: Arc::new(a),
1213            logical_nulls: None,
1214        };
1215        assert_eq!(list_level, &expected_level);
1216
1217        // test "b" levels
1218        let list_level = levels.get(1).unwrap();
1219
1220        let b_logical_nulls = b.logical_nulls();
1221        let expected_level = ArrayLevels {
1222            def_levels: Some(vec![1, 0, 0, 1, 1]),
1223            rep_levels: None,
1224            non_null_indices: vec![0, 3, 4],
1225            max_def_level: 1,
1226            max_rep_level: 0,
1227            array: Arc::new(b),
1228            logical_nulls: b_logical_nulls,
1229        };
1230        assert_eq!(list_level, &expected_level);
1231
1232        // test "d" levels
1233        let list_level = levels.get(2).unwrap();
1234
1235        let d_logical_nulls = d.logical_nulls();
1236        let expected_level = ArrayLevels {
1237            def_levels: Some(vec![1, 1, 1, 2, 1]),
1238            rep_levels: None,
1239            non_null_indices: vec![3],
1240            max_def_level: 2,
1241            max_rep_level: 0,
1242            array: Arc::new(d),
1243            logical_nulls: d_logical_nulls,
1244        };
1245        assert_eq!(list_level, &expected_level);
1246
1247        // test "f" levels
1248        let list_level = levels.get(3).unwrap();
1249
1250        let f_logical_nulls = f.logical_nulls();
1251        let expected_level = ArrayLevels {
1252            def_levels: Some(vec![3, 2, 3, 2, 3]),
1253            rep_levels: None,
1254            non_null_indices: vec![0, 2, 4],
1255            max_def_level: 3,
1256            max_rep_level: 0,
1257            array: Arc::new(f),
1258            logical_nulls: f_logical_nulls,
1259        };
1260        assert_eq!(list_level, &expected_level);
1261    }
1262
1263    #[test]
1264    fn test_null_vs_nonnull_struct() {
1265        // define schema
1266        let offset_field = Arc::new(Field::new("offset", DataType::Int32, true));
1267        let schema = Schema::new(vec![Field::new(
1268            "some_nested_object",
1269            DataType::Struct(vec![offset_field.clone()].into()),
1270            false,
1271        )]);
1272
1273        // create some data
1274        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1275
1276        let some_nested_object =
1277            StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
1278
1279        // build a record batch
1280        let batch =
1281            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1282
1283        let struct_null_level =
1284            calculate_array_levels(batch.column(0), batch.schema().field(0)).unwrap();
1285
1286        // create second batch
1287        // define schema
1288        let offset_field = Arc::new(Field::new("offset", DataType::Int32, true));
1289        let schema = Schema::new(vec![Field::new(
1290            "some_nested_object",
1291            DataType::Struct(vec![offset_field.clone()].into()),
1292            true,
1293        )]);
1294
1295        // create some data
1296        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1297
1298        let some_nested_object =
1299            StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
1300
1301        // build a record batch
1302        let batch =
1303            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1304
1305        let struct_non_null_level =
1306            calculate_array_levels(batch.column(0), batch.schema().field(0)).unwrap();
1307
1308        // The 2 levels should not be the same
1309        if struct_non_null_level == struct_null_level {
1310            panic!("Levels should not be equal, to reflect the difference in struct nullness");
1311        }
1312    }
1313
1314    #[test]
1315    fn test_map_array() {
1316        // Note: we are using the JSON Arrow reader for brevity
1317        let json_content = r#"
1318        {"stocks":{"long": "$AAA", "short": "$BBB"}}
1319        {"stocks":{"long": "$CCC", "short": null}}
1320        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1321        "#;
1322        let entries_struct_type = DataType::Struct(Fields::from(vec![
1323            Field::new("key", DataType::Utf8, false),
1324            Field::new("value", DataType::Utf8, true),
1325        ]));
1326        let stocks_field = Field::new(
1327            "stocks",
1328            DataType::Map(
1329                Arc::new(Field::new("entries", entries_struct_type, false)),
1330                false,
1331            ),
1332            // not nullable, so the keys have max level = 1
1333            false,
1334        );
1335        let schema = Arc::new(Schema::new(vec![stocks_field]));
1336        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1337        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1338
1339        let batch = reader.next().unwrap().unwrap();
1340
1341        // calculate the map's level
1342        let mut levels = vec![];
1343        batch
1344            .columns()
1345            .iter()
1346            .zip(batch.schema().fields())
1347            .for_each(|(array, field)| {
1348                let mut array_levels = calculate_array_levels(array, field).unwrap();
1349                levels.append(&mut array_levels);
1350            });
1351        assert_eq!(levels.len(), 2);
1352
1353        let map = batch.column(0).as_map();
1354        let map_keys_logical_nulls = map.keys().logical_nulls();
1355
1356        // test key levels
1357        let list_level = &levels[0];
1358
1359        let expected_level = ArrayLevels {
1360            def_levels: Some(vec![1; 7]),
1361            rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
1362            non_null_indices: vec![0, 1, 2, 3, 4, 5, 6],
1363            max_def_level: 1,
1364            max_rep_level: 1,
1365            array: map.keys().clone(),
1366            logical_nulls: map_keys_logical_nulls,
1367        };
1368        assert_eq!(list_level, &expected_level);
1369
1370        // test values levels
1371        let list_level = levels.get(1).unwrap();
1372        let map_values_logical_nulls = map.values().logical_nulls();
1373
1374        let expected_level = ArrayLevels {
1375            def_levels: Some(vec![2, 2, 2, 1, 2, 1, 2]),
1376            rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
1377            non_null_indices: vec![0, 1, 2, 4, 6],
1378            max_def_level: 2,
1379            max_rep_level: 1,
1380            array: map.values().clone(),
1381            logical_nulls: map_values_logical_nulls,
1382        };
1383        assert_eq!(list_level, &expected_level);
1384    }
1385
1386    #[test]
1387    fn test_list_of_struct() {
1388        // define schema
1389        let int_field = Field::new("a", DataType::Int32, true);
1390        let fields = Fields::from([Arc::new(int_field)]);
1391        let item_field = Field::new_list_field(DataType::Struct(fields.clone()), true);
1392        let list_field = Field::new("list", DataType::List(Arc::new(item_field)), true);
1393
1394        let int_builder = Int32Builder::with_capacity(10);
1395        let struct_builder = StructBuilder::new(fields, vec![Box::new(int_builder)]);
1396        let mut list_builder = ListBuilder::new(struct_builder);
1397
1398        // [{a: 1}], [], null, [null, null], [{a: null}], [{a: 2}]
1399        //
1400        // [{a: 1}]
1401        let values = list_builder.values();
1402        values
1403            .field_builder::<Int32Builder>(0)
1404            .unwrap()
1405            .append_value(1);
1406        values.append(true);
1407        list_builder.append(true);
1408
1409        // []
1410        list_builder.append(true);
1411
1412        // null
1413        list_builder.append(false);
1414
1415        // [null, null]
1416        let values = list_builder.values();
1417        values
1418            .field_builder::<Int32Builder>(0)
1419            .unwrap()
1420            .append_null();
1421        values.append(false);
1422        values
1423            .field_builder::<Int32Builder>(0)
1424            .unwrap()
1425            .append_null();
1426        values.append(false);
1427        list_builder.append(true);
1428
1429        // [{a: null}]
1430        let values = list_builder.values();
1431        values
1432            .field_builder::<Int32Builder>(0)
1433            .unwrap()
1434            .append_null();
1435        values.append(true);
1436        list_builder.append(true);
1437
1438        // [{a: 2}]
1439        let values = list_builder.values();
1440        values
1441            .field_builder::<Int32Builder>(0)
1442            .unwrap()
1443            .append_value(2);
1444        values.append(true);
1445        list_builder.append(true);
1446
1447        let array = Arc::new(list_builder.finish());
1448
1449        let values = array.values().as_struct().column(0).clone();
1450        let values_len = values.len();
1451        assert_eq!(values_len, 5);
1452
1453        let schema = Arc::new(Schema::new(vec![list_field]));
1454
1455        let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
1456
1457        let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
1458        let list_level = &levels[0];
1459
1460        let logical_nulls = values.logical_nulls();
1461        let expected_level = ArrayLevels {
1462            def_levels: Some(vec![4, 1, 0, 2, 2, 3, 4]),
1463            rep_levels: Some(vec![0, 0, 0, 0, 1, 0, 0]),
1464            non_null_indices: vec![0, 4],
1465            max_def_level: 4,
1466            max_rep_level: 1,
1467            array: values,
1468            logical_nulls,
1469        };
1470
1471        assert_eq!(list_level, &expected_level);
1472    }
1473
1474    #[test]
1475    fn test_struct_mask_list() {
1476        // Test the null mask of a struct array masking out non-empty slices of a child ListArray
1477        let inner = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1478            Some(vec![Some(1), Some(2)]),
1479            Some(vec![None]),
1480            Some(vec![]),
1481            Some(vec![Some(3), None]), // Masked by struct array
1482            Some(vec![Some(4), Some(5)]),
1483            None, // Masked by struct array
1484            None,
1485        ]);
1486        let values = inner.values().clone();
1487
1488        // This test assumes that nulls don't take up space
1489        assert_eq!(inner.values().len(), 7);
1490
1491        let field = Arc::new(Field::new("list", inner.data_type().clone(), true));
1492        let array = Arc::new(inner) as ArrayRef;
1493        let nulls = Buffer::from([0b01010111]);
1494        let struct_a = StructArray::from((vec![(field, array)], nulls));
1495
1496        let field = Field::new("struct", struct_a.data_type().clone(), true);
1497        let array = Arc::new(struct_a) as ArrayRef;
1498        let levels = calculate_array_levels(&array, &field).unwrap();
1499
1500        assert_eq!(levels.len(), 1);
1501
1502        let logical_nulls = values.logical_nulls();
1503        let expected_level = ArrayLevels {
1504            def_levels: Some(vec![4, 4, 3, 2, 0, 4, 4, 0, 1]),
1505            rep_levels: Some(vec![0, 1, 0, 0, 0, 0, 1, 0, 0]),
1506            non_null_indices: vec![0, 1, 5, 6],
1507            max_def_level: 4,
1508            max_rep_level: 1,
1509            array: values,
1510            logical_nulls,
1511        };
1512
1513        assert_eq!(&levels[0], &expected_level);
1514    }
1515
1516    #[test]
1517    fn test_list_mask_struct() {
1518        // Test the null mask of a struct array and the null mask of a list array
1519        // masking out non-null elements of their children
1520
1521        let a1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1522            Some(vec![None]), // Masked by list array
1523            Some(vec![]),     // Masked by list array
1524            Some(vec![Some(3), None]),
1525            Some(vec![Some(4), Some(5), None, Some(6)]), // Masked by struct array
1526            None,
1527            None,
1528        ]);
1529        let a1_values = a1.values().clone();
1530        let a1 = Arc::new(a1) as ArrayRef;
1531
1532        let a2 = Arc::new(Int32Array::from_iter(vec![
1533            Some(1), // Masked by list array
1534            Some(2), // Masked by list array
1535            None,
1536            Some(4), // Masked by struct array
1537            Some(5),
1538            None,
1539        ])) as ArrayRef;
1540        let a2_values = a2.clone();
1541
1542        let field_a1 = Arc::new(Field::new("list", a1.data_type().clone(), true));
1543        let field_a2 = Arc::new(Field::new("integers", a2.data_type().clone(), true));
1544
1545        let nulls = Buffer::from([0b00110111]);
1546        let struct_a = Arc::new(StructArray::from((
1547            vec![(field_a1, a1), (field_a2, a2)],
1548            nulls,
1549        ))) as ArrayRef;
1550
1551        let offsets = Buffer::from_iter([0_i32, 0, 2, 2, 3, 5, 5]);
1552        let nulls = Buffer::from([0b00111100]);
1553
1554        let list_type = DataType::List(Arc::new(Field::new(
1555            "struct",
1556            struct_a.data_type().clone(),
1557            true,
1558        )));
1559
1560        let data = ArrayDataBuilder::new(list_type.clone())
1561            .len(6)
1562            .null_bit_buffer(Some(nulls))
1563            .add_buffer(offsets)
1564            .add_child_data(struct_a.into_data())
1565            .build()
1566            .unwrap();
1567
1568        let list = make_array(data);
1569        let list_field = Field::new("col", list_type, true);
1570
1571        let expected = vec![
1572            r#""#.to_string(),
1573            r#""#.to_string(),
1574            r#"[]"#.to_string(),
1575            r#"[{list: [3, ], integers: }]"#.to_string(),
1576            r#"[, {list: , integers: 5}]"#.to_string(),
1577            r#"[]"#.to_string(),
1578        ];
1579
1580        let actual: Vec<_> = (0..6)
1581            .map(|x| array_value_to_string(&list, x).unwrap())
1582            .collect();
1583        assert_eq!(actual, expected);
1584
1585        let levels = calculate_array_levels(&list, &list_field).unwrap();
1586
1587        assert_eq!(levels.len(), 2);
1588
1589        let a1_logical_nulls = a1_values.logical_nulls();
1590        let expected_level = ArrayLevels {
1591            def_levels: Some(vec![0, 0, 1, 6, 5, 2, 3, 1]),
1592            rep_levels: Some(vec![0, 0, 0, 0, 2, 0, 1, 0]),
1593            non_null_indices: vec![1],
1594            max_def_level: 6,
1595            max_rep_level: 2,
1596            array: a1_values,
1597            logical_nulls: a1_logical_nulls,
1598        };
1599
1600        assert_eq!(&levels[0], &expected_level);
1601
1602        let a2_logical_nulls = a2_values.logical_nulls();
1603        let expected_level = ArrayLevels {
1604            def_levels: Some(vec![0, 0, 1, 3, 2, 4, 1]),
1605            rep_levels: Some(vec![0, 0, 0, 0, 0, 1, 0]),
1606            non_null_indices: vec![4],
1607            max_def_level: 4,
1608            max_rep_level: 1,
1609            array: a2_values,
1610            logical_nulls: a2_logical_nulls,
1611        };
1612
1613        assert_eq!(&levels[1], &expected_level);
1614    }
1615
1616    #[test]
1617    fn test_fixed_size_list() {
1618        // [[1, 2], null, null, [7, 8], null]
1619        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 2);
1620        builder.values().append_slice(&[1, 2]);
1621        builder.append(true);
1622        builder.values().append_slice(&[3, 4]);
1623        builder.append(false);
1624        builder.values().append_slice(&[5, 6]);
1625        builder.append(false);
1626        builder.values().append_slice(&[7, 8]);
1627        builder.append(true);
1628        builder.values().append_slice(&[9, 10]);
1629        builder.append(false);
1630        let a = builder.finish();
1631        let values = a.values().clone();
1632
1633        let item_field = Field::new_list_field(a.data_type().clone(), true);
1634        let mut builder = levels(&item_field, a);
1635        builder.write(1..4);
1636        let levels = builder.finish();
1637
1638        assert_eq!(levels.len(), 1);
1639
1640        let list_level = &levels[0];
1641
1642        let logical_nulls = values.logical_nulls();
1643        let expected_level = ArrayLevels {
1644            def_levels: Some(vec![0, 0, 3, 3]),
1645            rep_levels: Some(vec![0, 0, 0, 1]),
1646            non_null_indices: vec![6, 7],
1647            max_def_level: 3,
1648            max_rep_level: 1,
1649            array: values,
1650            logical_nulls,
1651        };
1652        assert_eq!(list_level, &expected_level);
1653    }
1654
1655    #[test]
1656    fn test_fixed_size_list_of_struct() {
1657        // define schema
1658        let field_a = Field::new("a", DataType::Int32, true);
1659        let field_b = Field::new("b", DataType::Int64, false);
1660        let fields = Fields::from([Arc::new(field_a), Arc::new(field_b)]);
1661        let item_field = Field::new_list_field(DataType::Struct(fields.clone()), true);
1662        let list_field = Field::new(
1663            "list",
1664            DataType::FixedSizeList(Arc::new(item_field), 2),
1665            true,
1666        );
1667
1668        let builder_a = Int32Builder::with_capacity(10);
1669        let builder_b = Int64Builder::with_capacity(10);
1670        let struct_builder =
1671            StructBuilder::new(fields, vec![Box::new(builder_a), Box::new(builder_b)]);
1672        let mut list_builder = FixedSizeListBuilder::new(struct_builder, 2);
1673
1674        // [
1675        //   [{a: 1, b: 2}, null],
1676        //   null,
1677        //   [null, null],
1678        //   [{a: null, b: 3}, {a: 2, b: 4}]
1679        // ]
1680
1681        // [{a: 1, b: 2}, null]
1682        let values = list_builder.values();
1683        // {a: 1, b: 2}
1684        values
1685            .field_builder::<Int32Builder>(0)
1686            .unwrap()
1687            .append_value(1);
1688        values
1689            .field_builder::<Int64Builder>(1)
1690            .unwrap()
1691            .append_value(2);
1692        values.append(true);
1693        // null
1694        values
1695            .field_builder::<Int32Builder>(0)
1696            .unwrap()
1697            .append_null();
1698        values
1699            .field_builder::<Int64Builder>(1)
1700            .unwrap()
1701            .append_value(0);
1702        values.append(false);
1703        list_builder.append(true);
1704
1705        // null
1706        let values = list_builder.values();
1707        // null
1708        values
1709            .field_builder::<Int32Builder>(0)
1710            .unwrap()
1711            .append_null();
1712        values
1713            .field_builder::<Int64Builder>(1)
1714            .unwrap()
1715            .append_value(0);
1716        values.append(false);
1717        // null
1718        values
1719            .field_builder::<Int32Builder>(0)
1720            .unwrap()
1721            .append_null();
1722        values
1723            .field_builder::<Int64Builder>(1)
1724            .unwrap()
1725            .append_value(0);
1726        values.append(false);
1727        list_builder.append(false);
1728
1729        // [null, null]
1730        let values = list_builder.values();
1731        // null
1732        values
1733            .field_builder::<Int32Builder>(0)
1734            .unwrap()
1735            .append_null();
1736        values
1737            .field_builder::<Int64Builder>(1)
1738            .unwrap()
1739            .append_value(0);
1740        values.append(false);
1741        // null
1742        values
1743            .field_builder::<Int32Builder>(0)
1744            .unwrap()
1745            .append_null();
1746        values
1747            .field_builder::<Int64Builder>(1)
1748            .unwrap()
1749            .append_value(0);
1750        values.append(false);
1751        list_builder.append(true);
1752
1753        // [{a: null, b: 3}, {a: 2, b: 4}]
1754        let values = list_builder.values();
1755        // {a: null, b: 3}
1756        values
1757            .field_builder::<Int32Builder>(0)
1758            .unwrap()
1759            .append_null();
1760        values
1761            .field_builder::<Int64Builder>(1)
1762            .unwrap()
1763            .append_value(3);
1764        values.append(true);
1765        // {a: 2, b: 4}
1766        values
1767            .field_builder::<Int32Builder>(0)
1768            .unwrap()
1769            .append_value(2);
1770        values
1771            .field_builder::<Int64Builder>(1)
1772            .unwrap()
1773            .append_value(4);
1774        values.append(true);
1775        list_builder.append(true);
1776
1777        let array = Arc::new(list_builder.finish());
1778
1779        assert_eq!(array.values().len(), 8);
1780        assert_eq!(array.len(), 4);
1781
1782        let struct_values = array.values().as_struct();
1783        let values_a = struct_values.column(0).clone();
1784        let values_b = struct_values.column(1).clone();
1785
1786        let schema = Arc::new(Schema::new(vec![list_field]));
1787        let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
1788
1789        let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
1790        let a_levels = &levels[0];
1791        let b_levels = &levels[1];
1792
1793        // [[{a: 1}, null], null, [null, null], [{a: null}, {a: 2}]]
1794        let values_a_logical_nulls = values_a.logical_nulls();
1795        let expected_a = ArrayLevels {
1796            def_levels: Some(vec![4, 2, 0, 2, 2, 3, 4]),
1797            rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
1798            non_null_indices: vec![0, 7],
1799            max_def_level: 4,
1800            max_rep_level: 1,
1801            array: values_a,
1802            logical_nulls: values_a_logical_nulls,
1803        };
1804        // [[{b: 2}, null], null, [null, null], [{b: 3}, {b: 4}]]
1805        let values_b_logical_nulls = values_b.logical_nulls();
1806        let expected_b = ArrayLevels {
1807            def_levels: Some(vec![3, 2, 0, 2, 2, 3, 3]),
1808            rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
1809            non_null_indices: vec![0, 6, 7],
1810            max_def_level: 3,
1811            max_rep_level: 1,
1812            array: values_b,
1813            logical_nulls: values_b_logical_nulls,
1814        };
1815
1816        assert_eq!(a_levels, &expected_a);
1817        assert_eq!(b_levels, &expected_b);
1818    }
1819
1820    #[test]
1821    fn test_fixed_size_list_empty() {
1822        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 0);
1823        builder.append(true);
1824        builder.append(false);
1825        builder.append(true);
1826        let array = builder.finish();
1827        let values = array.values().clone();
1828
1829        let item_field = Field::new_list_field(array.data_type().clone(), true);
1830        let mut builder = levels(&item_field, array);
1831        builder.write(0..3);
1832        let levels = builder.finish();
1833
1834        assert_eq!(levels.len(), 1);
1835
1836        let list_level = &levels[0];
1837
1838        let logical_nulls = values.logical_nulls();
1839        let expected_level = ArrayLevels {
1840            def_levels: Some(vec![1, 0, 1]),
1841            rep_levels: Some(vec![0, 0, 0]),
1842            non_null_indices: vec![],
1843            max_def_level: 3,
1844            max_rep_level: 1,
1845            array: values,
1846            logical_nulls,
1847        };
1848        assert_eq!(list_level, &expected_level);
1849    }
1850
1851    #[test]
1852    fn test_fixed_size_list_of_var_lists() {
1853        // [[[1, null, 3], null], [[4], []], [[5, 6], [null, null]], null]
1854        let mut builder = FixedSizeListBuilder::new(ListBuilder::new(Int32Builder::new()), 2);
1855        builder.values().append_value([Some(1), None, Some(3)]);
1856        builder.values().append_null();
1857        builder.append(true);
1858        builder.values().append_value([Some(4)]);
1859        builder.values().append_value([]);
1860        builder.append(true);
1861        builder.values().append_value([Some(5), Some(6)]);
1862        builder.values().append_value([None, None]);
1863        builder.append(true);
1864        builder.values().append_null();
1865        builder.values().append_null();
1866        builder.append(false);
1867        let a = builder.finish();
1868        let values = a.values().as_list::<i32>().values().clone();
1869
1870        let item_field = Field::new_list_field(a.data_type().clone(), true);
1871        let mut builder = levels(&item_field, a);
1872        builder.write(0..4);
1873        let levels = builder.finish();
1874
1875        let logical_nulls = values.logical_nulls();
1876        let expected_level = ArrayLevels {
1877            def_levels: Some(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]),
1878            rep_levels: Some(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]),
1879            non_null_indices: vec![0, 2, 3, 4, 5],
1880            max_def_level: 5,
1881            max_rep_level: 2,
1882            array: values,
1883            logical_nulls,
1884        };
1885
1886        assert_eq!(levels[0], expected_level);
1887    }
1888
1889    #[test]
1890    fn test_null_dictionary_values() {
1891        let values = Int32Array::new(
1892            vec![1, 2, 3, 4].into(),
1893            Some(NullBuffer::from(vec![true, false, true, true])),
1894        );
1895        let keys = Int32Array::new(
1896            vec![1, 54, 2, 0].into(),
1897            Some(NullBuffer::from(vec![true, false, true, true])),
1898        );
1899        // [NULL, NULL, 3, 0]
1900        let dict = DictionaryArray::new(keys, Arc::new(values));
1901
1902        let item_field = Field::new_list_field(dict.data_type().clone(), true);
1903
1904        let mut builder = levels(&item_field, dict.clone());
1905        builder.write(0..4);
1906        let levels = builder.finish();
1907
1908        let logical_nulls = dict.logical_nulls();
1909        let expected_level = ArrayLevels {
1910            def_levels: Some(vec![0, 0, 1, 1]),
1911            rep_levels: None,
1912            non_null_indices: vec![2, 3],
1913            max_def_level: 1,
1914            max_rep_level: 0,
1915            array: Arc::new(dict),
1916            logical_nulls,
1917        };
1918        assert_eq!(levels[0], expected_level);
1919    }
1920
1921    #[test]
1922    fn mismatched_types() {
1923        let array = Arc::new(Int32Array::from_iter(0..10)) as ArrayRef;
1924        let field = Field::new_list_field(DataType::Float64, false);
1925
1926        let err = LevelInfoBuilder::try_new(&field, Default::default(), &array)
1927            .unwrap_err()
1928            .to_string();
1929
1930        assert_eq!(
1931            err,
1932            "Arrow: Incompatible type. Field 'item' has type Float64, array has type Int32",
1933        );
1934    }
1935
1936    fn levels<T: Array + 'static>(field: &Field, array: T) -> LevelInfoBuilder {
1937        let v = Arc::new(array) as ArrayRef;
1938        LevelInfoBuilder::try_new(field, Default::default(), &v).unwrap()
1939    }
1940}