parquet/arrow/arrow_writer/
levels.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Parquet definition and repetition levels
19//!
20//! Contains the algorithm for computing definition and repetition levels.
21//! The algorithm works by tracking the slots of an array that should
22//! ultimately be populated when writing to Parquet.
//! Parquet achieves nesting through definition levels and repetition levels \[1\].
//! Definition levels specify how many optional fields in the path for the column
//! are defined.
//! Repetition levels specify at what repeated field (list) in the path a column
//! is defined.
28//!
29//! In a nested data structure such as `a.b.c`, one can see levels as defining
30//! whether a record is defined at `a`, `a.b`, or `a.b.c`.
//! Optional fields are nullable fields, thus if all 3 fields
//! are nullable, the maximum definition level is 3, provided there are no lists.
33//!
34//! The algorithm in this module computes the necessary information to enable
35//! the writer to keep track of which columns are at which levels, and to extract
36//! the correct values at the correct slots from Arrow arrays.
37//!
38//! It works by walking a record batch's arrays, keeping track of what values
39//! are non-null, their positions and computing what their levels are.
40//!
41//! \[1\] [parquet-format#nested-encoding](https://github.com/apache/parquet-format#nested-encoding)
42
43use crate::errors::{ParquetError, Result};
44use arrow_array::cast::AsArray;
45use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
46use arrow_buffer::{NullBuffer, OffsetBuffer};
47use arrow_schema::{DataType, Field};
48use std::ops::Range;
49use std::sync::Arc;
50
51/// Performs a depth-first scan of the children of `array`, constructing [`ArrayLevels`]
52/// for each leaf column encountered
53pub(crate) fn calculate_array_levels(array: &ArrayRef, field: &Field) -> Result<Vec<ArrayLevels>> {
54    let mut builder = LevelInfoBuilder::try_new(field, Default::default(), array)?;
55    builder.write(0..array.len());
56    Ok(builder.finish())
57}
58
59/// Returns true if the DataType can be represented as a primitive parquet column,
60/// i.e. a leaf array with no children
61fn is_leaf(data_type: &DataType) -> bool {
62    matches!(
63        data_type,
64        DataType::Null
65            | DataType::Boolean
66            | DataType::Int8
67            | DataType::Int16
68            | DataType::Int32
69            | DataType::Int64
70            | DataType::UInt8
71            | DataType::UInt16
72            | DataType::UInt32
73            | DataType::UInt64
74            | DataType::Float16
75            | DataType::Float32
76            | DataType::Float64
77            | DataType::Utf8
78            | DataType::Utf8View
79            | DataType::LargeUtf8
80            | DataType::Timestamp(_, _)
81            | DataType::Date32
82            | DataType::Date64
83            | DataType::Time32(_)
84            | DataType::Time64(_)
85            | DataType::Duration(_)
86            | DataType::Interval(_)
87            | DataType::Binary
88            | DataType::LargeBinary
89            | DataType::BinaryView
90            | DataType::Decimal128(_, _)
91            | DataType::Decimal256(_, _)
92            | DataType::FixedSizeBinary(_)
93    )
94}
95
/// The definition and repetition level of an array within a potentially nested hierarchy
///
/// The derived [`Default`] sets both levels to zero, which is the context that
/// [`calculate_array_levels`] hands to the root array.
#[derive(Debug, Default, Clone, Copy)]
struct LevelContext {
    /// The current repetition level
    ///
    /// [`LevelInfoBuilder::try_new`] increments this by one for each list, large list, map
    /// or fixed size list on the path.
    rep_level: i16,
    /// The current definition level
    ///
    /// [`LevelInfoBuilder::try_new`] increments this by one for a nullable struct, by one for
    /// a non-nullable list-like type and by two for a nullable list-like type.
    def_level: i16,
}
104
/// A helper to construct [`ArrayLevels`] from a potentially nested [`Field`]
///
/// The builder mirrors the nesting of the array it was created from: nested variants own the
/// builders of their children, and every path ends in a [`LevelInfoBuilder::Primitive`] that
/// accumulates the levels of one leaf column.
#[derive(Debug)]
enum LevelInfoBuilder {
    /// A primitive, leaf array
    Primitive(ArrayLevels),
    /// A list array
    ///
    /// Map arrays are also represented by this variant, with the map entries as the child
    List(
        Box<LevelInfoBuilder>, // Builder for the child values
        LevelContext,          // Levels of the list's children
        OffsetBuffer<i32>,     // Offsets delimiting each list within the child values
        Option<NullBuffer>,    // Validity of the list slots, if any
    ),
    /// A large list array
    LargeList(
        Box<LevelInfoBuilder>, // Builder for the child values
        LevelContext,          // Levels of the list's children
        OffsetBuffer<i64>,     // Offsets delimiting each list within the child values
        Option<NullBuffer>,    // Validity of the list slots, if any
    ),
    /// A fixed size list array
    FixedSizeList(
        Box<LevelInfoBuilder>, // Builder for the child values
        LevelContext,          // Levels of the list's children
        usize,                 // Number of child values per list
        Option<NullBuffer>,    // Validity of the list slots, if any
    ),
    /// A struct array: one builder per column, the levels of its children, and its validity
    Struct(Vec<LevelInfoBuilder>, LevelContext, Option<NullBuffer>),
}
134
135impl LevelInfoBuilder {
136    /// Create a new [`LevelInfoBuilder`] for the given [`Field`] and parent [`LevelContext`]
137    fn try_new(field: &Field, parent_ctx: LevelContext, array: &ArrayRef) -> Result<Self> {
138        if field.data_type() != array.data_type() {
139            return Err(arrow_err!(format!(
140                "Incompatible type. Field '{}' has type {}, array has type {}",
141                field.name(),
142                field.data_type(),
143                array.data_type(),
144            )));
145        }
146
147        let is_nullable = field.is_nullable();
148
149        match array.data_type() {
150            d if is_leaf(d) => {
151                let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
152                Ok(Self::Primitive(levels))
153            }
154            DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => {
155                let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
156                Ok(Self::Primitive(levels))
157            }
158            DataType::Struct(children) => {
159                let array = array.as_struct();
160                let def_level = match is_nullable {
161                    true => parent_ctx.def_level + 1,
162                    false => parent_ctx.def_level,
163                };
164
165                let ctx = LevelContext {
166                    rep_level: parent_ctx.rep_level,
167                    def_level,
168                };
169
170                let children = children
171                    .iter()
172                    .zip(array.columns())
173                    .map(|(f, a)| Self::try_new(f, ctx, a))
174                    .collect::<Result<_>>()?;
175
176                Ok(Self::Struct(children, ctx, array.nulls().cloned()))
177            }
178            DataType::List(child)
179            | DataType::LargeList(child)
180            | DataType::Map(child, _)
181            | DataType::FixedSizeList(child, _) => {
182                let def_level = match is_nullable {
183                    true => parent_ctx.def_level + 2,
184                    false => parent_ctx.def_level + 1,
185                };
186
187                let ctx = LevelContext {
188                    rep_level: parent_ctx.rep_level + 1,
189                    def_level,
190                };
191
192                Ok(match field.data_type() {
193                    DataType::List(_) => {
194                        let list = array.as_list();
195                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
196                        let offsets = list.offsets().clone();
197                        Self::List(Box::new(child), ctx, offsets, list.nulls().cloned())
198                    }
199                    DataType::LargeList(_) => {
200                        let list = array.as_list();
201                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
202                        let offsets = list.offsets().clone();
203                        let nulls = list.nulls().cloned();
204                        Self::LargeList(Box::new(child), ctx, offsets, nulls)
205                    }
206                    DataType::Map(_, _) => {
207                        let map = array.as_map();
208                        let entries = Arc::new(map.entries().clone()) as ArrayRef;
209                        let child = Self::try_new(child.as_ref(), ctx, &entries)?;
210                        let offsets = map.offsets().clone();
211                        Self::List(Box::new(child), ctx, offsets, map.nulls().cloned())
212                    }
213                    DataType::FixedSizeList(_, size) => {
214                        let list = array.as_fixed_size_list();
215                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
216                        let nulls = list.nulls().cloned();
217                        Self::FixedSizeList(Box::new(child), ctx, *size as _, nulls)
218                    }
219                    _ => unreachable!(),
220                })
221            }
222            d => Err(nyi_err!("Datatype {} is not yet supported", d)),
223        }
224    }
225
226    /// Finish this [`LevelInfoBuilder`] returning the [`ArrayLevels`] for the leaf columns
227    /// as enumerated by a depth-first search
228    fn finish(self) -> Vec<ArrayLevels> {
229        match self {
230            LevelInfoBuilder::Primitive(v) => vec![v],
231            LevelInfoBuilder::List(v, _, _, _)
232            | LevelInfoBuilder::LargeList(v, _, _, _)
233            | LevelInfoBuilder::FixedSizeList(v, _, _, _) => v.finish(),
234            LevelInfoBuilder::Struct(v, _, _) => v.into_iter().flat_map(|l| l.finish()).collect(),
235        }
236    }
237
238    /// Given an `array`, write the level data for the elements in `range`
239    fn write(&mut self, range: Range<usize>) {
240        match self {
241            LevelInfoBuilder::Primitive(info) => Self::write_leaf(info, range),
242            LevelInfoBuilder::List(child, ctx, offsets, nulls) => {
243                Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
244            }
245            LevelInfoBuilder::LargeList(child, ctx, offsets, nulls) => {
246                Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
247            }
248            LevelInfoBuilder::FixedSizeList(child, ctx, size, nulls) => {
249                Self::write_fixed_size_list(child, ctx, *size, nulls.as_ref(), range)
250            }
251            LevelInfoBuilder::Struct(children, ctx, nulls) => {
252                Self::write_struct(children, ctx, nulls.as_ref(), range)
253            }
254        }
255    }
256
    /// Write the level data for the lists in `range`, where list `i` spans the child values
    /// `offsets[i]..offsets[i + 1]` and `nulls` is the list's validity mask, if any
    ///
    /// Each list in `range` produces entries in every leaf below `child`:
    /// - a null list: one entry with def level `ctx.def_level - 2`
    /// - an empty list: one entry with def level `ctx.def_level - 1`
    /// - otherwise: whatever `child` writes for the list's values, with the rep level of the
    ///   list's first entry lowered to `ctx.rep_level - 1`
    ///
    /// Note: MapArrays are `ListArray<i32>` under the hood and so are dispatched to this method
    ///
    /// # Panics
    ///
    /// Panics if a leaf below `child` is missing its repetition or definition levels, or if
    /// `range` exceeds the bounds of `offsets`.
    fn write_list<O: OffsetSizeTrait>(
        child: &mut LevelInfoBuilder,
        ctx: &LevelContext,
        offsets: &[O],
        nulls: Option<&NullBuffer>,
        range: Range<usize>,
    ) {
        // `n` lists are delimited by `n + 1` offsets
        let offsets = &offsets[range.start..range.end + 1];

        // Writes a list holding at least one value
        let write_non_null_slice =
            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
                // The child first writes its values as repeats of this list ...
                child.write(start_idx..end_idx);
                // ... then the entry that starts the list is patched in every leaf
                child.visit_leaves(|leaf| {
                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
                    let mut rev = rep_levels.iter_mut().rev();
                    // Number of direct elements of this list still to be stepped over
                    let mut remaining = end_idx - start_idx;

                    // Walk backwards from the most recently written entry until the first
                    // direct element of this list is reached
                    loop {
                        let next = rev.next().unwrap();
                        if *next > ctx.rep_level {
                            // Nested element - ignore
                            continue;
                        }

                        remaining -= 1;
                        if remaining == 0 {
                            // First element of the list: it repeats at the parent's level
                            *next = ctx.rep_level - 1;
                            break;
                        }
                    }
                })
            };

        // Writes a valid list without values: a single entry per leaf, one definition level
        // short of the list's children
        let write_empty_slice = |child: &mut LevelInfoBuilder| {
            child.visit_leaves(|leaf| {
                let rep_levels = leaf.rep_levels.as_mut().unwrap();
                rep_levels.push(ctx.rep_level - 1);
                let def_levels = leaf.def_levels.as_mut().unwrap();
                def_levels.push(ctx.def_level - 1);
            })
        };

        // Writes a null list: a single entry per leaf, two definition levels short of the
        // list's children
        let write_null_slice = |child: &mut LevelInfoBuilder| {
            child.visit_leaves(|leaf| {
                let rep_levels = leaf.rep_levels.as_mut().unwrap();
                rep_levels.push(ctx.rep_level - 1);
                let def_levels = leaf.def_levels.as_mut().unwrap();
                def_levels.push(ctx.def_level - 2);
            })
        };

        match nulls {
            Some(nulls) => {
                // `offsets` was re-sliced to start at `range.start`, but `nulls` was not
                let null_offset = range.start;
                // TODO: Faster bitmask iteration (#1757)
                for (idx, w) in offsets.windows(2).enumerate() {
                    let is_valid = nulls.is_valid(idx + null_offset);
                    let start_idx = w[0].as_usize();
                    let end_idx = w[1].as_usize();
                    if !is_valid {
                        write_null_slice(child)
                    } else if start_idx == end_idx {
                        write_empty_slice(child)
                    } else {
                        write_non_null_slice(child, start_idx, end_idx)
                    }
                }
            }
            None => {
                // Without a validity mask every list is valid
                for w in offsets.windows(2) {
                    let start_idx = w[0].as_usize();
                    let end_idx = w[1].as_usize();
                    if start_idx == end_idx {
                        write_empty_slice(child)
                    } else {
                        write_non_null_slice(child, start_idx, end_idx)
                    }
                }
            }
        }
    }
