Skip to main content

parquet/arrow/arrow_writer/
levels.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Parquet definition and repetition levels
19//!
20//! Contains the algorithm for computing definition and repetition levels.
21//! The algorithm works by tracking the slots of an array that should
22//! ultimately be populated when writing to Parquet.
23//! Parquet achieves nesting through definition levels and repetition levels \[1\].
//! Definition levels specify how many optional fields in the path for the column
//! are defined.
26//! Repetition levels specify at what repeated field (list) in the path a column
27//! is defined.
28//!
29//! In a nested data structure such as `a.b.c`, one can see levels as defining
30//! whether a record is defined at `a`, `a.b`, or `a.b.c`.
//! Optional fields are nullable fields, thus if all 3 fields
//! are nullable, the maximum definition level is 3 if there are no lists.
33//!
34//! The algorithm in this module computes the necessary information to enable
35//! the writer to keep track of which columns are at which levels, and to extract
36//! the correct values at the correct slots from Arrow arrays.
37//!
38//! It works by walking a record batch's arrays, keeping track of what values
39//! are non-null, their positions and computing what their levels are.
40//!
41//! \[1\] [parquet-format#nested-encoding](https://github.com/apache/parquet-format#nested-encoding)
42
43use crate::errors::{ParquetError, Result};
44use arrow_array::cast::AsArray;
45use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
46use arrow_buffer::bit_iterator::BitIndexIterator;
47use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
48use arrow_schema::{DataType, Field};
49use std::ops::Range;
50use std::sync::Arc;
51
52/// Performs a depth-first scan of the children of `array`, constructing [`ArrayLevels`]
53/// for each leaf column encountered
54pub(crate) fn calculate_array_levels(array: &ArrayRef, field: &Field) -> Result<Vec<ArrayLevels>> {
55    let mut builder = LevelInfoBuilder::try_new(field, Default::default(), array)?;
56    builder.write(0..array.len());
57    Ok(builder.finish())
58}
59
60/// Returns true if the DataType can be represented as a primitive parquet column,
61/// i.e. a leaf array with no children
62fn is_leaf(data_type: &DataType) -> bool {
63    matches!(
64        data_type,
65        DataType::Null
66            | DataType::Boolean
67            | DataType::Int8
68            | DataType::Int16
69            | DataType::Int32
70            | DataType::Int64
71            | DataType::UInt8
72            | DataType::UInt16
73            | DataType::UInt32
74            | DataType::UInt64
75            | DataType::Float16
76            | DataType::Float32
77            | DataType::Float64
78            | DataType::Utf8
79            | DataType::Utf8View
80            | DataType::LargeUtf8
81            | DataType::Timestamp(_, _)
82            | DataType::Date32
83            | DataType::Date64
84            | DataType::Time32(_)
85            | DataType::Time64(_)
86            | DataType::Duration(_)
87            | DataType::Interval(_)
88            | DataType::Binary
89            | DataType::LargeBinary
90            | DataType::BinaryView
91            | DataType::Decimal32(_, _)
92            | DataType::Decimal64(_, _)
93            | DataType::Decimal128(_, _)
94            | DataType::Decimal256(_, _)
95            | DataType::FixedSizeBinary(_)
96    )
97}
98
/// The repetition and definition level reached at some point of a potentially
/// nested hierarchy; the default context has both levels set to zero
#[derive(Clone, Copy, Debug, Default)]
struct LevelContext {
    /// Repetition level at this point of the hierarchy
    rep_level: i16,
    /// Definition level at this point of the hierarchy
    def_level: i16,
}
107
/// A helper to construct [`ArrayLevels`] from a potentially nested [`Field`]
///
/// The builder mirrors the nesting of the array it was created from: every
/// non-leaf variant owns the builder(s) of its children together with the
/// [`LevelContext`] computed for it and a copy of the array's validity, while
/// `Primitive` holds the [`ArrayLevels`] being accumulated for one leaf column.
#[derive(Debug)]
enum LevelInfoBuilder {
    /// A primitive, leaf array
    Primitive(ArrayLevels),
    /// A list array (map arrays are also represented by this variant)
    List(
        Box<LevelInfoBuilder>, // Builder for the list's values
        LevelContext,          // Context computed for this list
        OffsetBuffer<i32>,     // Value offsets of each list
        Option<NullBuffer>,    // Validity of the list array, if any
    ),
    /// A large list array
    LargeList(
        Box<LevelInfoBuilder>, // Builder for the list's values
        LevelContext,          // Context computed for this list
        OffsetBuffer<i64>,     // Value offsets of each list
        Option<NullBuffer>,    // Validity of the list array, if any
    ),
    /// A fixed size list array
    FixedSizeList(
        Box<LevelInfoBuilder>, // Builder for the list's values
        LevelContext,          // Context computed for this list
        usize,                 // Number of values in every list
        Option<NullBuffer>,    // Validity of the list array, if any
    ),
    /// A list view array
    ListView(
        Box<LevelInfoBuilder>, // Builder for the list's values
        LevelContext,          // Context computed for this list
        ScalarBuffer<i32>,     // Start offset of each list
        ScalarBuffer<i32>,     // Number of values in each list
        Option<NullBuffer>,    // Validity of the list array, if any
    ),
    /// A large list view array
    LargeListView(
        Box<LevelInfoBuilder>, // Builder for the list's values
        LevelContext,          // Context computed for this list
        ScalarBuffer<i64>,     // Start offset of each list
        ScalarBuffer<i64>,     // Number of values in each list
        Option<NullBuffer>,    // Validity of the list array, if any
    ),
    /// A struct array: one builder per column, the struct's context and its validity
    Struct(Vec<LevelInfoBuilder>, LevelContext, Option<NullBuffer>),
}
153
154impl LevelInfoBuilder {
155    /// Create a new [`LevelInfoBuilder`] for the given [`Field`] and parent [`LevelContext`]
156    fn try_new(field: &Field, parent_ctx: LevelContext, array: &ArrayRef) -> Result<Self> {
157        if !Self::types_compatible(field.data_type(), array.data_type()) {
158            return Err(arrow_err!(format!(
159                "Incompatible type. Field '{}' has type {}, array has type {}",
160                field.name(),
161                field.data_type(),
162                array.data_type(),
163            )));
164        }
165
166        let is_nullable = field.is_nullable();
167
168        match array.data_type() {
169            d if is_leaf(d) => {
170                let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
171                Ok(Self::Primitive(levels))
172            }
173            DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => {
174                let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
175                Ok(Self::Primitive(levels))
176            }
177            DataType::Struct(children) => {
178                let array = array.as_struct();
179                let def_level = match is_nullable {
180                    true => parent_ctx.def_level + 1,
181                    false => parent_ctx.def_level,
182                };
183
184                let ctx = LevelContext {
185                    rep_level: parent_ctx.rep_level,
186                    def_level,
187                };
188
189                let children = children
190                    .iter()
191                    .zip(array.columns())
192                    .map(|(f, a)| Self::try_new(f, ctx, a))
193                    .collect::<Result<_>>()?;
194
195                Ok(Self::Struct(children, ctx, array.nulls().cloned()))
196            }
197            DataType::List(child)
198            | DataType::LargeList(child)
199            | DataType::Map(child, _)
200            | DataType::FixedSizeList(child, _)
201            | DataType::ListView(child)
202            | DataType::LargeListView(child) => {
203                let def_level = match is_nullable {
204                    true => parent_ctx.def_level + 2,
205                    false => parent_ctx.def_level + 1,
206                };
207
208                let ctx = LevelContext {
209                    rep_level: parent_ctx.rep_level + 1,
210                    def_level,
211                };
212
213                Ok(match field.data_type() {
214                    DataType::List(_) => {
215                        let list = array.as_list();
216                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
217                        let offsets = list.offsets().clone();
218                        Self::List(Box::new(child), ctx, offsets, list.nulls().cloned())
219                    }
220                    DataType::LargeList(_) => {
221                        let list = array.as_list();
222                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
223                        let offsets = list.offsets().clone();
224                        let nulls = list.nulls().cloned();
225                        Self::LargeList(Box::new(child), ctx, offsets, nulls)
226                    }
227                    DataType::Map(_, _) => {
228                        let map = array.as_map();
229                        let entries = Arc::new(map.entries().clone()) as ArrayRef;
230                        let child = Self::try_new(child.as_ref(), ctx, &entries)?;
231                        let offsets = map.offsets().clone();
232                        Self::List(Box::new(child), ctx, offsets, map.nulls().cloned())
233                    }
234                    DataType::FixedSizeList(_, size) => {
235                        let list = array.as_fixed_size_list();
236                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
237                        let nulls = list.nulls().cloned();
238                        Self::FixedSizeList(Box::new(child), ctx, *size as _, nulls)
239                    }
240                    DataType::ListView(_) => {
241                        let list = array.as_list_view();
242                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
243                        let offsets = list.offsets().clone();
244                        let sizes = list.sizes().clone();
245                        let nulls = list.nulls().cloned();
246                        Self::ListView(Box::new(child), ctx, offsets, sizes, nulls)
247                    }
248                    DataType::LargeListView(_) => {
249                        let list = array.as_list_view();
250                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
251                        let offsets = list.offsets().clone();
252                        let sizes = list.sizes().clone();
253                        let nulls = list.nulls().cloned();
254                        Self::LargeListView(Box::new(child), ctx, offsets, sizes, nulls)
255                    }
256                    _ => unreachable!(),
257                })
258            }
259            d => Err(nyi_err!("Datatype {} is not yet supported", d)),
260        }
261    }
262
263    /// Finish this [`LevelInfoBuilder`] returning the [`ArrayLevels`] for the leaf columns
264    /// as enumerated by a depth-first search
265    fn finish(self) -> Vec<ArrayLevels> {
266        match self {
267            LevelInfoBuilder::Primitive(v) => vec![v],
268            LevelInfoBuilder::List(v, _, _, _)
269            | LevelInfoBuilder::LargeList(v, _, _, _)
270            | LevelInfoBuilder::FixedSizeList(v, _, _, _)
271            | LevelInfoBuilder::ListView(v, _, _, _, _)
272            | LevelInfoBuilder::LargeListView(v, _, _, _, _) => v.finish(),
273            LevelInfoBuilder::Struct(v, _, _) => v.into_iter().flat_map(|l| l.finish()).collect(),
274        }
275    }
276
277    /// Given an `array`, write the level data for the elements in `range`
278    fn write(&mut self, range: Range<usize>) {
279        match self {
280            LevelInfoBuilder::Primitive(info) => Self::write_leaf(info, range),
281            LevelInfoBuilder::List(child, ctx, offsets, nulls) => {
282                Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
283            }
284            LevelInfoBuilder::LargeList(child, ctx, offsets, nulls) => {
285                Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
286            }
287            LevelInfoBuilder::FixedSizeList(child, ctx, size, nulls) => {
288                Self::write_fixed_size_list(child, ctx, *size, nulls.as_ref(), range)
289            }
290            LevelInfoBuilder::ListView(child, ctx, offsets, sizes, nulls) => {
291                Self::write_list_view(child, ctx, offsets, sizes, nulls.as_ref(), range)
292            }
293            LevelInfoBuilder::LargeListView(child, ctx, offsets, sizes, nulls) => {
294                Self::write_list_view(child, ctx, offsets, sizes, nulls.as_ref(), range)
295            }
296            LevelInfoBuilder::Struct(children, ctx, nulls) => {
297                Self::write_struct(children, ctx, nulls.as_ref(), range)
298            }
299        }
300    }
301
302    /// Write `range` elements from ListArray `array`
303    ///
304    /// Note: MapArrays are `ListArray<i32>` under the hood and so are dispatched to this method
305    fn write_list<O: OffsetSizeTrait>(
306        child: &mut LevelInfoBuilder,
307        ctx: &LevelContext,
308        offsets: &[O],
309        nulls: Option<&NullBuffer>,
310        range: Range<usize>,
311    ) {
312        let offsets = &offsets[range.start..range.end + 1];
313
314        let write_non_null_slice =
315            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
316                child.write(start_idx..end_idx);
317                child.visit_leaves(|leaf| {
318                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
319                    let mut rev = rep_levels.iter_mut().rev();
320                    let mut remaining = end_idx - start_idx;
321
322                    loop {
323                        let next = rev.next().unwrap();
324                        if *next > ctx.rep_level {
325                            // Nested element - ignore
326                            continue;
327                        }
328
329                        remaining -= 1;
330                        if remaining == 0 {
331                            *next = ctx.rep_level - 1;
332                            break;
333                        }
334                    }
335                })
336            };
337
338        let write_empty_slice = |child: &mut LevelInfoBuilder| {
339            child.visit_leaves(|leaf| {
340                let rep_levels = leaf.rep_levels.as_mut().unwrap();
341                rep_levels.push(ctx.rep_level - 1);
342                let def_levels = leaf.def_levels.as_mut().unwrap();
343                def_levels.push(ctx.def_level - 1);
344            })
345        };
346
347        let write_null_slice = |child: &mut LevelInfoBuilder| {
348            child.visit_leaves(|leaf| {
349                let rep_levels = leaf.rep_levels.as_mut().unwrap();
350                rep_levels.push(ctx.rep_level - 1);
351                let def_levels = leaf.def_levels.as_mut().unwrap();
352                def_levels.push(ctx.def_level - 2);
353            })
354        };
355
356        match nulls {
357            Some(nulls) => {
358                let null_offset = range.start;
359                // TODO: Faster bitmask iteration (#1757)
360                for (idx, w) in offsets.windows(2).enumerate() {
361                    let is_valid = nulls.is_valid(idx + null_offset);
362                    let start_idx = w[0].as_usize();
363                    let end_idx = w[1].as_usize();
364                    if !is_valid {
365                        write_null_slice(child)
366                    } else if start_idx == end_idx {
367                        write_empty_slice(child)
368                    } else {
369                        write_non_null_slice(child, start_idx, end_idx)
370                    }
371                }
372            }
373            None => {
374                for w in offsets.windows(2) {
375                    let start_idx = w[0].as_usize();
376                    let end_idx = w[1].as_usize();
377                    if start_idx == end_idx {
378                        write_empty_slice(child)
379                    } else {
380                        write_non_null_slice(child, start_idx, end_idx)
381                    }
382                }
383            }
384        }
385    }
386
387    /// Write `range` elements from ListViewArray `array`
388    fn write_list_view<O: OffsetSizeTrait>(
389        child: &mut LevelInfoBuilder,
390        ctx: &LevelContext,
391        offsets: &[O],
392        sizes: &[O],
393        nulls: Option<&NullBuffer>,
394        range: Range<usize>,
395    ) {
396        let offsets = &offsets[range.start..range.end];
397        let sizes = &sizes[range.start..range.end];
398
399        let write_non_null_slice =
400            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
401                child.write(start_idx..end_idx);
402                child.visit_leaves(|leaf| {
403                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
404                    let mut rev = rep_levels.iter_mut().rev();
405                    let mut remaining = end_idx - start_idx;
406
407                    loop {
408                        let next = rev.next().unwrap();
409                        if *next > ctx.rep_level {
410                            // Nested element - ignore
411                            continue;
412                        }
413
414                        remaining -= 1;
415                        if remaining == 0 {
416                            *next = ctx.rep_level - 1;
417                            break;
418                        }
419                    }
420                })
421            };
422
423        let write_empty_slice = |child: &mut LevelInfoBuilder| {
424            child.visit_leaves(|leaf| {
425                let rep_levels = leaf.rep_levels.as_mut().unwrap();
426                rep_levels.push(ctx.rep_level - 1);
427                let def_levels = leaf.def_levels.as_mut().unwrap();
428                def_levels.push(ctx.def_level - 1);
429            })
430        };
431
432        let write_null_slice = |child: &mut LevelInfoBuilder| {
433            child.visit_leaves(|leaf| {
434                let rep_levels = leaf.rep_levels.as_mut().unwrap();
435                rep_levels.push(ctx.rep_level - 1);
436                let def_levels = leaf.def_levels.as_mut().unwrap();
437                def_levels.push(ctx.def_level - 2);
438            })
439        };
440
441        match nulls {
442            Some(nulls) => {
443                let null_offset = range.start;
444                // TODO: Faster bitmask iteration (#1757)
445                for (idx, (offset, size)) in offsets.iter().zip(sizes.iter()).enumerate() {
446                    let is_valid = nulls.is_valid(idx + null_offset);
447                    let start_idx = offset.as_usize();
448                    let size = size.as_usize();
449                    let end_idx = start_idx + size;
450                    if !is_valid {
451                        write_null_slice(child)
452                    } else if size == 0 {
453                        write_empty_slice(child)
454                    } else {
455                        write_non_null_slice(child, start_idx, end_idx)
456                    }
457                }
458            }
459            None => {
460                for (offset, size) in offsets.iter().zip(sizes.iter()) {
461                    let start_idx = offset.as_usize();
462                    let size = size.as_usize();
463                    let end_idx = start_idx + size;
464                    if size == 0 {
465                        write_empty_slice(child)
466                    } else {
467                        write_non_null_slice(child, start_idx, end_idx)
468                    }
469                }
470            }
471        }
472    }
473
474    /// Write `range` elements from StructArray `array`
475    fn write_struct(
476        children: &mut [LevelInfoBuilder],
477        ctx: &LevelContext,
478        nulls: Option<&NullBuffer>,
479        range: Range<usize>,
480    ) {
481        let write_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
482            for child in children {
483                child.visit_leaves(|info| {
484                    let len = range.end - range.start;
485
486                    let def_levels = info.def_levels.as_mut().unwrap();
487                    def_levels.extend(std::iter::repeat_n(ctx.def_level - 1, len));
488
489                    if let Some(rep_levels) = info.rep_levels.as_mut() {
490                        rep_levels.extend(std::iter::repeat_n(ctx.rep_level, len));
491                    }
492                })
493            }
494        };
495
496        let write_non_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
497            for child in children {
498                child.write(range.clone())
499            }
500        };
501
502        match nulls {
503            Some(validity) => {
504                let mut last_non_null_idx = None;
505                let mut last_null_idx = None;
506
507                // TODO: Faster bitmask iteration (#1757)
508                for i in range.clone() {
509                    match validity.is_valid(i) {
510                        true => {
511                            if let Some(last_idx) = last_null_idx.take() {
512                                write_null(children, last_idx..i)
513                            }
514                            last_non_null_idx.get_or_insert(i);
515                        }
516                        false => {
517                            if let Some(last_idx) = last_non_null_idx.take() {
518                                write_non_null(children, last_idx..i)
519                            }
520                            last_null_idx.get_or_insert(i);
521                        }
522                    }
523                }
524
525                if let Some(last_idx) = last_null_idx.take() {
526                    write_null(children, last_idx..range.end)
527                }
528
529                if let Some(last_idx) = last_non_null_idx.take() {
530                    write_non_null(children, last_idx..range.end)
531                }
532            }
533            None => write_non_null(children, range),
534        }
535    }
536
537    /// Write `range` elements from FixedSizeListArray with child data `values` and null bitmap `nulls`.
538    fn write_fixed_size_list(
539        child: &mut LevelInfoBuilder,
540        ctx: &LevelContext,
541        fixed_size: usize,
542        nulls: Option<&NullBuffer>,
543        range: Range<usize>,
544    ) {
545        let write_non_null = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
546            let values_start = start_idx * fixed_size;
547            let values_end = end_idx * fixed_size;
548            child.write(values_start..values_end);
549
550            child.visit_leaves(|leaf| {
551                let rep_levels = leaf.rep_levels.as_mut().unwrap();
552
553                let row_indices = (0..fixed_size)
554                    .rev()
555                    .cycle()
556                    .take(values_end - values_start);
557
558                // Step backward over the child rep levels and mark the start of each list
559                rep_levels
560                    .iter_mut()
561                    .rev()
562                    // Filter out reps from nested children
563                    .filter(|&&mut r| r == ctx.rep_level)
564                    .zip(row_indices)
565                    .for_each(|(r, idx)| {
566                        if idx == 0 {
567                            *r = ctx.rep_level - 1;
568                        }
569                    });
570            })
571        };
572
573        // If list size is 0, ignore values and just write rep/def levels.
574        let write_empty = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
575            let len = end_idx - start_idx;
576            child.visit_leaves(|leaf| {
577                let rep_levels = leaf.rep_levels.as_mut().unwrap();
578                rep_levels.extend(std::iter::repeat_n(ctx.rep_level - 1, len));
579                let def_levels = leaf.def_levels.as_mut().unwrap();
580                def_levels.extend(std::iter::repeat_n(ctx.def_level - 1, len));
581            })
582        };
583
584        let write_rows = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
585            if fixed_size > 0 {
586                write_non_null(child, start_idx, end_idx)
587            } else {
588                write_empty(child, start_idx, end_idx)
589            }
590        };
591
592        match nulls {
593            Some(nulls) => {
594                let mut start_idx = None;
595                for idx in range.clone() {
596                    if nulls.is_valid(idx) {
597                        // Start a run of valid rows if not already inside of one
598                        start_idx.get_or_insert(idx);
599                    } else {
600                        // Write out any pending valid rows
601                        if let Some(start) = start_idx.take() {
602                            write_rows(child, start, idx);
603                        }
604                        // Add null row
605                        child.visit_leaves(|leaf| {
606                            let rep_levels = leaf.rep_levels.as_mut().unwrap();
607                            rep_levels.push(ctx.rep_level - 1);
608                            let def_levels = leaf.def_levels.as_mut().unwrap();
609                            def_levels.push(ctx.def_level - 2);
610                        })
611                    }
612                }
613                // Write out any remaining valid rows
614                if let Some(start) = start_idx.take() {
615                    write_rows(child, start, range.end);
616                }
617            }
618            // If all rows are valid then write the whole array
619            None => write_rows(child, range.start, range.end),
620        }
621    }
622
623    /// Write a primitive array, as defined by [`is_leaf`]
624    fn write_leaf(info: &mut ArrayLevels, range: Range<usize>) {
625        let len = range.end - range.start;
626
627        match &mut info.def_levels {
628            Some(def_levels) => {
629                def_levels.reserve(len);
630                info.non_null_indices.reserve(len);
631
632                match &info.logical_nulls {
633                    Some(nulls) => {
634                        assert!(range.end <= nulls.len());
635                        let nulls = nulls.inner();
636                        def_levels.extend(range.clone().map(|i| {
637                            // Safety: range.end was asserted to be in bounds earlier
638                            let valid = unsafe { nulls.value_unchecked(i) };
639                            info.max_def_level - (!valid as i16)
640                        }));
641                        info.non_null_indices.extend(
642                            BitIndexIterator::new(nulls.inner(), nulls.offset() + range.start, len)
643                                .map(|i| i + range.start),
644                        );
645                    }
646                    None => {
647                        let iter = std::iter::repeat_n(info.max_def_level, len);
648                        def_levels.extend(iter);
649                        info.non_null_indices.extend(range);
650                    }
651                }
652            }
653            None => info.non_null_indices.extend(range),
654        }
655
656        if let Some(rep_levels) = &mut info.rep_levels {
657            rep_levels.extend(std::iter::repeat_n(info.max_rep_level, len))
658        }
659    }
660
661    /// Visits all children of this node in depth first order
662    fn visit_leaves(&mut self, visit: impl Fn(&mut ArrayLevels) + Copy) {
663        match self {
664            LevelInfoBuilder::Primitive(info) => visit(info),
665            LevelInfoBuilder::List(c, _, _, _)
666            | LevelInfoBuilder::LargeList(c, _, _, _)
667            | LevelInfoBuilder::FixedSizeList(c, _, _, _)
668            | LevelInfoBuilder::ListView(c, _, _, _, _)
669            | LevelInfoBuilder::LargeListView(c, _, _, _, _) => c.visit_leaves(visit),
670            LevelInfoBuilder::Struct(children, _, _) => {
671                for c in children {
672                    c.visit_leaves(visit)
673                }
674            }
675        }
676    }
677
678    /// Determine if the fields are compatible for purposes of constructing `LevelBuilderInfo`.
679    ///
680    /// Fields are compatible if they're the same type. Otherwise if one of them is a dictionary
681    /// and the other is a native array, the dictionary values must have the same type as the
682    /// native array
683    fn types_compatible(a: &DataType, b: &DataType) -> bool {
684        // if the Arrow data types are equal, the types are deemed compatible
685        if a.equals_datatype(b) {
686            return true;
687        }
688
689        // get the values out of the dictionaries
690        let (a, b) = match (a, b) {
691            (DataType::Dictionary(_, va), DataType::Dictionary(_, vb)) => {
692                (va.as_ref(), vb.as_ref())
693            }
694            (DataType::Dictionary(_, v), b) => (v.as_ref(), b),
695            (a, DataType::Dictionary(_, v)) => (a, v.as_ref()),
696            _ => (a, b),
697        };
698
699        // now that we've got the values from one/both dictionaries, if the values
700        // have the same Arrow data type, they're compatible
701        if a == b {
702            return true;
703        }
704
705        // here we have different Arrow data types, but if the array contains the same type of data
706        // then we consider the type compatible
707        match a {
708            // String, StringView and LargeString are compatible
709            DataType::Utf8 => matches!(b, DataType::LargeUtf8 | DataType::Utf8View),
710            DataType::Utf8View => matches!(b, DataType::LargeUtf8 | DataType::Utf8),
711            DataType::LargeUtf8 => matches!(b, DataType::Utf8 | DataType::Utf8View),
712
713            // Binary, BinaryView and LargeBinary are compatible
714            DataType::Binary => matches!(b, DataType::LargeBinary | DataType::BinaryView),
715            DataType::BinaryView => matches!(b, DataType::LargeBinary | DataType::Binary),
716            DataType::LargeBinary => matches!(b, DataType::Binary | DataType::BinaryView),
717
718            // otherwise we have incompatible types
719            _ => false,
720        }
721    }
722}
723
/// The data necessary to write a primitive Arrow array to parquet, taking into account
/// any non-primitive parents it may have in the arrow representation
///
/// Values start out empty and are filled in as the owning `LevelInfoBuilder`
/// writes ranges of the array.
#[derive(Debug, Clone)]
pub(crate) struct ArrayLevels {
    /// Array's definition levels
    ///
    /// Present if `max_def_level != 0`
    def_levels: Option<Vec<i16>>,