341
342    /// Write `range` elements from StructArray `array`
343    fn write_struct(
344        children: &mut [LevelInfoBuilder],
345        ctx: &LevelContext,
346        nulls: Option<&NullBuffer>,
347        range: Range<usize>,
348    ) {
349        let write_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
350            for child in children {
351                child.visit_leaves(|info| {
352                    let len = range.end - range.start;
353
354                    let def_levels = info.def_levels.as_mut().unwrap();
355                    def_levels.extend(std::iter::repeat(ctx.def_level - 1).take(len));
356
357                    if let Some(rep_levels) = info.rep_levels.as_mut() {
358                        rep_levels.extend(std::iter::repeat(ctx.rep_level).take(len));
359                    }
360                })
361            }
362        };
363
364        let write_non_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
365            for child in children {
366                child.write(range.clone())
367            }
368        };
369
370        match nulls {
371            Some(validity) => {
372                let mut last_non_null_idx = None;
373                let mut last_null_idx = None;
374
375                // TODO: Faster bitmask iteration (#1757)
376                for i in range.clone() {
377                    match validity.is_valid(i) {
378                        true => {
379                            if let Some(last_idx) = last_null_idx.take() {
380                                write_null(children, last_idx..i)
381                            }
382                            last_non_null_idx.get_or_insert(i);
383                        }
384                        false => {
385                            if let Some(last_idx) = last_non_null_idx.take() {
386                                write_non_null(children, last_idx..i)
387                            }
388                            last_null_idx.get_or_insert(i);
389                        }
390                    }
391                }
392
393                if let Some(last_idx) = last_null_idx.take() {
394                    write_null(children, last_idx..range.end)
395                }
396
397                if let Some(last_idx) = last_non_null_idx.take() {
398                    write_non_null(children, last_idx..range.end)
399                }
400            }
401            None => write_non_null(children, range),
402        }
403    }
404
    /// Write the level data for rows `range` of a fixed size list holding `fixed_size` values
    /// per row, with child builder `child` and null bitmap `nulls`.
    ///
    /// Valid rows are written in runs; row `i` covers the child values
    /// `i * fixed_size..(i + 1) * fixed_size`. A null row appends a single entry to every leaf
    /// with def level `ctx.def_level - 2`, and with `fixed_size == 0` every valid row appends a
    /// single entry with def level `ctx.def_level - 1`.
    ///
    /// # Panics
    ///
    /// Panics if a leaf below `child` is missing its repetition or definition levels.
    fn write_fixed_size_list(
        child: &mut LevelInfoBuilder,
        ctx: &LevelContext,
        fixed_size: usize,
        nulls: Option<&NullBuffer>,
        range: Range<usize>,
    ) {
        // Writes the valid rows `start_idx..end_idx` when each row holds at least one value
        let write_non_null = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
            // Translate the row range into the range of child values it covers
            let values_start = start_idx * fixed_size;
            let values_end = end_idx * fixed_size;
            child.write(values_start..values_end);

            child.visit_leaves(|leaf| {
                let rep_levels = leaf.rep_levels.as_mut().unwrap();

                // Position of each value within its row, counting down to 0 because the
                // rep levels are visited back to front
                let row_indices = (0..fixed_size)
                    .rev()
                    .cycle()
                    .take(values_end - values_start);

                // Step backward over the child rep levels and mark the start of each list
                rep_levels
                    .iter_mut()
                    .rev()
                    // Filter out reps from nested children
                    .filter(|&&mut r| r == ctx.rep_level)
                    .zip(row_indices)
                    .for_each(|(r, idx)| {
                        if idx == 0 {
                            // First value of a row: it repeats at the parent's level
                            *r = ctx.rep_level - 1;
                        }
                    });
            })
        };

        // If list size is 0, ignore values and just write rep/def levels.
        let write_empty = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
            // One entry per row, one definition level short of the list's children
            let len = end_idx - start_idx;
            child.visit_leaves(|leaf| {
                let rep_levels = leaf.rep_levels.as_mut().unwrap();
                rep_levels.extend(std::iter::repeat(ctx.rep_level - 1).take(len));
                let def_levels = leaf.def_levels.as_mut().unwrap();
                def_levels.extend(std::iter::repeat(ctx.def_level - 1).take(len));
            })
        };

        // Writes a run of valid rows, picking the strategy that matches the list size
        let write_rows = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
            if fixed_size > 0 {
                write_non_null(child, start_idx, end_idx)
            } else {
                write_empty(child, start_idx, end_idx)
            }
        };

        match nulls {
            Some(nulls) => {
                // First row of the run of valid rows that has not been written yet, if any
                let mut start_idx = None;
                for idx in range.clone() {
                    if nulls.is_valid(idx) {
                        // Start a run of valid rows if not already inside of one
                        start_idx.get_or_insert(idx);
                    } else {
                        // Write out any pending valid rows
                        if let Some(start) = start_idx.take() {
                            write_rows(child, start, idx);
                        }
                        // Add null row
                        child.visit_leaves(|leaf| {
                            let rep_levels = leaf.rep_levels.as_mut().unwrap();
                            rep_levels.push(ctx.rep_level - 1);
                            let def_levels = leaf.def_levels.as_mut().unwrap();
                            def_levels.push(ctx.def_level - 2);
                        })
                    }
                }
                // Write out any remaining valid rows
                if let Some(start) = start_idx.take() {
                    write_rows(child, start, range.end);
                }
            }
            // If all rows are valid then write the whole array
            None => write_rows(child, range.start, range.end),
        }
    }
490
491    /// Write a primitive array, as defined by [`is_leaf`]
492    fn write_leaf(info: &mut ArrayLevels, range: Range<usize>) {
493        let len = range.end - range.start;
494
495        match &mut info.def_levels {
496            Some(def_levels) => {
497                def_levels.reserve(len);
498                info.non_null_indices.reserve(len);
499
500                match info.array.logical_nulls() {
501                    Some(nulls) => {
502                        // TODO: Faster bitmask iteration (#1757)
503                        for i in range {
504                            match nulls.is_valid(i) {
505                                true => {
506                                    def_levels.push(info.max_def_level);
507                                    info.non_null_indices.push(i)
508                                }
509                                false => def_levels.push(info.max_def_level - 1),
510                            }
511                        }
512                    }
513                    None => {
514                        let iter = std::iter::repeat(info.max_def_level).take(len);
515                        def_levels.extend(iter);
516                        info.non_null_indices.extend(range);
517                    }
518                }
519            }
520            None => info.non_null_indices.extend(range),
521        }
522
523        if let Some(rep_levels) = &mut info.rep_levels {
524            rep_levels.extend(std::iter::repeat(info.max_rep_level).take(len))
525        }
526    }
527
528    /// Visits all children of this node in depth first order
529    fn visit_leaves(&mut self, visit: impl Fn(&mut ArrayLevels) + Copy) {
530        match self {
531            LevelInfoBuilder::Primitive(info) => visit(info),
532            LevelInfoBuilder::List(c, _, _, _)
533            | LevelInfoBuilder::LargeList(c, _, _, _)
534            | LevelInfoBuilder::FixedSizeList(c, _, _, _) => c.visit_leaves(visit),
535            LevelInfoBuilder::Struct(children, _, _) => {
536                for c in children {
537                    c.visit_leaves(visit)
538                }
539            }
540        }
541    }
542}
/// The data necessary to write a primitive Arrow array to parquet, taking into account
/// any non-primitive parents it may have in the arrow representation
///
/// One instance is produced per leaf column by [`calculate_array_levels`].
#[derive(Debug, Clone)]
pub(crate) struct ArrayLevels {
    /// Array's definition levels
    ///
    /// Present if `max_def_level != 0`
    def_levels: Option<Vec<i16>>,

    /// Array's optional repetition levels
    ///
    /// Present if `max_rep_level != 0`
    rep_levels: Option<Vec<i16>>,

    /// The corresponding array identifying non-null slices of data
    /// from the primitive array
    ///
    /// Holds the indices into `array` of the written rows that are non-null
    non_null_indices: Vec<usize>,

    /// The maximum definition level for this leaf column
    max_def_level: i16,

    /// The maximum repetition for this leaf column
    max_rep_level: i16,