    /// Array's optional repetition levels
    ///
    /// Present if `max_rep_level != 0`
    rep_levels: Option<Vec<i16>>,

    /// Indices into `array` of the elements that were written and are not null
    non_null_indices: Vec<usize>,

    /// The maximum definition level for this leaf column
    max_def_level: i16,

    /// The maximum repetition level for this leaf column
    max_rep_level: i16,

    /// The arrow array
    array: ArrayRef,

    /// Logical nulls of `array`, computed once on construction and cached
    logical_nulls: Option<NullBuffer>,
}
754
755impl PartialEq for ArrayLevels {
756    fn eq(&self, other: &Self) -> bool {
757        self.def_levels == other.def_levels
758            && self.rep_levels == other.rep_levels
759            && self.non_null_indices == other.non_null_indices
760            && self.max_def_level == other.max_def_level
761            && self.max_rep_level == other.max_rep_level
762            && self.array.as_ref() == other.array.as_ref()
763            && self.logical_nulls.as_ref() == other.logical_nulls.as_ref()
764    }
765}
766impl Eq for ArrayLevels {}
767
768impl ArrayLevels {
769    fn new(ctx: LevelContext, is_nullable: bool, array: ArrayRef) -> Self {
770        let max_rep_level = ctx.rep_level;
771        let max_def_level = match is_nullable {
772            true => ctx.def_level + 1,
773            false => ctx.def_level,
774        };
775
776        let logical_nulls = array.logical_nulls();
777
778        Self {
779            def_levels: (max_def_level != 0).then(Vec::new),
780            rep_levels: (max_rep_level != 0).then(Vec::new),
781            non_null_indices: vec![],
782            max_def_level,
783            max_rep_level,
784            array,
785            logical_nulls,
786        }
787    }
788
789    pub fn array(&self) -> &ArrayRef {
790        &self.array
791    }
792
793    pub fn def_levels(&self) -> Option<&[i16]> {
794        self.def_levels.as_deref()
795    }
796
797    pub fn rep_levels(&self) -> Option<&[i16]> {
798        self.rep_levels.as_deref()
799    }
800
801    pub fn non_null_indices(&self) -> &[usize] {
802        &self.non_null_indices
803    }
804}
805
806#[cfg(test)]
807mod tests {
808    use super::*;
809
810    use arrow_array::builder::*;
811    use arrow_array::types::Int32Type;
812    use arrow_array::*;
813    use arrow_buffer::{Buffer, ToByteSlice};
814    use arrow_cast::display::array_value_to_string;
815    use arrow_data::{ArrayData, ArrayDataBuilder};
816    use arrow_schema::{Fields, Schema};
817
818    #[test]
819    fn test_calculate_array_levels_twitter_example() {
820        // based on the example at https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html
821        // [[a, b, c], [d, e, f, g]], [[h], [i,j]]
822
823        let leaf_type = Field::new_list_field(DataType::Int32, false);
824        let inner_type = DataType::List(Arc::new(leaf_type));
825        let inner_field = Field::new("l2", inner_type.clone(), false);
826        let outer_type = DataType::List(Arc::new(inner_field));
827        let outer_field = Field::new("l1", outer_type.clone(), false);
828
829        let primitives = Int32Array::from_iter(0..10);
830
831        // Cannot use from_iter_primitive as always infers nullable
832        let offsets = Buffer::from_iter([0_i32, 3, 7, 8, 10]);
833        let inner_list = ArrayDataBuilder::new(inner_type)
834            .len(4)
835            .add_buffer(offsets)
836            .add_child_data(primitives.to_data())
837            .build()
838            .unwrap();
839
840        let offsets = Buffer::from_iter([0_i32, 2, 4]);
841        let outer_list = ArrayDataBuilder::new(outer_type)
842            .len(2)
843            .add_buffer(offsets)
844            .add_child_data(inner_list)
845            .build()
846            .unwrap();
847        let outer_list = make_array(outer_list);
848
849        let levels = calculate_array_levels(&outer_list, &outer_field).unwrap();
850        assert_eq!(levels.len(), 1);
851
852        let expected = ArrayLevels {
853            def_levels: Some(vec![2; 10]),
854            rep_levels: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
855            non_null_indices: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
856            max_def_level: 2,
857            max_rep_level: 2,
858            array: Arc::new(primitives),
859            logical_nulls: None,
860        };
861        assert_eq!(&levels[0], &expected);
862    }
863
864    #[test]
865    fn test_calculate_one_level_1() {
866        // This test calculates the levels for a non-null primitive array
867        let array = Arc::new(Int32Array::from_iter(0..10)) as ArrayRef;
868        let field = Field::new_list_field(DataType::Int32, false);
869
870        let levels = calculate_array_levels(&array, &field).unwrap();
871        assert_eq!(levels.len(), 1);
872
873        let expected_levels = ArrayLevels {
874            def_levels: None,
875            rep_levels: None,
876            non_null_indices: (0..10).collect(),
877            max_def_level: 0,
878            max_rep_level: 0,
879            array,
880            logical_nulls: None,
881        };
882        assert_eq!(&levels[0], &expected_levels);
883    }
884
885    #[test]
886    fn test_calculate_one_level_2() {
887        // This test calculates the levels for a nullable primitive array
888        let array = Arc::new(Int32Array::from_iter([
889            Some(0),
890            None,
891            Some(0),
892            Some(0),
893            None,
894        ])) as ArrayRef;
895        let field = Field::new_list_field(DataType::Int32, true);
896
897        let levels = calculate_array_levels(&array, &field).unwrap();
898        assert_eq!(levels.len(), 1);
899
900        let logical_nulls = array.logical_nulls();
901        let expected_levels = ArrayLevels {
902            def_levels: Some(vec![1, 0, 1, 1, 0]),
903            rep_levels: None,
904            non_null_indices: vec![0, 2, 3],
905            max_def_level: 1,
906            max_rep_level: 0,
907            array,
908            logical_nulls,
909        };
910        assert_eq!(&levels[0], &expected_levels);
911    }
912
913    #[test]
914    fn test_calculate_array_levels_1() {
915        let leaf_field = Field::new_list_field(DataType::Int32, false);
916        let list_type = DataType::List(Arc::new(leaf_field));
917
918        // if all array values are defined (e.g. batch<list<_>>)
919        // [[0], [1], [2], [3], [4]]
920
921        let leaf_array = Int32Array::from_iter(0..5);
922        // Cannot use from_iter_primitive as always infers nullable
923        let offsets = Buffer::from_iter(0_i32..6);
924        let list = ArrayDataBuilder::new(list_type.clone())
925            .len(5)
926            .add_buffer(offsets)
927            .add_child_data(leaf_array.to_data())
928            .build()
929            .unwrap();
930        let list = make_array(list);
931
932        let list_field = Field::new("list", list_type.clone(), false);
933        let levels = calculate_array_levels(&list, &list_field).unwrap();
934        assert_eq!(levels.len(), 1);
935
936        let expected_levels = ArrayLevels {
937            def_levels: Some(vec![1; 5]),
938            rep_levels: Some(vec![0; 5]),
939            non_null_indices: (0..5).collect(),
940            max_def_level: 1,
941            max_rep_level: 1,
942            array: Arc::new(leaf_array),
943            logical_nulls: None,
944        };
945        assert_eq!(&levels[0], &expected_levels);
946
947        // array: [[0, 0], NULL, [2, 2], [3, 3, 3, 3], [4, 4, 4]]
948        // all values are defined as we do not have nulls on the root (batch)
949        // repetition:
950        //   0: 0, 1
951        //   1: 0
952        //   2: 0, 1
953        //   3: 0, 1, 1, 1
954        //   4: 0, 1, 1
955        let leaf_array = Int32Array::from_iter([0, 0, 2, 2, 3, 3, 3, 3, 4, 4, 4]);
956        let offsets = Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]);
957        let list = ArrayDataBuilder::new(list_type.clone())
958            .len(5)
959            .add_buffer(offsets)
960            .add_child_data(leaf_array.to_data())
961            .null_bit_buffer(Some(Buffer::from([0b00011101])))
962            .build()
963            .unwrap();
964        let list = make_array(list);
965
966        let list_field = Field::new("list", list_type, true);
967        let levels = calculate_array_levels(&list, &list_field).unwrap();
968        assert_eq!(levels.len(), 1);
969
970        let expected_levels = ArrayLevels {
971            def_levels: Some(vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2]),
972            rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
973            non_null_indices: (0..11).collect(),
974            max_def_level: 2,
975            max_rep_level: 1,
976            array: Arc::new(leaf_array),
977            logical_nulls: None,
978        };
979        assert_eq!(&levels[0], &expected_levels);
980    }
981
982    #[test]
983    fn test_calculate_array_levels_2() {
984        // If some values are null
985        //
986        // This emulates an array in the form: <struct<list<?>>
987        // with values:
988        // - 0: [0, 1], but is null because of the struct
989        // - 1: []
990        // - 2: [2, 3], but is null because of the struct
991        // - 3: [4, 5, 6, 7]
992        // - 4: [8, 9, 10]
993        //
994        // If the first values of a list are null due to a parent, we have to still account for them
995        // while indexing, because they would affect the way the child is indexed
996        // i.e. in the above example, we have to know that [0, 1] has to be skipped
997        let leaf = Int32Array::from_iter(0..11);
998        let leaf_field = Field::new("leaf", DataType::Int32, false);
999
1000        let list_type = DataType::List(Arc::new(leaf_field));
1001        let list = ArrayData::builder(list_type.clone())
1002            .len(5)
1003            .add_child_data(leaf.to_data())
1004            .add_buffer(Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]))
1005            .build()
1006            .unwrap();
1007
1008        let list = make_array(list);
1009        let list_field = Arc::new(Field::new("list", list_type, true));
1010
1011        let struct_array =
1012            StructArray::from((vec![(list_field, list)], Buffer::from([0b00011010])));
1013        let array = Arc::new(struct_array) as ArrayRef;
1014
1015        let struct_field = Field::new("struct", array.data_type().clone(), true);
1016
1017        let levels = calculate_array_levels(&array, &struct_field).unwrap();
1018        assert_eq!(levels.len(), 1);
1019
1020        let expected_levels = ArrayLevels {
1021            def_levels: Some(vec![0, 2, 0, 3, 3, 3, 3, 3, 3, 3]),
1022            rep_levels: Some(vec![0, 0, 0, 0, 1, 1, 1, 0, 1, 1]),
1023            non_null_indices: (4..11).collect(),
1024            max_def_level: 3,
1025            max_rep_level: 1,
1026            array: Arc::new(leaf),
1027            logical_nulls: None,
1028        };
1029
1030        assert_eq!(&levels[0], &expected_levels);
1031
1032        // nested lists
1033
1034        // 0: [[100, 101], [102, 103]]
1035        // 1: []
1036        // 2: [[104, 105], [106, 107]]
1037        // 3: [[108, 109], [110, 111], [112, 113], [114, 115]]
1038        // 4: [[116, 117], [118, 119], [120, 121]]
1039
1040        let leaf = Int32Array::from_iter(100..122);
1041        let leaf_field = Field::new("leaf", DataType::Int32, true);
1042
1043        let l1_type = DataType::List(Arc::new(leaf_field));
1044        let offsets = Buffer::from_iter([0_i32, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22]);
1045        let l1 = ArrayData::builder(l1_type.clone())
1046            .len(11)
1047            .add_child_data(leaf.to_data())
1048            .add_buffer(offsets)
1049            .build()
1050            .unwrap();
1051
1052        let l1_field = Field::new("l1", l1_type, true);
1053        let l2_type = DataType::List(Arc::new(l1_field));
1054        let l2 = ArrayData::builder(l2_type)
1055            .len(5)
1056            .add_child_data(l1)
1057            .add_buffer(Buffer::from_iter([0, 2, 2, 4, 8, 11]))
1058            .build()
1059            .unwrap();
1060
1061        let l2 = make_array(l2);
1062        let l2_field = Field::new("l2", l2.data_type().clone(), true);
1063
1064        let levels = calculate_array_levels(&l2, &l2_field).unwrap();
1065        assert_eq!(levels.len(), 1);
1066
1067        let expected_levels = ArrayLevels {
1068            def_levels: Some(vec![
1069                5, 5, 5, 5, 1, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
1070            ]),
1071            rep_levels: Some(vec![
1072                0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2,
1073            ]),
1074            non_null_indices: (0..22).collect(),
1075            max_def_level: 5,
1076            max_rep_level: 2,
1077            array: Arc::new(leaf),
1078            logical_nulls: None,
1079        };
1080
1081        assert_eq!(&levels[0], &expected_levels);
1082    }
1083
1084    #[test]
1085    fn test_calculate_array_levels_nested_list() {
1086        let leaf_field = Field::new("leaf", DataType::Int32, false);
1087        let list_type = DataType::List(Arc::new(leaf_field));
1088
1089        // if all array values are defined (e.g. batch<list<_>>)
1090        // The array at this level looks like:
1091        // 0: [a]
1092        // 1: [a]
1093        // 2: [a]
1094        // 3: [a]
1095
1096        let leaf = Int32Array::from_iter([0; 4]);
1097        let list = ArrayData::builder(list_type.clone())
1098            .len(4)
1099            .add_buffer(Buffer::from_iter(0_i32..5))
1100            .add_child_data(leaf.to_data())
1101            .build()
1102            .unwrap();
1103        let list = make_array(list);
1104
1105        let list_field = Field::new("list", list_type.clone(), false);
1106        let levels = calculate_array_levels(&list, &list_field).unwrap();
1107        assert_eq!(levels.len(), 1);
1108
1109        let expected_levels = ArrayLevels {
1110            def_levels: Some(vec![1; 4]),
1111            rep_levels: Some(vec![0; 4]),
1112            non_null_indices: (0..4).collect(),
1113            max_def_level: 1,
1114            max_rep_level: 1,
1115            array: Arc::new(leaf),
1116            logical_nulls: None,
1117        };
1118        assert_eq!(&levels[0], &expected_levels);
1119
1120        // 0: null
1121        // 1: [1, 2, 3]
1122        // 2: [4, 5]
1123        // 3: [6, 7]
1124        let leaf = Int32Array::from_iter(0..8);
1125        let list = ArrayData::builder(list_type.clone())
1126            .len(4)
1127            .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
1128            .null_bit_buffer(Some(Buffer::from([0b00001110])))
1129            .add_child_data(leaf.to_data())
1130            .build()
1131            .unwrap();
1132        let list = make_array(list);
1133        let list_field = Arc::new(Field::new("list", list_type, true));
1134
1135        let struct_array = StructArray::from(vec![(list_field, list)]);
1136        let array = Arc::new(struct_array) as ArrayRef;
1137
1138        let struct_field = Field::new("struct", array.data_type().clone(), true);
1139        let levels = calculate_array_levels(&array, &struct_field).unwrap();
1140        assert_eq!(levels.len(), 1);
1141
1142        let expected_levels = ArrayLevels {
1143            def_levels: Some(vec![1, 3, 3, 3, 3, 3, 3, 3]),
1144            rep_levels: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]),
1145            non_null_indices: (0..7).collect(),
1146            max_def_level: 3,
1147            max_rep_level: 1,
1148            array: Arc::new(leaf),
1149            logical_nulls: None,
1150        };
1151        assert_eq!(&levels[0], &expected_levels);
1152
1153        // nested lists
1154        // In a JSON syntax with the schema: <struct<list<list<primitive>>>>, this translates into:
1155        // 0: {"struct": null }
1156        // 1: {"struct": [ [201], [202, 203], [] ]}
1157        // 2: {"struct": [ [204, 205, 206], [207, 208, 209, 210] ]}
1158        // 3: {"struct": [ [], [211, 212, 213, 214, 215] ]}
1159
1160        let leaf = Int32Array::from_iter(201..216);
1161        let leaf_field = Field::new("leaf", DataType::Int32, false);
1162        let list_1_type = DataType::List(Arc::new(leaf_field));
1163        let list_1 = ArrayData::builder(list_1_type.clone())
1164            .len(7)
1165            .add_buffer(Buffer::from_iter([0_i32, 1, 3, 3, 6, 10, 10, 15]))
1166            .add_child_data(leaf.to_data())
1167            .build()
1168            .unwrap();
1169
1170        let list_1_field = Field::new("l1", list_1_type, true);
1171        let list_2_type = DataType::List(Arc::new(list_1_field));
1172        let list_2 = ArrayData::builder(list_2_type.clone())
1173            .len(4)
1174            .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
1175            .null_bit_buffer(Some(Buffer::from([0b00001110])))
1176            .add_child_data(list_1)
1177            .build()
1178            .unwrap();
1179
1180        let list_2 = make_array(list_2);
1181        let list_2_field = Arc::new(Field::new("list_2", list_2_type, true));
1182
1183        let struct_array =
1184            StructArray::from((vec![(list_2_field, list_2)], Buffer::from([0b00001111])));
1185        let struct_field = Field::new("struct", struct_array.data_type().clone(), true);
1186
1187        let array = Arc::new(struct_array) as ArrayRef;
1188        let levels = calculate_array_levels(&array, &struct_field).unwrap();
1189        assert_eq!(levels.len(), 1);
1190
1191        let expected_levels = ArrayLevels {
1192            def_levels: Some(vec![1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5, 5, 5, 5]),
1193            rep_levels: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2]),
1194            non_null_indices: (0..15).collect(),
1195            max_def_level: 5,
1196            max_rep_level: 2,
1197            array: Arc::new(leaf),
1198            logical_nulls: None,
1199        };
1200        assert_eq!(&levels[0], &expected_levels);
1201    }
1202
1203    #[test]
1204    fn test_calculate_nested_struct_levels() {
1205        // tests a <struct[a]<struct[b]<int[c]>>
1206        // array:
1207        //  - {a: {b: {c: 1}}}
1208        //  - {a: {b: {c: null}}}
1209        //  - {a: {b: {c: 3}}}
1210        //  - {a: {b: null}}
1211        //  - {a: null}}
1212        //  - {a: {b: {c: 6}}}
1213
1214        let c = Int32Array::from_iter([Some(1), None, Some(3), None, Some(5), Some(6)]);
1215        let leaf = Arc::new(c) as ArrayRef;
1216        let c_field = Arc::new(Field::new("c", DataType::Int32, true));
1217        let b = StructArray::from(((vec![(c_field, leaf.clone())]), Buffer::from([0b00110111])));
1218
1219        let b_field = Arc::new(Field::new("b", b.data_type().clone(), true));
1220        let a = StructArray::from((
1221            (vec![(b_field, Arc::new(b) as ArrayRef)]),
1222            Buffer::from([0b00101111]),
1223        ));
1224
1225        let a_field = Field::new("a", a.data_type().clone(), true);
1226        let a_array = Arc::new(a) as ArrayRef;
1227
1228        let levels = calculate_array_levels(&a_array, &a_field).unwrap();
1229        assert_eq!(levels.len(), 1);
1230
1231        let logical_nulls = leaf.logical_nulls();
1232        let expected_levels = ArrayLevels {
1233            def_levels: Some(vec![3, 2, 3, 1, 0, 3]),
1234            rep_levels: None,
1235            non_null_indices: vec![0, 2, 5],
1236            max_def_level: 3,
1237            max_rep_level: 0,
1238            array: leaf,
1239            logical_nulls,
1240        };
1241        assert_eq!(&levels[0], &expected_levels);
1242    }
1243
1244    #[test]
1245    fn list_single_column() {
1246        // this tests the level generation from the arrow_writer equivalent test
1247
1248        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1249        let a_value_offsets = arrow::buffer::Buffer::from_iter([0_i32, 1, 3, 3, 6, 10]);
1250        let a_list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
1251        let a_list_data = ArrayData::builder(a_list_type.clone())
1252            .len(5)
1253            .add_buffer(a_value_offsets)
1254            .null_bit_buffer(Some(Buffer::from([0b00011011])))
1255            .add_child_data(a_values.to_data())
1256            .build()
1257            .unwrap();
1258
1259        assert_eq!(a_list_data.null_count(), 1);
1260
1261        let a = ListArray::from(a_list_data);
1262
1263        let item_field = Field::new_list_field(a_list_type, true);
1264        let mut builder = levels(&item_field, a);
1265        builder.write(2..4);
1266        let levels = builder.finish();
1267
1268        assert_eq!(levels.len(), 1);
1269
1270        let list_level = &levels[0];
1271
1272        let expected_level = ArrayLevels {
1273            def_levels: Some(vec![0, 3, 3, 3]),
1274            rep_levels: Some(vec![0, 0, 1, 1]),
1275            non_null_indices: vec![3, 4, 5],
1276            max_def_level: 3,
1277            max_rep_level: 1,
1278            array: Arc::new(a_values),
1279            logical_nulls: None,
1280        };
1281        assert_eq!(list_level, &expected_level);
1282    }
1283
1284    #[test]
1285    fn mixed_struct_list() {
1286        // this tests the level generation from the equivalent arrow_writer_complex test
1287
1288        // define schema
1289        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1290        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1291        let struct_field_g = Arc::new(Field::new(
1292            "g",
1293            DataType::List(Arc::new(Field::new("items", DataType::Int16, false))),
1294            false,
1295        ));
1296        let struct_field_e = Arc::new(Field::new(
1297            "e",
1298            DataType::Struct(vec![struct_field_f.clone(), struct_field_g.clone()].into()),
1299            true,
1300        ));
1301        let schema = Schema::new(vec![
1302            Field::new("a", DataType::Int32, false),
1303            Field::new("b", DataType::Int32, true),
1304            Field::new(
1305                "c",
1306                DataType::Struct(vec![struct_field_d.clone(), struct_field_e.clone()].into()),
1307                true, // https://github.com/apache/arrow-rs/issues/245
1308            ),
1309        ]);
1310
1311        // create some data
1312        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1313        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1314        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1315        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1316
1317        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1318
1319        // Construct a buffer for value offsets, for the nested array:
1320        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
1321        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1322
1323        // Construct a list array from the above two
1324        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1325            .len(5)
1326            .add_buffer(g_value_offsets)
1327            .add_child_data(g_value.into_data())
1328            .build()
1329            .unwrap();
1330        let g = ListArray::from(g_list_data);
1331
1332        let e = StructArray::from(vec![
1333            (struct_field_f, Arc::new(f.clone()) as ArrayRef),
1334            (struct_field_g, Arc::new(g) as ArrayRef),
1335        ]);
1336
1337        let c = StructArray::from(vec![
1338            (struct_field_d, Arc::new(d.clone()) as ArrayRef),
1339            (struct_field_e, Arc::new(e) as ArrayRef),
1340        ]);
1341
1342        // build a record batch
1343        let batch = RecordBatch::try_new(
1344            Arc::new(schema),
1345            vec![Arc::new(a.clone()), Arc::new(b.clone()), Arc::new(c)],
1346        )
1347        .unwrap();
1348
1349        //////////////////////////////////////////////
1350        // calculate the list's level
1351        let mut levels = vec![];
1352        batch
1353            .columns()
1354            .iter()
1355            .zip(batch.schema().fields())
1356            .for_each(|(array, field)| {
1357                let mut array_levels = calculate_array_levels(array, field).unwrap();
1358                levels.append(&mut array_levels);
1359            });
1360        assert_eq!(levels.len(), 5);
1361
1362        // test "a" levels
1363        let list_level = &levels[0];
1364
1365        let expected_level = ArrayLevels {
1366            def_levels: None,
1367            rep_levels: None,
1368            non_null_indices: vec![0, 1, 2, 3, 4],
1369            max_def_level: 0,
1370            max_rep_level: 0,
1371            array: Arc::new(a),
1372            logical_nulls: None,
1373        };
1374        assert_eq!(list_level, &expected_level);
1375
1376        // test "b" levels
1377        let list_level = levels.get(1).unwrap();
1378
1379        let b_logical_nulls = b.logical_nulls();
1380        let expected_level = ArrayLevels {
1381            def_levels: Some(vec![1, 0, 0, 1, 1]),
1382            rep_levels: None,
1383            non_null_indices: vec![0, 3, 4],
1384            max_def_level: 1,
1385            max_rep_level: 0,
1386            array: Arc::new(b),
1387            logical_nulls: b_logical_nulls,
1388        };
1389        assert_eq!(list_level, &expected_level);
1390
1391        // test "d" levels
1392        let list_level = levels.get(2).unwrap();
1393
1394        let d_logical_nulls = d.logical_nulls();
1395        let expected_level = ArrayLevels {
1396            def_levels: Some(vec![1, 1, 1, 2, 1]),
1397            rep_levels: None,
1398            non_null_indices: vec![3],
1399            max_def_level: 2,
1400            max_rep_level: 0,
1401            array: Arc::new(d),
1402            logical_nulls: d_logical_nulls,
1403        };
1404        assert_eq!(list_level, &expected_level);
1405
1406        // test "f" levels
1407        let list_level = levels.get(3).unwrap();
1408
1409        let f_logical_nulls = f.logical_nulls();
1410        let expected_level = ArrayLevels {
1411            def_levels: Some(vec![3, 2, 3, 2, 3]),
1412            rep_levels: None,
1413            non_null_indices: vec![0, 2, 4],
1414            max_def_level: 3,
1415            max_rep_level: 0,
1416            array: Arc::new(f),
1417            logical_nulls: f_logical_nulls,
1418        };
1419        assert_eq!(list_level, &expected_level);
1420    }
1421
1422    #[test]
1423    fn test_null_vs_nonnull_struct() {
1424        // define schema
1425        let offset_field = Arc::new(Field::new("offset", DataType::Int32, true));
1426        let schema = Schema::new(vec![Field::new(
1427            "some_nested_object",
1428            DataType::Struct(vec![offset_field.clone()].into()),
1429            false,
1430        )]);
1431
1432        // create some data
1433        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1434
1435        let some_nested_object =
1436            StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
1437
1438        // build a record batch
1439        let batch =
1440            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1441
1442        let struct_null_level =
1443            calculate_array_levels(batch.column(0), batch.schema().field(0)).unwrap();
1444
1445        // create second batch
1446        // define schema
1447        let offset_field = Arc::new(Field::new("offset", DataType::Int32, true));
1448        let schema = Schema::new(vec![Field::new(
1449            "some_nested_object",
1450            DataType::Struct(vec![offset_field.clone()].into()),
1451            true,
1452        )]);
1453
1454        // create some data
1455        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1456
1457        let some_nested_object =
1458            StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
1459
1460        // build a record batch
1461        let batch =
1462            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1463
1464        let struct_non_null_level =
1465            calculate_array_levels(batch.column(0), batch.schema().field(0)).unwrap();
1466
1467        // The 2 levels should not be the same
1468        if struct_non_null_level == struct_null_level {
1469            panic!("Levels should not be equal, to reflect the difference in struct nullness");
1470        }
1471    }
1472
1473    #[test]
1474    fn test_map_array() {
1475        // Note: we are using the JSON Arrow reader for brevity
1476        let json_content = r#"
1477        {"stocks":{"long": "$AAA", "short": "$BBB"}}
1478        {"stocks":{"long": "$CCC", "short": null}}
1479        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1480        "#;
1481        let entries_struct_type = DataType::Struct(Fields::from(vec![
1482            Field::new("key", DataType::Utf8, false),
1483            Field::new("value", DataType::Utf8, true),
1484        ]));
1485        let stocks_field = Field::new(
1486            "stocks",
1487            DataType::Map(
1488                Arc::new(Field::new("entries", entries_struct_type, false)),
1489                false,
1490            ),
1491            // not nullable, so the keys have max level = 1
1492            false,
1493        );
1494        let schema = Arc::new(Schema::new(vec![stocks_field]));
1495        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1496        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1497
1498        let batch = reader.next().unwrap().unwrap();
1499
1500        // calculate the map's level
1501        let mut levels = vec![];
1502        batch
1503            .columns()
1504            .iter()
1505            .zip(batch.schema().fields())
1506            .for_each(|(array, field)| {
1507                let mut array_levels = calculate_array_levels(array, field).unwrap();
1508                levels.append(&mut array_levels);
1509            });
1510        assert_eq!(levels.len(), 2);
1511
1512        let map = batch.column(0).as_map();
1513        let map_keys_logical_nulls = map.keys().logical_nulls();
1514
1515        // test key levels
1516        let list_level = &levels[0];
1517
1518        let expected_level = ArrayLevels {
1519            def_levels: Some(vec![1; 7]),
1520            rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
1521            non_null_indices: vec![0, 1, 2, 3, 4, 5, 6],
1522            max_def_level: 1,
1523            max_rep_level: 1,
1524            array: map.keys().clone(),
1525            logical_nulls: map_keys_logical_nulls,
1526        };
1527        assert_eq!(list_level, &expected_level);
1528
1529        // test values levels
1530        let list_level = levels.get(1).unwrap();
1531        let map_values_logical_nulls = map.values().logical_nulls();
1532
1533        let expected_level = ArrayLevels {
1534            def_levels: Some(vec![2, 2, 2, 1, 2, 1, 2]),
1535            rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
1536            non_null_indices: vec![0, 1, 2, 4, 6],
1537            max_def_level: 2,
1538            max_rep_level: 1,
1539            array: map.values().clone(),
1540            logical_nulls: map_values_logical_nulls,
1541        };
1542        assert_eq!(list_level, &expected_level);
1543    }
1544
#[test]
fn test_list_of_struct() {
    // Appends one struct slot to the list's child builder: `a` is the value of the
    // struct's only field, `is_valid` is the validity of the struct slot itself.
    fn push_struct(child: &mut StructBuilder, a: Option<i32>, is_valid: bool) {
        child
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_option(a);
        child.append(is_valid);
    }

    // Schema: nullable list -> nullable struct item -> nullable Int32 field "a"
    let struct_fields = Fields::from([Arc::new(Field::new("a", DataType::Int32, true))]);
    let element = Field::new_list_field(DataType::Struct(struct_fields.clone()), true);
    let list_field = Field::new("list", DataType::List(Arc::new(element)), true);

    let mut list_builder = ListBuilder::new(StructBuilder::new(
        struct_fields,
        vec![Box::new(Int32Builder::with_capacity(10))],
    ));

    // Rows: [{a: 1}], [], null, [null, null], [{a: null}], [{a: 2}]

    // [{a: 1}]
    push_struct(list_builder.values(), Some(1), true);
    list_builder.append(true);

    // []
    list_builder.append(true);

    // null
    list_builder.append(false);

    // [null, null]
    push_struct(list_builder.values(), None, false);
    push_struct(list_builder.values(), None, false);
    list_builder.append(true);

    // [{a: null}]
    push_struct(list_builder.values(), None, true);
    list_builder.append(true);

    // [{a: 2}]
    push_struct(list_builder.values(), Some(2), true);
    list_builder.append(true);

    let list_array = Arc::new(list_builder.finish());

    // Leaf column "a": one slot for every struct element appended above
    let leaf = list_array.values().as_struct().column(0).clone();
    assert_eq!(leaf.len(), 5);

    let batch = RecordBatch::try_new(
        Arc::new(Schema::new(vec![list_field])),
        vec![list_array],
    )
    .unwrap();

    let batch_schema = batch.schema();
    let computed = calculate_array_levels(batch.column(0), batch_schema.field(0)).unwrap();

    let expected = ArrayLevels {
        def_levels: Some(vec![4, 1, 0, 2, 2, 3, 4]),
        rep_levels: Some(vec![0, 0, 0, 0, 1, 0, 0]),
        non_null_indices: vec![0, 4],
        max_def_level: 4,
        max_rep_level: 1,
        logical_nulls: leaf.logical_nulls(),
        array: leaf,
    };

    assert_eq!(&computed[0], &expected);
}
1632
#[test]
fn test_struct_mask_list() {
    // A struct array's null mask hides slots of its child ListArray, including
    // slots whose list is non-empty
    let list_child = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
        Some(vec![Some(1), Some(2)]),
        Some(vec![None]),
        Some(vec![]),
        Some(vec![Some(3), None]), // Masked by struct array
        Some(vec![Some(4), Some(5)]),
        None, // Masked by struct array
        None,
    ]);
    let leaf = list_child.values().clone();

    // The expectations below assume null lists occupy no child slots
    assert_eq!(leaf.len(), 7);

    let child_field = Arc::new(Field::new("list", list_child.data_type().clone(), true));
    let struct_validity = Buffer::from([0b01010111]);
    let struct_array = StructArray::from((
        vec![(child_field, Arc::new(list_child) as ArrayRef)],
        struct_validity,
    ));

    let struct_field = Field::new("struct", struct_array.data_type().clone(), true);
    let struct_ref: ArrayRef = Arc::new(struct_array);
    let computed = calculate_array_levels(&struct_ref, &struct_field).unwrap();

    assert_eq!(computed.len(), 1);

    let expected = ArrayLevels {
        def_levels: Some(vec![4, 4, 3, 2, 0, 4, 4, 0, 1]),
        rep_levels: Some(vec![0, 1, 0, 0, 0, 0, 1, 0, 0]),
        non_null_indices: vec![0, 1, 5, 6],
        max_def_level: 4,
        max_rep_level: 1,
        logical_nulls: leaf.logical_nulls(),
        array: leaf,
    };

    assert_eq!(&computed[0], &expected);
}
1674
#[test]
fn test_list_mask_struct() {
    // Both the outer list's null mask and the inner struct's null mask hide
    // non-null elements of their children

    let inner_list = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
        Some(vec![None]), // Masked by list array
        Some(vec![]),     // Masked by list array
        Some(vec![Some(3), None]),
        Some(vec![Some(4), Some(5), None, Some(6)]), // Masked by struct array
        None,
        None,
    ]);
    let list_leaf = inner_list.values().clone();
    let inner_list: ArrayRef = Arc::new(inner_list);

    let integers: ArrayRef = Arc::new(Int32Array::from_iter(vec![
        Some(1), // Masked by list array
        Some(2), // Masked by list array
        None,
        Some(4), // Masked by struct array
        Some(5),
        None,
    ]));
    let integers_leaf = integers.clone();

    let list_child_field = Arc::new(Field::new("list", inner_list.data_type().clone(), true));
    let int_child_field = Arc::new(Field::new(
        "integers",
        integers.data_type().clone(),
        true,
    ));

    // Struct with two children, wrapped in an outer list built from raw parts
    let struct_validity = Buffer::from([0b00110111]);
    let struct_array: ArrayRef = Arc::new(StructArray::from((
        vec![(list_child_field, inner_list), (int_child_field, integers)],
        struct_validity,
    )));

    let list_offsets = Buffer::from_iter([0_i32, 0, 2, 2, 3, 5, 5]);
    let list_validity = Buffer::from([0b00111100]);

    let struct_item = Field::new("struct", struct_array.data_type().clone(), true);
    let list_type = DataType::List(Arc::new(struct_item));

    let list_data = ArrayDataBuilder::new(list_type.clone())
        .len(6)
        .null_bit_buffer(Some(list_validity))
        .add_buffer(list_offsets)
        .add_child_data(struct_array.into_data())
        .build()
        .unwrap();

    let list = make_array(list_data);
    let list_field = Field::new("col", list_type, true);

    // Sanity check the logical contents of the assembled list before computing levels
    let rendered: Vec<_> = (0..6)
        .map(|row| array_value_to_string(&list, row).unwrap())
        .collect();
    let expected_rendering = vec![
        r#""#.to_string(),
        r#""#.to_string(),
        r#"[]"#.to_string(),
        r#"[{list: [3, ], integers: }]"#.to_string(),
        r#"[, {list: , integers: 5}]"#.to_string(),
        r#"[]"#.to_string(),
    ];
    assert_eq!(rendered, expected_rendering);

    let computed = calculate_array_levels(&list, &list_field).unwrap();

    // One leaf per struct child
    assert_eq!(computed.len(), 2);

    // Leaf of the nested list child
    let expected_list_leaf = ArrayLevels {
        def_levels: Some(vec![0, 0, 1, 6, 5, 2, 3, 1]),
        rep_levels: Some(vec![0, 0, 0, 0, 2, 0, 1, 0]),
        non_null_indices: vec![1],
        max_def_level: 6,
        max_rep_level: 2,
        logical_nulls: list_leaf.logical_nulls(),
        array: list_leaf,
    };
    assert_eq!(&computed[0], &expected_list_leaf);

    // Leaf of the integer child
    let expected_int_leaf = ArrayLevels {
        def_levels: Some(vec![0, 0, 1, 3, 2, 4, 1]),
        rep_levels: Some(vec![0, 0, 0, 0, 0, 1, 0]),
        non_null_indices: vec![4],
        max_def_level: 4,
        max_rep_level: 1,
        logical_nulls: integers_leaf.logical_nulls(),
        array: integers_leaf,
    };
    assert_eq!(&computed[1], &expected_int_leaf);
}
1774
#[test]
fn test_fixed_size_list() {
    // Logical rows: [[1, 2], null, null, [7, 8], null]
    // Child values are appended for the null rows as well, each paired with
    // the validity of its list slot.
    let rows: [([i32; 2], bool); 5] = [
        ([1, 2], true),
        ([3, 4], false),
        ([5, 6], false),
        ([7, 8], true),
        ([9, 10], false),
    ];

    let mut list_builder = FixedSizeListBuilder::new(Int32Builder::new(), 2);
    for (pair, is_valid) in rows {
        list_builder.values().append_slice(&pair);
        list_builder.append(is_valid);
    }
    let list_array = list_builder.finish();
    let leaf = list_array.values().clone();

    let item_field = Field::new_list_field(list_array.data_type().clone(), true);
    let mut level_builder = levels(&item_field, list_array);
    // Only rows 1, 2 and 3 are written: null, null, [7, 8]
    level_builder.write(1..4);
    let computed = level_builder.finish();

    assert_eq!(computed.len(), 1);

    let expected = ArrayLevels {
        def_levels: Some(vec![0, 0, 3, 3]),
        rep_levels: Some(vec![0, 0, 0, 1]),
        non_null_indices: vec![6, 7],
        max_def_level: 3,
        max_rep_level: 1,
        logical_nulls: leaf.logical_nulls(),
        array: leaf,
    };
    assert_eq!(&computed[0], &expected);
}
1813
#[test]
fn test_fixed_size_list_of_struct() {
    // Appends one struct slot `{a, b}` to the list's child builder; `is_valid` is
    // the validity of the struct slot itself. Field `b` is non-nullable, so a
    // value is always appended for it, even when the struct slot is null.
    fn push_struct(child: &mut StructBuilder, a: Option<i32>, b: i64, is_valid: bool) {
        child
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_option(a);
        child
            .field_builder::<Int64Builder>(1)
            .unwrap()
            .append_value(b);
        child.append(is_valid);
    }

    // Schema: nullable fixed size list (2) -> nullable struct item -> {a: Int32?, b: Int64}
    let struct_fields = Fields::from([
        Arc::new(Field::new("a", DataType::Int32, true)),
        Arc::new(Field::new("b", DataType::Int64, false)),
    ]);
    let element = Field::new_list_field(DataType::Struct(struct_fields.clone()), true);
    let list_field = Field::new(
        "list",
        DataType::FixedSizeList(Arc::new(element), 2),
        true,
    );

    let struct_builder = StructBuilder::new(
        struct_fields,
        vec![
            Box::new(Int32Builder::with_capacity(10)),
            Box::new(Int64Builder::with_capacity(10)),
        ],
    );
    let mut list_builder = FixedSizeListBuilder::new(struct_builder, 2);

    // [
    //   [{a: 1, b: 2}, null],
    //   null,
    //   [null, null],
    //   [{a: null, b: 3}, {a: 2, b: 4}]
    // ]

    // [{a: 1, b: 2}, null]
    push_struct(list_builder.values(), Some(1), 2, true);
    push_struct(list_builder.values(), None, 0, false);
    list_builder.append(true);

    // null (two null struct slots are still appended to fill the list slot)
    push_struct(list_builder.values(), None, 0, false);
    push_struct(list_builder.values(), None, 0, false);
    list_builder.append(false);

    // [null, null]
    push_struct(list_builder.values(), None, 0, false);
    push_struct(list_builder.values(), None, 0, false);
    list_builder.append(true);

    // [{a: null, b: 3}, {a: 2, b: 4}]
    push_struct(list_builder.values(), None, 3, true);
    push_struct(list_builder.values(), Some(2), 4, true);
    list_builder.append(true);

    let list_array = Arc::new(list_builder.finish());

    assert_eq!(list_array.values().len(), 8);
    assert_eq!(list_array.len(), 4);

    let struct_child = list_array.values().as_struct();
    let leaf_a = struct_child.column(0).clone();
    let leaf_b = struct_child.column(1).clone();

    let batch = RecordBatch::try_new(
        Arc::new(Schema::new(vec![list_field])),
        vec![list_array],
    )
    .unwrap();

    let batch_schema = batch.schema();
    let computed = calculate_array_levels(batch.column(0), batch_schema.field(0)).unwrap();

    // [[{a: 1}, null], null, [null, null], [{a: null}, {a: 2}]]
    let expected_a = ArrayLevels {
        def_levels: Some(vec![4, 2, 0, 2, 2, 3, 4]),
        rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
        non_null_indices: vec![0, 7],
        max_def_level: 4,
        max_rep_level: 1,
        logical_nulls: leaf_a.logical_nulls(),
        array: leaf_a,
    };
    // [[{b: 2}, null], null, [null, null], [{b: 3}, {b: 4}]]
    let expected_b = ArrayLevels {
        def_levels: Some(vec![3, 2, 0, 2, 2, 3, 3]),
        rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
        non_null_indices: vec![0, 6, 7],
        max_def_level: 3,
        max_rep_level: 1,
        logical_nulls: leaf_b.logical_nulls(),
        array: leaf_b,
    };

    assert_eq!(&computed[0], &expected_a);
    assert_eq!(&computed[1], &expected_b);
}
1978
#[test]
fn test_fixed_size_list_empty() {
    // Fixed size list of size zero: [[], null, []]
    let mut list_builder = FixedSizeListBuilder::new(Int32Builder::new(), 0);
    for is_valid in [true, false, true] {
        list_builder.append(is_valid);
    }
    let list_array = list_builder.finish();
    let leaf = list_array.values().clone();

    let item_field = Field::new_list_field(list_array.data_type().clone(), true);
    let mut level_builder = levels(&item_field, list_array);
    level_builder.write(0..3);
    let computed = level_builder.finish();

    assert_eq!(computed.len(), 1);

    // No child values exist, so nothing can be non-null
    let expected = ArrayLevels {
        def_levels: Some(vec![1, 0, 1]),
        rep_levels: Some(vec![0, 0, 0]),
        non_null_indices: vec![],
        max_def_level: 3,
        max_rep_level: 1,
        logical_nulls: leaf.logical_nulls(),
        array: leaf,
    };
    assert_eq!(&computed[0], &expected);
}
2009
#[test]
fn test_fixed_size_list_of_var_lists() {
    // [[[1, null, 3], null], [[4], []], [[5, 6], [null, null]], null]
    let mut outer = FixedSizeListBuilder::new(ListBuilder::new(Int32Builder::new()), 2);

    // [[1, null, 3], null]
    outer.values().append_value([Some(1), None, Some(3)]);
    outer.values().append_null();
    outer.append(true);

    // [[4], []]
    outer.values().append_value([Some(4)]);
    outer.values().append_value([]);
    outer.append(true);

    // [[5, 6], [null, null]]
    outer.values().append_value([Some(5), Some(6)]);
    outer.values().append_value([None, None]);
    outer.append(true);

    // null (two null inner lists are appended to fill the list slot)
    outer.values().append_null();
    outer.values().append_null();
    outer.append(false);

    let list_array = outer.finish();
    let leaf = list_array.values().as_list::<i32>().values().clone();

    let item_field = Field::new_list_field(list_array.data_type().clone(), true);
    let mut level_builder = levels(&item_field, list_array);
    level_builder.write(0..4);
    let computed = level_builder.finish();

    let expected = ArrayLevels {
        def_levels: Some(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]),
        rep_levels: Some(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]),
        non_null_indices: vec![0, 2, 3, 4, 5],
        max_def_level: 5,
        max_rep_level: 2,
        logical_nulls: leaf.logical_nulls(),
        array: leaf,
    };

    assert_eq!(computed[0], expected);
}
2047
#[test]
fn test_null_dictionary_values() {
    // Dictionary values with a null at index 1
    let dict_values = Int32Array::new(
        vec![1, 2, 3, 4].into(),
        Some(NullBuffer::from(vec![true, false, true, true])),
    );
    // Keys with a null at position 1 (the masked key 54 is never dereferenced)
    let dict_keys = Int32Array::new(
        vec![1, 54, 2, 0].into(),
        Some(NullBuffer::from(vec![true, false, true, true])),
    );
    // Logical contents: [NULL, NULL, 3, 1]
    //  - key 1 points at the null dictionary value
    //  - the second key is itself null
    let dict = DictionaryArray::new(dict_keys, Arc::new(dict_values));

    let item_field = Field::new_list_field(dict.data_type().clone(), true);

    let mut level_builder = levels(&item_field, dict.clone());
    level_builder.write(0..4);
    let computed = level_builder.finish();

    let expected = ArrayLevels {
        def_levels: Some(vec![0, 0, 1, 1]),
        rep_levels: None,
        non_null_indices: vec![2, 3],
        max_def_level: 1,
        max_rep_level: 0,
        logical_nulls: dict.logical_nulls(),
        array: Arc::new(dict),
    };
    assert_eq!(computed[0], expected);
}
2079
#[test]
fn mismatched_types() {
    // An Int32 array paired with a Float64 field must be rejected up front
    let int_array: ArrayRef = Arc::new(Int32Array::from_iter(0..10));
    let float_field = Field::new_list_field(DataType::Float64, false);

    let result = LevelInfoBuilder::try_new(&float_field, Default::default(), &int_array);
    let message = result.unwrap_err().to_string();

    assert_eq!(
        message,
        "Arrow: Incompatible type. Field 'item' has type Float64, array has type Int32",
    );
}
2094
2095    fn levels<T: Array + 'static>(field: &Field, array: T) -> LevelInfoBuilder {
2096        let v = Arc::new(array) as ArrayRef;
2097        LevelInfoBuilder::try_new(field, Default::default(), &v).unwrap()
2098    }
2099}