    /// The arrow array
    array: ArrayRef,
}
570
571impl PartialEq for ArrayLevels {
572    fn eq(&self, other: &Self) -> bool {
573        self.def_levels == other.def_levels
574            && self.rep_levels == other.rep_levels
575            && self.non_null_indices == other.non_null_indices
576            && self.max_def_level == other.max_def_level
577            && self.max_rep_level == other.max_rep_level
578            && self.array.as_ref() == other.array.as_ref()
579    }
580}
581impl Eq for ArrayLevels {}
582
583impl ArrayLevels {
584    fn new(ctx: LevelContext, is_nullable: bool, array: ArrayRef) -> Self {
585        let max_rep_level = ctx.rep_level;
586        let max_def_level = match is_nullable {
587            true => ctx.def_level + 1,
588            false => ctx.def_level,
589        };
590
591        Self {
592            def_levels: (max_def_level != 0).then(Vec::new),
593            rep_levels: (max_rep_level != 0).then(Vec::new),
594            non_null_indices: vec![],
595            max_def_level,
596            max_rep_level,
597            array,
598        }
599    }
600
601    pub fn array(&self) -> &ArrayRef {
602        &self.array
603    }
604
605    pub fn def_levels(&self) -> Option<&[i16]> {
606        self.def_levels.as_deref()
607    }
608
609    pub fn rep_levels(&self) -> Option<&[i16]> {
610        self.rep_levels.as_deref()
611    }
612
613    pub fn non_null_indices(&self) -> &[usize] {
614        &self.non_null_indices
615    }
616}
617
618#[cfg(test)]
619mod tests {
620    use super::*;
621
622    use arrow_array::builder::*;
623    use arrow_array::types::Int32Type;
624    use arrow_array::*;
625    use arrow_buffer::{Buffer, ToByteSlice};
626    use arrow_cast::display::array_value_to_string;
627    use arrow_data::{ArrayData, ArrayDataBuilder};
628    use arrow_schema::{Fields, Schema};
629
630    #[test]
631    fn test_calculate_array_levels_twitter_example() {
632        // based on the example at https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html
633        // [[a, b, c], [d, e, f, g]], [[h], [i,j]]
634
635        let leaf_type = Field::new_list_field(DataType::Int32, false);
636        let inner_type = DataType::List(Arc::new(leaf_type));
637        let inner_field = Field::new("l2", inner_type.clone(), false);
638        let outer_type = DataType::List(Arc::new(inner_field));
639        let outer_field = Field::new("l1", outer_type.clone(), false);
640
641        let primitives = Int32Array::from_iter(0..10);
642
643        // Cannot use from_iter_primitive as always infers nullable
644        let offsets = Buffer::from_iter([0_i32, 3, 7, 8, 10]);
645        let inner_list = ArrayDataBuilder::new(inner_type)
646            .len(4)
647            .add_buffer(offsets)
648            .add_child_data(primitives.to_data())
649            .build()
650            .unwrap();
651
652        let offsets = Buffer::from_iter([0_i32, 2, 4]);
653        let outer_list = ArrayDataBuilder::new(outer_type)
654            .len(2)
655            .add_buffer(offsets)
656            .add_child_data(inner_list)
657            .build()
658            .unwrap();
659        let outer_list = make_array(outer_list);
660
661        let levels = calculate_array_levels(&outer_list, &outer_field).unwrap();
662        assert_eq!(levels.len(), 1);
663
664        let expected = ArrayLevels {
665            def_levels: Some(vec![2; 10]),
666            rep_levels: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
667            non_null_indices: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
668            max_def_level: 2,
669            max_rep_level: 2,
670            array: Arc::new(primitives),
671        };
672        assert_eq!(&levels[0], &expected);
673    }
674
675    #[test]
676    fn test_calculate_one_level_1() {
677        // This test calculates the levels for a non-null primitive array
678        let array = Arc::new(Int32Array::from_iter(0..10)) as ArrayRef;
679        let field = Field::new_list_field(DataType::Int32, false);
680
681        let levels = calculate_array_levels(&array, &field).unwrap();
682        assert_eq!(levels.len(), 1);
683
684        let expected_levels = ArrayLevels {
685            def_levels: None,
686            rep_levels: None,
687            non_null_indices: (0..10).collect(),
688            max_def_level: 0,
689            max_rep_level: 0,
690            array,
691        };
692        assert_eq!(&levels[0], &expected_levels);
693    }
694
695    #[test]
696    fn test_calculate_one_level_2() {
697        // This test calculates the levels for a nullable primitive array
698        let array = Arc::new(Int32Array::from_iter([
699            Some(0),
700            None,
701            Some(0),
702            Some(0),
703            None,
704        ])) as ArrayRef;
705        let field = Field::new_list_field(DataType::Int32, true);
706
707        let levels = calculate_array_levels(&array, &field).unwrap();
708        assert_eq!(levels.len(), 1);
709
710        let expected_levels = ArrayLevels {
711            def_levels: Some(vec![1, 0, 1, 1, 0]),
712            rep_levels: None,
713            non_null_indices: vec![0, 2, 3],
714            max_def_level: 1,
715            max_rep_level: 0,
716            array,
717        };
718        assert_eq!(&levels[0], &expected_levels);
719    }
720
721    #[test]
722    fn test_calculate_array_levels_1() {
723        let leaf_field = Field::new_list_field(DataType::Int32, false);
724        let list_type = DataType::List(Arc::new(leaf_field));
725
726        // if all array values are defined (e.g. batch<list<_>>)
727        // [[0], [1], [2], [3], [4]]
728
729        let leaf_array = Int32Array::from_iter(0..5);
730        // Cannot use from_iter_primitive as always infers nullable
731        let offsets = Buffer::from_iter(0_i32..6);
732        let list = ArrayDataBuilder::new(list_type.clone())
733            .len(5)
734            .add_buffer(offsets)
735            .add_child_data(leaf_array.to_data())
736            .build()
737            .unwrap();
738        let list = make_array(list);
739
740        let list_field = Field::new("list", list_type.clone(), false);
741        let levels = calculate_array_levels(&list, &list_field).unwrap();
742        assert_eq!(levels.len(), 1);
743
744        let expected_levels = ArrayLevels {
745            def_levels: Some(vec![1; 5]),
746            rep_levels: Some(vec![0; 5]),
747            non_null_indices: (0..5).collect(),
748            max_def_level: 1,
749            max_rep_level: 1,
750            array: Arc::new(leaf_array),
751        };
752        assert_eq!(&levels[0], &expected_levels);
753
754        // array: [[0, 0], NULL, [2, 2], [3, 3, 3, 3], [4, 4, 4]]
755        // all values are defined as we do not have nulls on the root (batch)
756        // repetition:
757        //   0: 0, 1
758        //   1: 0
759        //   2: 0, 1
760        //   3: 0, 1, 1, 1
761        //   4: 0, 1, 1
762        let leaf_array = Int32Array::from_iter([0, 0, 2, 2, 3, 3, 3, 3, 4, 4, 4]);
763        let offsets = Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]);
764        let list = ArrayDataBuilder::new(list_type.clone())
765            .len(5)
766            .add_buffer(offsets)
767            .add_child_data(leaf_array.to_data())
768            .null_bit_buffer(Some(Buffer::from([0b00011101])))
769            .build()
770            .unwrap();
771        let list = make_array(list);
772
773        let list_field = Field::new("list", list_type, true);
774        let levels = calculate_array_levels(&list, &list_field).unwrap();
775        assert_eq!(levels.len(), 1);
776
777        let expected_levels = ArrayLevels {
778            def_levels: Some(vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2]),
779            rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
780            non_null_indices: (0..11).collect(),
781            max_def_level: 2,
782            max_rep_level: 1,
783            array: Arc::new(leaf_array),
784        };
785        assert_eq!(&levels[0], &expected_levels);
786    }
787
788    #[test]
789    fn test_calculate_array_levels_2() {
790        // If some values are null
791        //
792        // This emulates an array in the form: <struct<list<?>>
793        // with values:
794        // - 0: [0, 1], but is null because of the struct
795        // - 1: []
796        // - 2: [2, 3], but is null because of the struct
797        // - 3: [4, 5, 6, 7]
798        // - 4: [8, 9, 10]
799        //
800        // If the first values of a list are null due to a parent, we have to still account for them
801        // while indexing, because they would affect the way the child is indexed
802        // i.e. in the above example, we have to know that [0, 1] has to be skipped
803        let leaf = Int32Array::from_iter(0..11);
804        let leaf_field = Field::new("leaf", DataType::Int32, false);
805
806        let list_type = DataType::List(Arc::new(leaf_field));
807        let list = ArrayData::builder(list_type.clone())
808            .len(5)
809            .add_child_data(leaf.to_data())
810            .add_buffer(Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]))
811            .build()
812            .unwrap();
813
814        let list = make_array(list);
815        let list_field = Arc::new(Field::new("list", list_type, true));
816
817        let struct_array =
818            StructArray::from((vec![(list_field, list)], Buffer::from([0b00011010])));
819        let array = Arc::new(struct_array) as ArrayRef;
820
821        let struct_field = Field::new("struct", array.data_type().clone(), true);
822
823        let levels = calculate_array_levels(&array, &struct_field).unwrap();
824        assert_eq!(levels.len(), 1);
825
826        let expected_levels = ArrayLevels {
827            def_levels: Some(vec![0, 2, 0, 3, 3, 3, 3, 3, 3, 3]),
828            rep_levels: Some(vec![0, 0, 0, 0, 1, 1, 1, 0, 1, 1]),
829            non_null_indices: (4..11).collect(),
830            max_def_level: 3,
831            max_rep_level: 1,
832            array: Arc::new(leaf),
833        };
834
835        assert_eq!(&levels[0], &expected_levels);
836
837        // nested lists
838
839        // 0: [[100, 101], [102, 103]]
840        // 1: []
841        // 2: [[104, 105], [106, 107]]
842        // 3: [[108, 109], [110, 111], [112, 113], [114, 115]]
843        // 4: [[116, 117], [118, 119], [120, 121]]
844
845        let leaf = Int32Array::from_iter(100..122);
846        let leaf_field = Field::new("leaf", DataType::Int32, true);
847
848        let l1_type = DataType::List(Arc::new(leaf_field));
849        let offsets = Buffer::from_iter([0_i32, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22]);
850        let l1 = ArrayData::builder(l1_type.clone())
851            .len(11)
852            .add_child_data(leaf.to_data())
853            .add_buffer(offsets)
854            .build()
855            .unwrap();
856
857        let l1_field = Field::new("l1", l1_type, true);
858        let l2_type = DataType::List(Arc::new(l1_field));
859        let l2 = ArrayData::builder(l2_type)
860            .len(5)
861            .add_child_data(l1)
862            .add_buffer(Buffer::from_iter([0, 2, 2, 4, 8, 11]))
863            .build()
864            .unwrap();
865
866        let l2 = make_array(l2);
867        let l2_field = Field::new("l2", l2.data_type().clone(), true);
868
869        let levels = calculate_array_levels(&l2, &l2_field).unwrap();
870        assert_eq!(levels.len(), 1);
871
872        let expected_levels = ArrayLevels {
873            def_levels: Some(vec![
874                5, 5, 5, 5, 1, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
875            ]),
876            rep_levels: Some(vec![
877                0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2,
878            ]),
879            non_null_indices: (0..22).collect(),
880            max_def_level: 5,
881            max_rep_level: 2,
882            array: Arc::new(leaf),
883        };
884
885        assert_eq!(&levels[0], &expected_levels);
886    }
887
888    #[test]
889    fn test_calculate_array_levels_nested_list() {
890        let leaf_field = Field::new("leaf", DataType::Int32, false);
891        let list_type = DataType::List(Arc::new(leaf_field));
892
893        // if all array values are defined (e.g. batch<list<_>>)
894        // The array at this level looks like:
895        // 0: [a]
896        // 1: [a]
897        // 2: [a]
898        // 3: [a]
899
900        let leaf = Int32Array::from_iter([0; 4]);
901        let list = ArrayData::builder(list_type.clone())
902            .len(4)
903            .add_buffer(Buffer::from_iter(0_i32..5))
904            .add_child_data(leaf.to_data())
905            .build()
906            .unwrap();
907        let list = make_array(list);
908
909        let list_field = Field::new("list", list_type.clone(), false);
910        let levels = calculate_array_levels(&list, &list_field).unwrap();
911        assert_eq!(levels.len(), 1);
912
913        let expected_levels = ArrayLevels {
914            def_levels: Some(vec![1; 4]),
915            rep_levels: Some(vec![0; 4]),
916            non_null_indices: (0..4).collect(),
917            max_def_level: 1,
918            max_rep_level: 1,
919            array: Arc::new(leaf),
920        };
921        assert_eq!(&levels[0], &expected_levels);
922
923        // 0: null
924        // 1: [1, 2, 3]
925        // 2: [4, 5]
926        // 3: [6, 7]
927        let leaf = Int32Array::from_iter(0..8);
928        let list = ArrayData::builder(list_type.clone())
929            .len(4)
930            .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
931            .null_bit_buffer(Some(Buffer::from([0b00001110])))
932            .add_child_data(leaf.to_data())
933            .build()
934            .unwrap();
935        let list = make_array(list);
936        let list_field = Arc::new(Field::new("list", list_type, true));
937
938        let struct_array = StructArray::from(vec![(list_field, list)]);
939        let array = Arc::new(struct_array) as ArrayRef;
940
941        let struct_field = Field::new("struct", array.data_type().clone(), true);
942        let levels = calculate_array_levels(&array, &struct_field).unwrap();
943        assert_eq!(levels.len(), 1);
944
945        let expected_levels = ArrayLevels {
946            def_levels: Some(vec![1, 3, 3, 3, 3, 3, 3, 3]),
947            rep_levels: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]),
948            non_null_indices: (0..7).collect(),
949            max_def_level: 3,
950            max_rep_level: 1,
951            array: Arc::new(leaf),
952        };
953        assert_eq!(&levels[0], &expected_levels);
954
955        // nested lists
956        // In a JSON syntax with the schema: <struct<list<list<primitive>>>>, this translates into:
957        // 0: {"struct": null }
958        // 1: {"struct": [ [201], [202, 203], [] ]}
959        // 2: {"struct": [ [204, 205, 206], [207, 208, 209, 210] ]}
960        // 3: {"struct": [ [], [211, 212, 213, 214, 215] ]}
961
962        let leaf = Int32Array::from_iter(201..216);
963        let leaf_field = Field::new("leaf", DataType::Int32, false);
964        let list_1_type = DataType::List(Arc::new(leaf_field));
965        let list_1 = ArrayData::builder(list_1_type.clone())
966            .len(7)
967            .add_buffer(Buffer::from_iter([0_i32, 1, 3, 3, 6, 10, 10, 15]))
968            .add_child_data(leaf.to_data())
969            .build()
970            .unwrap();
971
972        let list_1_field = Field::new("l1", list_1_type, true);
973        let list_2_type = DataType::List(Arc::new(list_1_field));
974        let list_2 = ArrayData::builder(list_2_type.clone())
975            .len(4)
976            .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
977            .null_bit_buffer(Some(Buffer::from([0b00001110])))
978            .add_child_data(list_1)
979            .build()
980            .unwrap();
981
982        let list_2 = make_array(list_2);
983        let list_2_field = Arc::new(Field::new("list_2", list_2_type, true));
984
985        let struct_array =
986            StructArray::from((vec![(list_2_field, list_2)], Buffer::from([0b00001111])));
987        let struct_field = Field::new("struct", struct_array.data_type().clone(), true);
988
989        let array = Arc::new(struct_array) as ArrayRef;
990        let levels = calculate_array_levels(&array, &struct_field).unwrap();
991        assert_eq!(levels.len(), 1);
992
993        let expected_levels = ArrayLevels {
994            def_levels: Some(vec![1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5, 5, 5, 5]),
995            rep_levels: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2]),
996            non_null_indices: (0..15).collect(),
997            max_def_level: 5,
998            max_rep_level: 2,
999            array: Arc::new(leaf),
1000        };
1001        assert_eq!(&levels[0], &expected_levels);
1002    }
1003
1004    #[test]
1005    fn test_calculate_nested_struct_levels() {
1006        // tests a <struct[a]<struct[b]<int[c]>>
1007        // array:
1008        //  - {a: {b: {c: 1}}}
1009        //  - {a: {b: {c: null}}}
1010        //  - {a: {b: {c: 3}}}
1011        //  - {a: {b: null}}
1012        //  - {a: null}}
1013        //  - {a: {b: {c: 6}}}
1014
1015        let c = Int32Array::from_iter([Some(1), None, Some(3), None, Some(5), Some(6)]);
1016        let leaf = Arc::new(c) as ArrayRef;
1017        let c_field = Arc::new(Field::new("c", DataType::Int32, true));
1018        let b = StructArray::from(((vec![(c_field, leaf.clone())]), Buffer::from([0b00110111])));
1019
1020        let b_field = Arc::new(Field::new("b", b.data_type().clone(), true));
1021        let a = StructArray::from((
1022            (vec![(b_field, Arc::new(b) as ArrayRef)]),
1023            Buffer::from([0b00101111]),
1024        ));
1025
1026        let a_field = Field::new("a", a.data_type().clone(), true);
1027        let a_array = Arc::new(a) as ArrayRef;
1028
1029        let levels = calculate_array_levels(&a_array, &a_field).unwrap();
1030        assert_eq!(levels.len(), 1);
1031
1032        let expected_levels = ArrayLevels {
1033            def_levels: Some(vec![3, 2, 3, 1, 0, 3]),
1034            rep_levels: None,
1035            non_null_indices: vec![0, 2, 5],
1036            max_def_level: 3,
1037            max_rep_level: 0,
1038            array: leaf,
1039        };
1040        assert_eq!(&levels[0], &expected_levels);
1041    }
1042
1043    #[test]
1044    fn list_single_column() {
1045        // this tests the level generation from the arrow_writer equivalent test
1046
1047        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1048        let a_value_offsets = arrow::buffer::Buffer::from_iter([0_i32, 1, 3, 3, 6, 10]);
1049        let a_list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
1050        let a_list_data = ArrayData::builder(a_list_type.clone())
1051            .len(5)
1052            .add_buffer(a_value_offsets)
1053            .null_bit_buffer(Some(Buffer::from([0b00011011])))
1054            .add_child_data(a_values.to_data())
1055            .build()
1056            .unwrap();
1057
1058        assert_eq!(a_list_data.null_count(), 1);
1059
1060        let a = ListArray::from(a_list_data);
1061
1062        let item_field = Field::new_list_field(a_list_type, true);
1063        let mut builder = levels(&item_field, a);
1064        builder.write(2..4);
1065        let levels = builder.finish();
1066
1067        assert_eq!(levels.len(), 1);
1068
1069        let list_level = &levels[0];
1070
1071        let expected_level = ArrayLevels {
1072            def_levels: Some(vec![0, 3, 3, 3]),
1073            rep_levels: Some(vec![0, 0, 1, 1]),
1074            non_null_indices: vec![3, 4, 5],
1075            max_def_level: 3,
1076            max_rep_level: 1,
1077            array: Arc::new(a_values),
1078        };
1079        assert_eq!(list_level, &expected_level);
1080    }
1081
1082    #[test]
1083    fn mixed_struct_list() {
1084        // this tests the level generation from the equivalent arrow_writer_complex test
1085
1086        // define schema
1087        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1088        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1089        let struct_field_g = Arc::new(Field::new(
1090            "g",
1091            DataType::List(Arc::new(Field::new("items", DataType::Int16, false))),
1092            false,
1093        ));
1094        let struct_field_e = Arc::new(Field::new(
1095            "e",
1096            DataType::Struct(vec![struct_field_f.clone(), struct_field_g.clone()].into()),
1097            true,
1098        ));
1099        let schema = Schema::new(vec![
1100            Field::new("a", DataType::Int32, false),
1101            Field::new("b", DataType::Int32, true),
1102            Field::new(
1103                "c",
1104                DataType::Struct(vec![struct_field_d.clone(), struct_field_e.clone()].into()),
1105                true, // https://github.com/apache/arrow-rs/issues/245
1106            ),
1107        ]);
1108
1109        // create some data
1110        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1111        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1112        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1113        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1114
1115        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1116
1117        // Construct a buffer for value offsets, for the nested array:
1118        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
1119        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1120
1121        // Construct a list array from the above two
1122        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1123            .len(5)
1124            .add_buffer(g_value_offsets)
1125            .add_child_data(g_value.into_data())
1126            .build()
1127            .unwrap();
1128        let g = ListArray::from(g_list_data);
1129
1130        let e = StructArray::from(vec![
1131            (struct_field_f, Arc::new(f.clone()) as ArrayRef),
1132            (struct_field_g, Arc::new(g) as ArrayRef),
1133        ]);
1134
1135        let c = StructArray::from(vec![
1136            (struct_field_d, Arc::new(d.clone()) as ArrayRef),
1137            (struct_field_e, Arc::new(e) as ArrayRef),
1138        ]);
1139
1140        // build a record batch
1141        let batch = RecordBatch::try_new(
1142            Arc::new(schema),
1143            vec![Arc::new(a.clone()), Arc::new(b.clone()), Arc::new(c)],
1144        )
1145        .unwrap();
1146
1147        //////////////////////////////////////////////
1148        // calculate the list's level
1149        let mut levels = vec![];
1150        batch
1151            .columns()
1152            .iter()
1153            .zip(batch.schema().fields())
1154            .for_each(|(array, field)| {
1155                let mut array_levels = calculate_array_levels(array, field).unwrap();
1156                levels.append(&mut array_levels);
1157            });
1158        assert_eq!(levels.len(), 5);
1159
1160        // test "a" levels
1161        let list_level = &levels[0];
1162
1163        let expected_level = ArrayLevels {
1164            def_levels: None,
1165            rep_levels: None,
1166            non_null_indices: vec![0, 1, 2, 3, 4],
1167            max_def_level: 0,
1168            max_rep_level: 0,
1169            array: Arc::new(a),
1170        };
1171        assert_eq!(list_level, &expected_level);
1172
1173        // test "b" levels
1174        let list_level = levels.get(1).unwrap();
1175
1176        let expected_level = ArrayLevels {
1177            def_levels: Some(vec![1, 0, 0, 1, 1]),
1178            rep_levels: None,
1179            non_null_indices: vec![0, 3, 4],
1180            max_def_level: 1,
1181            max_rep_level: 0,
1182            array: Arc::new(b),
1183        };
1184        assert_eq!(list_level, &expected_level);
1185
1186        // test "d" levels
1187        let list_level = levels.get(2).unwrap();
1188
1189        let expected_level = ArrayLevels {
1190            def_levels: Some(vec![1, 1, 1, 2, 1]),
1191            rep_levels: None,
1192            non_null_indices: vec![3],
1193            max_def_level: 2,
1194            max_rep_level: 0,
1195            array: Arc::new(d),
1196        };
1197        assert_eq!(list_level, &expected_level);
1198
1199        // test "f" levels
1200        let list_level = levels.get(3).unwrap();
1201
1202        let expected_level = ArrayLevels {
1203            def_levels: Some(vec![3, 2, 3, 2, 3]),
1204            rep_levels: None,
1205            non_null_indices: vec![0, 2, 4],
1206            max_def_level: 3,
1207            max_rep_level: 0,
1208            array: Arc::new(f),
1209        };
1210        assert_eq!(list_level, &expected_level);
1211    }
1212
1213    #[test]
1214    fn test_null_vs_nonnull_struct() {
1215        // define schema
1216        let offset_field = Arc::new(Field::new("offset", DataType::Int32, true));
1217        let schema = Schema::new(vec![Field::new(
1218            "some_nested_object",
1219            DataType::Struct(vec![offset_field.clone()].into()),
1220            false,
1221        )]);
1222
1223        // create some data
1224        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1225
1226        let some_nested_object =
1227            StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
1228
1229        // build a record batch
1230        let batch =
1231            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1232
1233        let struct_null_level =
1234            calculate_array_levels(batch.column(0), batch.schema().field(0)).unwrap();
1235
1236        // create second batch
1237        // define schema
1238        let offset_field = Arc::new(Field::new("offset", DataType::Int32, true));
1239        let schema = Schema::new(vec![Field::new(
1240            "some_nested_object",
1241            DataType::Struct(vec![offset_field.clone()].into()),
1242            true,
1243        )]);
1244
1245        // create some data
1246        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1247
1248        let some_nested_object =
1249            StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
1250
1251        // build a record batch
1252        let batch =
1253            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1254
1255        let struct_non_null_level =
1256            calculate_array_levels(batch.column(0), batch.schema().field(0)).unwrap();
1257
1258        // The 2 levels should not be the same
1259        if struct_non_null_level == struct_null_level {
1260            panic!("Levels should not be equal, to reflect the difference in struct nullness");
1261        }
1262    }
1263
1264    #[test]
1265    fn test_map_array() {
1266        // Note: we are using the JSON Arrow reader for brevity
1267        let json_content = r#"
1268        {"stocks":{"long": "$AAA", "short": "$BBB"}}
1269        {"stocks":{"long": "$CCC", "short": null}}
1270        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1271        "#;
1272        let entries_struct_type = DataType::Struct(Fields::from(vec![
1273            Field::new("key", DataType::Utf8, false),
1274            Field::new("value", DataType::Utf8, true),
1275        ]));
1276        let stocks_field = Field::new(
1277            "stocks",
1278            DataType::Map(
1279                Arc::new(Field::new("entries", entries_struct_type, false)),
1280                false,
1281            ),
1282            // not nullable, so the keys have max level = 1
1283            false,
1284        );
1285        let schema = Arc::new(Schema::new(vec![stocks_field]));
1286        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1287        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1288
1289        let batch = reader.next().unwrap().unwrap();
1290
1291        // calculate the map's level
1292        let mut levels = vec![];
1293        batch
1294            .columns()
1295            .iter()
1296            .zip(batch.schema().fields())
1297            .for_each(|(array, field)| {
1298                let mut array_levels = calculate_array_levels(array, field).unwrap();
1299                levels.append(&mut array_levels);
1300            });
1301        assert_eq!(levels.len(), 2);
1302
1303        let map = batch.column(0).as_map();
1304
1305        // test key levels
1306        let list_level = &levels[0];
1307
1308        let expected_level = ArrayLevels {
1309            def_levels: Some(vec![1; 7]),
1310            rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
1311            non_null_indices: vec![0, 1, 2, 3, 4, 5, 6],
1312            max_def_level: 1,
1313            max_rep_level: 1,
1314            array: map.keys().clone(),
1315        };
1316        assert_eq!(list_level, &expected_level);
1317
1318        // test values levels
1319        let list_level = levels.get(1).unwrap();
1320
1321        let expected_level = ArrayLevels {
1322            def_levels: Some(vec![2, 2, 2, 1, 2, 1, 2]),
1323            rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
1324            non_null_indices: vec![0, 1, 2, 4, 6],
1325            max_def_level: 2,
1326            max_rep_level: 1,
1327            array: map.values().clone(),
1328        };
1329        assert_eq!(list_level, &expected_level);
1330    }
1331
    /// Checks the levels computed for a nullable list of nullable structs, each
    /// holding a single nullable Int32 field `a`.
    ///
    /// The rows cover every distinct definition level this schema can produce:
    /// a null list, an empty list, a null struct element, a struct whose field
    /// is null, and a struct whose field is set.
    #[test]
    fn test_list_of_struct() {
        // define schema
        let int_field = Field::new("a", DataType::Int32, true);
        let fields = Fields::from([Arc::new(int_field)]);
        let item_field = Field::new_list_field(DataType::Struct(fields.clone()), true);
        let list_field = Field::new("list", DataType::List(Arc::new(item_field)), true);

        let int_builder = Int32Builder::with_capacity(10);
        let struct_builder = StructBuilder::new(fields, vec![Box::new(int_builder)]);
        let mut list_builder = ListBuilder::new(struct_builder);

        // Rows appended below, in order:
        // [{a: 1}], [], null, [null, null], [{a: null}], [{a: 2}]
        //
        // For every struct element the child Int32 builder is fed first and the
        // struct's own validity is appended afterwards; `list_builder.append`
        // then closes the current list slot.
        //
        // [{a: 1}]
        let values = list_builder.values();
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_value(1);
        values.append(true);
        list_builder.append(true);

        // []
        list_builder.append(true);

        // null
        list_builder.append(false);

        // [null, null]
        // A null struct still receives a (null) entry in its child builder.
        let values = list_builder.values();
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_null();
        values.append(false);
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_null();
        values.append(false);
        list_builder.append(true);

        // [{a: null}]
        let values = list_builder.values();
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_null();
        values.append(true);
        list_builder.append(true);

        // [{a: 2}]
        let values = list_builder.values();
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_value(2);
        values.append(true);
        list_builder.append(true);

        let array = Arc::new(list_builder.finish());

        // The leaf column is the `a` child of the struct values: five entries,
        // one per struct element appended above.
        let values = array.values().as_struct().column(0).clone();
        let values_len = values.len();
        assert_eq!(values_len, 5);

        let schema = Arc::new(Schema::new(vec![list_field]));

        let rb = RecordBatch::try_new(schema, vec![array]).unwrap();

        let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
        let list_level = &levels[0];

        // Definition levels, row by row:
        //   [{a: 1}]      -> 4      (value present)
        //   []            -> 1      (list present but empty)
        //   null          -> 0      (list is null)
        //   [null, null]  -> 2, 2   (element slot present, struct is null)
        //   [{a: null}]   -> 3      (struct present, `a` is null)
        //   [{a: 2}]      -> 4      (value present)
        // Only leaf indices 0 and 4 hold non-null values.
        let expected_level = ArrayLevels {
            def_levels: Some(vec![4, 1, 0, 2, 2, 3, 4]),
            rep_levels: Some(vec![0, 0, 0, 0, 1, 0, 0]),
            non_null_indices: vec![0, 4],
            max_def_level: 4,
            max_rep_level: 1,
            array: values,
        };

        assert_eq!(list_level, &expected_level);
    }
1417
1418    #[test]
1419    fn test_struct_mask_list() {
1420        // Test the null mask of a struct array masking out non-empty slices of a child ListArray
1421        let inner = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1422            Some(vec![Some(1), Some(2)]),
1423            Some(vec![None]),
1424            Some(vec![]),
1425            Some(vec![Some(3), None]), // Masked by struct array
1426            Some(vec![Some(4), Some(5)]),
1427            None, // Masked by struct array
1428            None,
1429        ]);
1430        let values = inner.values().clone();
1431
1432        // This test assumes that nulls don't take up space
1433        assert_eq!(inner.values().len(), 7);
1434
1435        let field = Arc::new(Field::new("list", inner.data_type().clone(), true));
1436        let array = Arc::new(inner) as ArrayRef;
1437        let nulls = Buffer::from([0b01010111]);
1438        let struct_a = StructArray::from((vec![(field, array)], nulls));
1439
1440        let field = Field::new("struct", struct_a.data_type().clone(), true);
1441        let array = Arc::new(struct_a) as ArrayRef;
1442        let levels = calculate_array_levels(&array, &field).unwrap();
1443
1444        assert_eq!(levels.len(), 1);
1445
1446        let expected_level = ArrayLevels {
1447            def_levels: Some(vec![4, 4, 3, 2, 0, 4, 4, 0, 1]),
1448            rep_levels: Some(vec![0, 1, 0, 0, 0, 0, 1, 0, 0]),
1449            non_null_indices: vec![0, 1, 5, 6],
1450            max_def_level: 4,
1451            max_rep_level: 1,
1452            array: values,
1453        };
1454
1455        assert_eq!(&levels[0], &expected_level);
1456    }
1457
    /// Checks level computation for `list<struct<list<int32>, int32>>` where both
    /// the outer list's and the struct's validity masks hide child data.
    ///
    /// The test first asserts the logical contents of the array through its
    /// string rendering, then checks the levels of both leaf columns.
    #[test]
    fn test_list_mask_struct() {
        // Test the null mask of a struct array and the null mask of a list array
        // masking out non-null elements of their children

        // First struct child: a nullable list of nullable ints, one slot per struct row.
        let a1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![None]), // Masked by list array
            Some(vec![]),     // Masked by list array
            Some(vec![Some(3), None]),
            Some(vec![Some(4), Some(5), None, Some(6)]), // Masked by struct array
            None,
            None,
        ]);
        let a1_values = a1.values().clone();
        let a1 = Arc::new(a1) as ArrayRef;

        // Second struct child: a nullable int, one value per struct row.
        let a2 = Arc::new(Int32Array::from_iter(vec![
            Some(1), // Masked by list array
            Some(2), // Masked by list array
            None,
            Some(4), // Masked by struct array
            Some(5),
            None,
        ])) as ArrayRef;
        let a2_values = a2.clone();

        let field_a1 = Arc::new(Field::new("list", a1.data_type().clone(), true));
        let field_a2 = Arc::new(Field::new("integers", a2.data_type().clone(), true));

        // Struct validity: bit 3 is clear, so struct row 3 is null.
        let nulls = Buffer::from([0b00110111]);
        let struct_a = Arc::new(StructArray::from((
            vec![(field_a1, a1), (field_a2, a2)],
            nulls,
        ))) as ArrayRef;

        // Outer list: six slots over the struct rows. Slots 0 and 1 are null
        // (bits 0 and 1 clear); slot 1 nevertheless spans struct rows 0..2,
        // which is how the list mask hides otherwise valid children.
        let offsets = Buffer::from_iter([0_i32, 0, 2, 2, 3, 5, 5]);
        let nulls = Buffer::from([0b00111100]);

        let list_type = DataType::List(Arc::new(Field::new(
            "struct",
            struct_a.data_type().clone(),
            true,
        )));

        let data = ArrayDataBuilder::new(list_type.clone())
            .len(6)
            .null_bit_buffer(Some(nulls))
            .add_buffer(offsets)
            .add_child_data(struct_a.into_data())
            .build()
            .unwrap();

        let list = make_array(data);
        let list_field = Field::new("col", list_type, true);

        // Rendered rows of the outer list; nulls are rendered as empty strings.
        let expected = vec![
            r#""#.to_string(),
            r#""#.to_string(),
            r#"[]"#.to_string(),
            r#"[{list: [3, ], integers: }]"#.to_string(),
            r#"[, {list: , integers: 5}]"#.to_string(),
            r#"[]"#.to_string(),
        ];

        let actual: Vec<_> = (0..6)
            .map(|x| array_value_to_string(&list, x).unwrap())
            .collect();
        assert_eq!(actual, expected);

        let levels = calculate_array_levels(&list, &list_field).unwrap();

        // One set of levels per leaf: the inner list's values, then the integers.
        assert_eq!(levels.len(), 2);

        // Leaf of the inner list. Definition levels per outer row:
        //   null, null          -> 0, 0
        //   []                  -> 1
        //   [{list: [3, null]}] -> 6, 5
        //   [null struct, {list: null}] -> 2, 3
        //   []                  -> 1
        // The only non-null leaf written is `3`, at index 1 of the values array.
        let expected_level = ArrayLevels {
            def_levels: Some(vec![0, 0, 1, 6, 5, 2, 3, 1]),
            rep_levels: Some(vec![0, 0, 0, 0, 2, 0, 1, 0]),
            non_null_indices: vec![1],
            max_def_level: 6,
            max_rep_level: 2,
            array: a1_values,
        };

        assert_eq!(&levels[0], &expected_level);

        // The integers leaf. The only non-null value written is `5`, at index 4.
        let expected_level = ArrayLevels {
            def_levels: Some(vec![0, 0, 1, 3, 2, 4, 1]),
            rep_levels: Some(vec![0, 0, 0, 0, 0, 1, 0]),
            non_null_indices: vec![4],
            max_def_level: 4,
            max_rep_level: 1,
            array: a2_values,
        };

        assert_eq!(&levels[1], &expected_level);
    }
1553
1554    #[test]
1555    fn test_fixed_size_list() {
1556        // [[1, 2], null, null, [7, 8], null]
1557        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 2);
1558        builder.values().append_slice(&[1, 2]);
1559        builder.append(true);
1560        builder.values().append_slice(&[3, 4]);
1561        builder.append(false);
1562        builder.values().append_slice(&[5, 6]);
1563        builder.append(false);
1564        builder.values().append_slice(&[7, 8]);
1565        builder.append(true);
1566        builder.values().append_slice(&[9, 10]);
1567        builder.append(false);
1568        let a = builder.finish();
1569        let values = a.values().clone();
1570
1571        let item_field = Field::new_list_field(a.data_type().clone(), true);
1572        let mut builder = levels(&item_field, a);
1573        builder.write(1..4);
1574        let levels = builder.finish();
1575
1576        assert_eq!(levels.len(), 1);
1577
1578        let list_level = &levels[0];
1579
1580        let expected_level = ArrayLevels {
1581            def_levels: Some(vec![0, 0, 3, 3]),
1582            rep_levels: Some(vec![0, 0, 0, 1]),
1583            non_null_indices: vec![6, 7],
1584            max_def_level: 3,
1585            max_rep_level: 1,
1586            array: values,
1587        };
1588        assert_eq!(list_level, &expected_level);
1589    }
1590
1591    #[test]
1592    fn test_fixed_size_list_of_struct() {
1593        // define schema
1594        let field_a = Field::new("a", DataType::Int32, true);
1595        let field_b = Field::new("b", DataType::Int64, false);
1596        let fields = Fields::from([Arc::new(field_a), Arc::new(field_b)]);
1597        let item_field = Field::new_list_field(DataType::Struct(fields.clone()), true);
1598        let list_field = Field::new(
1599            "list",
1600            DataType::FixedSizeList(Arc::new(item_field), 2),
1601            true,
1602        );
1603
1604        let builder_a = Int32Builder::with_capacity(10);
1605        let builder_b = Int64Builder::with_capacity(10);
1606        let struct_builder =
1607            StructBuilder::new(fields, vec![Box::new(builder_a), Box::new(builder_b)]);
1608        let mut list_builder = FixedSizeListBuilder::new(struct_builder, 2);
1609
1610        // [
1611        //   [{a: 1, b: 2}, null],
1612        //   null,
1613        //   [null, null],
1614        //   [{a: null, b: 3}, {a: 2, b: 4}]
1615        // ]
1616
1617        // [{a: 1, b: 2}, null]
1618        let values = list_builder.values();
1619        // {a: 1, b: 2}
1620        values
1621            .field_builder::<Int32Builder>(0)
1622            .unwrap()
1623            .append_value(1);
1624        values
1625            .field_builder::<Int64Builder>(1)
1626            .unwrap()
1627            .append_value(2);
1628        values.append(true);
1629        // null
1630        values
1631            .field_builder::<Int32Builder>(0)
1632            .unwrap()
1633            .append_null();
1634        values
1635            .field_builder::<Int64Builder>(1)
1636            .unwrap()
1637            .append_value(0);
1638        values.append(false);
1639        list_builder.append(true);
1640
1641        // null
1642        let values = list_builder.values();
1643        // null
1644        values
1645            .field_builder::<Int32Builder>(0)
1646            .unwrap()
1647            .append_null();
1648        values
1649            .field_builder::<Int64Builder>(1)
1650            .unwrap()
1651            .append_value(0);
1652        values.append(false);
1653        // null
1654        values
1655            .field_builder::<Int32Builder>(0)
1656            .unwrap()
1657            .append_null();
1658        values
1659            .field_builder::<Int64Builder>(1)
1660            .unwrap()
1661            .append_value(0);
1662        values.append(false);
1663        list_builder.append(false);
1664
1665        // [null, null]
1666        let values = list_builder.values();
1667        // null
1668        values
1669            .field_builder::<Int32Builder>(0)
1670            .unwrap()
1671            .append_null();
1672        values
1673            .field_builder::<Int64Builder>(1)
1674            .unwrap()
1675            .append_value(0);
1676        values.append(false);
1677        // null
1678        values
1679            .field_builder::<Int32Builder>(0)
1680            .unwrap()
1681            .append_null();
1682        values
1683            .field_builder::<Int64Builder>(1)
1684            .unwrap()
1685            .append_value(0);
1686        values.append(false);
1687        list_builder.append(true);
1688
1689        // [{a: null, b: 3}, {a: 2, b: 4}]
1690        let values = list_builder.values();
1691        // {a: null, b: 3}
1692        values
1693            .field_builder::<Int32Builder>(0)
1694            .unwrap()
1695            .append_null();
1696        values
1697            .field_builder::<Int64Builder>(1)
1698            .unwrap()
1699            .append_value(3);
1700        values.append(true);
1701        // {a: 2, b: 4}
1702        values
1703            .field_builder::<Int32Builder>(0)
1704            .unwrap()
1705            .append_value(2);
1706        values
1707            .field_builder::<Int64Builder>(1)
1708            .unwrap()
1709            .append_value(4);
1710        values.append(true);
1711        list_builder.append(true);
1712
1713        let array = Arc::new(list_builder.finish());
1714
1715        assert_eq!(array.values().len(), 8);
1716        assert_eq!(array.len(), 4);
1717
1718        let struct_values = array.values().as_struct();
1719        let values_a = struct_values.column(0).clone();
1720        let values_b = struct_values.column(1).clone();
1721
1722        let schema = Arc::new(Schema::new(vec![list_field]));
1723        let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
1724
1725        let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
1726        let a_levels = &levels[0];
1727        let b_levels = &levels[1];
1728
1729        // [[{a: 1}, null], null, [null, null], [{a: null}, {a: 2}]]
1730        let expected_a = ArrayLevels {
1731            def_levels: Some(vec![4, 2, 0, 2, 2, 3, 4]),
1732            rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
1733            non_null_indices: vec![0, 7],
1734            max_def_level: 4,
1735            max_rep_level: 1,
1736            array: values_a,
1737        };
1738        // [[{b: 2}, null], null, [null, null], [{b: 3}, {b: 4}]]
1739        let expected_b = ArrayLevels {
1740            def_levels: Some(vec![3, 2, 0, 2, 2, 3, 3]),
1741            rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
1742            non_null_indices: vec![0, 6, 7],
1743            max_def_level: 3,
1744            max_rep_level: 1,
1745            array: values_b,
1746        };
1747
1748        assert_eq!(a_levels, &expected_a);
1749        assert_eq!(b_levels, &expected_b);
1750    }
1751
1752    #[test]
1753    fn test_fixed_size_list_empty() {
1754        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 0);
1755        builder.append(true);
1756        builder.append(false);
1757        builder.append(true);
1758        let array = builder.finish();
1759        let values = array.values().clone();
1760
1761        let item_field = Field::new_list_field(array.data_type().clone(), true);
1762        let mut builder = levels(&item_field, array);
1763        builder.write(0..3);
1764        let levels = builder.finish();
1765
1766        assert_eq!(levels.len(), 1);
1767
1768        let list_level = &levels[0];
1769
1770        let expected_level = ArrayLevels {
1771            def_levels: Some(vec![1, 0, 1]),
1772            rep_levels: Some(vec![0, 0, 0]),
1773            non_null_indices: vec![],
1774            max_def_level: 3,
1775            max_rep_level: 1,
1776            array: values,
1777        };
1778        assert_eq!(list_level, &expected_level);
1779    }
1780
1781    #[test]
1782    fn test_fixed_size_list_of_var_lists() {
1783        // [[[1, null, 3], null], [[4], []], [[5, 6], [null, null]], null]
1784        let mut builder = FixedSizeListBuilder::new(ListBuilder::new(Int32Builder::new()), 2);
1785        builder.values().append_value([Some(1), None, Some(3)]);
1786        builder.values().append_null();
1787        builder.append(true);
1788        builder.values().append_value([Some(4)]);
1789        builder.values().append_value([]);
1790        builder.append(true);
1791        builder.values().append_value([Some(5), Some(6)]);
1792        builder.values().append_value([None, None]);
1793        builder.append(true);
1794        builder.values().append_null();
1795        builder.values().append_null();
1796        builder.append(false);
1797        let a = builder.finish();
1798        let values = a.values().as_list::<i32>().values().clone();
1799
1800        let item_field = Field::new_list_field(a.data_type().clone(), true);
1801        let mut builder = levels(&item_field, a);
1802        builder.write(0..4);
1803        let levels = builder.finish();
1804
1805        let expected_level = ArrayLevels {
1806            def_levels: Some(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]),
1807            rep_levels: Some(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]),
1808            non_null_indices: vec![0, 2, 3, 4, 5],
1809            max_def_level: 5,
1810            max_rep_level: 2,
1811            array: values,
1812        };
1813
1814        assert_eq!(levels[0], expected_level);
1815    }
1816
1817    #[test]
1818    fn test_null_dictionary_values() {
1819        let values = Int32Array::new(
1820            vec![1, 2, 3, 4].into(),
1821            Some(NullBuffer::from(vec![true, false, true, true])),
1822        );
1823        let keys = Int32Array::new(
1824            vec![1, 54, 2, 0].into(),
1825            Some(NullBuffer::from(vec![true, false, true, true])),
1826        );
1827        // [NULL, NULL, 3, 0]
1828        let dict = DictionaryArray::new(keys, Arc::new(values));
1829
1830        let item_field = Field::new_list_field(dict.data_type().clone(), true);
1831
1832        let mut builder = levels(&item_field, dict.clone());
1833        builder.write(0..4);
1834        let levels = builder.finish();
1835        let expected_level = ArrayLevels {
1836            def_levels: Some(vec![0, 0, 1, 1]),
1837            rep_levels: None,
1838            non_null_indices: vec![2, 3],
1839            max_def_level: 1,
1840            max_rep_level: 0,
1841            array: Arc::new(dict),
1842        };
1843        assert_eq!(levels[0], expected_level);
1844    }
1845
1846    #[test]
1847    fn mismatched_types() {
1848        let array = Arc::new(Int32Array::from_iter(0..10)) as ArrayRef;
1849        let field = Field::new_list_field(DataType::Float64, false);
1850
1851        let err = LevelInfoBuilder::try_new(&field, Default::default(), &array)
1852            .unwrap_err()
1853            .to_string();
1854
1855        assert_eq!(
1856            err,
1857            "Arrow: Incompatible type. Field 'item' has type Float64, array has type Int32",
1858        );
1859    }
1860
1861    fn levels<T: Array + 'static>(field: &Field, array: T) -> LevelInfoBuilder {
1862        let v = Arc::new(array) as ArrayRef;
1863        LevelInfoBuilder::try_new(field, Default::default(), &v).unwrap()
1864    }
1865}