Skip to main content

parquet/arrow/arrow_writer/
levels.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Parquet definition and repetition levels
19//!
20//! Contains the algorithm for computing definition and repetition levels.
21//! The algorithm works by tracking the slots of an array that should
22//! ultimately be populated when writing to Parquet.
23//! Parquet achieves nesting through definition levels and repetition levels \[1\].
24//! Definition levels specify how many optional fields in the part for the column
25//! are defined.
26//! Repetition levels specify at what repeated field (list) in the path a column
27//! is defined.
28//!
29//! In a nested data structure such as `a.b.c`, one can see levels as defining
30//! whether a record is defined at `a`, `a.b`, or `a.b.c`.
31//! Optional fields are nullable fields, thus if all 3 fields
32//! are nullable, the maximum definition could be = 3 if there are no lists.
33//!
34//! The algorithm in this module computes the necessary information to enable
35//! the writer to keep track of which columns are at which levels, and to extract
36//! the correct values at the correct slots from Arrow arrays.
37//!
38//! It works by walking a record batch's arrays, keeping track of what values
39//! are non-null, their positions and computing what their levels are.
40//!
41//! \[1\] [parquet-format#nested-encoding](https://github.com/apache/parquet-format#nested-encoding)
42
43use crate::column::chunker::CdcChunk;
44use crate::errors::{ParquetError, Result};
45use arrow_array::cast::AsArray;
46use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
47use arrow_buffer::bit_iterator::BitIndexIterator;
48use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
49use arrow_schema::{DataType, Field};
50use std::ops::Range;
51use std::sync::Arc;
52
53/// Performs a depth-first scan of the children of `array`, constructing [`ArrayLevels`]
54/// for each leaf column encountered
55pub(crate) fn calculate_array_levels(array: &ArrayRef, field: &Field) -> Result<Vec<ArrayLevels>> {
56    let mut builder = LevelInfoBuilder::try_new(field, Default::default(), array)?;
57    builder.write(0..array.len());
58    Ok(builder.finish())
59}
60
61/// Returns true if the DataType can be represented as a primitive parquet column,
62/// i.e. a leaf array with no children
63fn is_leaf(data_type: &DataType) -> bool {
64    matches!(
65        data_type,
66        DataType::Null
67            | DataType::Boolean
68            | DataType::Int8
69            | DataType::Int16
70            | DataType::Int32
71            | DataType::Int64
72            | DataType::UInt8
73            | DataType::UInt16
74            | DataType::UInt32
75            | DataType::UInt64
76            | DataType::Float16
77            | DataType::Float32
78            | DataType::Float64
79            | DataType::Utf8
80            | DataType::Utf8View
81            | DataType::LargeUtf8
82            | DataType::Timestamp(_, _)
83            | DataType::Date32
84            | DataType::Date64
85            | DataType::Time32(_)
86            | DataType::Time64(_)
87            | DataType::Duration(_)
88            | DataType::Interval(_)
89            | DataType::Binary
90            | DataType::LargeBinary
91            | DataType::BinaryView
92            | DataType::Decimal32(_, _)
93            | DataType::Decimal64(_, _)
94            | DataType::Decimal128(_, _)
95            | DataType::Decimal256(_, _)
96            | DataType::FixedSizeBinary(_)
97    )
98}
99
100/// The definition and repetition level of an array within a potentially nested hierarchy
101#[derive(Debug, Default, Clone, Copy)]
102struct LevelContext {
103    /// The current repetition level
104    rep_level: i16,
105    /// The current definition level
106    def_level: i16,
107}
108
109/// A helper to construct [`ArrayLevels`] from a potentially nested [`Field`]
110#[derive(Debug)]
111enum LevelInfoBuilder {
112    /// A primitive, leaf array
113    Primitive(ArrayLevels),
114    /// A list array
115    List(
116        Box<LevelInfoBuilder>, // Child Values
117        LevelContext,          // Context
118        OffsetBuffer<i32>,     // Offsets
119        Option<NullBuffer>,    // Nulls
120    ),
121    /// A large list array
122    LargeList(
123        Box<LevelInfoBuilder>, // Child Values
124        LevelContext,          // Context
125        OffsetBuffer<i64>,     // Offsets
126        Option<NullBuffer>,    // Nulls
127    ),
128    /// A fixed size list array
129    FixedSizeList(
130        Box<LevelInfoBuilder>, // Values
131        LevelContext,          // Context
132        usize,                 // List Size
133        Option<NullBuffer>,    // Nulls
134    ),
135    /// A list view array
136    ListView(
137        Box<LevelInfoBuilder>, // Child Values
138        LevelContext,          // Context
139        ScalarBuffer<i32>,     // Offsets
140        ScalarBuffer<i32>,     // Sizes
141        Option<NullBuffer>,    // Nulls
142    ),
143    /// A large list view array
144    LargeListView(
145        Box<LevelInfoBuilder>, // Child Values
146        LevelContext,          // Context
147        ScalarBuffer<i64>,     // Offsets
148        ScalarBuffer<i64>,     // Sizes
149        Option<NullBuffer>,    // Nulls
150    ),
151    /// A struct array
152    Struct(Vec<LevelInfoBuilder>, LevelContext, Option<NullBuffer>),
153}
154
155impl LevelInfoBuilder {
156    /// Create a new [`LevelInfoBuilder`] for the given [`Field`] and parent [`LevelContext`]
157    fn try_new(field: &Field, parent_ctx: LevelContext, array: &ArrayRef) -> Result<Self> {
158        if !Self::types_compatible(field.data_type(), array.data_type()) {
159            return Err(arrow_err!(format!(
160                "Incompatible type. Field '{}' has type {}, array has type {}",
161                field.name(),
162                field.data_type(),
163                array.data_type(),
164            )));
165        }
166
167        let is_nullable = field.is_nullable();
168
169        match array.data_type() {
170            d if is_leaf(d) => {
171                let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
172                Ok(Self::Primitive(levels))
173            }
174            DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => {
175                let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
176                Ok(Self::Primitive(levels))
177            }
178            DataType::Struct(children) => {
179                let array = array.as_struct();
180                let def_level = match is_nullable {
181                    true => parent_ctx.def_level + 1,
182                    false => parent_ctx.def_level,
183                };
184
185                let ctx = LevelContext {
186                    rep_level: parent_ctx.rep_level,
187                    def_level,
188                };
189
190                let children = children
191                    .iter()
192                    .zip(array.columns())
193                    .map(|(f, a)| Self::try_new(f, ctx, a))
194                    .collect::<Result<_>>()?;
195
196                Ok(Self::Struct(children, ctx, array.nulls().cloned()))
197            }
198            DataType::List(child)
199            | DataType::LargeList(child)
200            | DataType::Map(child, _)
201            | DataType::FixedSizeList(child, _)
202            | DataType::ListView(child)
203            | DataType::LargeListView(child) => {
204                let def_level = match is_nullable {
205                    true => parent_ctx.def_level + 2,
206                    false => parent_ctx.def_level + 1,
207                };
208
209                let ctx = LevelContext {
210                    rep_level: parent_ctx.rep_level + 1,
211                    def_level,
212                };
213
214                Ok(match field.data_type() {
215                    DataType::List(_) => {
216                        let list = array.as_list();
217                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
218                        let offsets = list.offsets().clone();
219                        Self::List(Box::new(child), ctx, offsets, list.nulls().cloned())
220                    }
221                    DataType::LargeList(_) => {
222                        let list = array.as_list();
223                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
224                        let offsets = list.offsets().clone();
225                        let nulls = list.nulls().cloned();
226                        Self::LargeList(Box::new(child), ctx, offsets, nulls)
227                    }
228                    DataType::Map(_, _) => {
229                        let map = array.as_map();
230                        let entries = Arc::new(map.entries().clone()) as ArrayRef;
231                        let child = Self::try_new(child.as_ref(), ctx, &entries)?;
232                        let offsets = map.offsets().clone();
233                        Self::List(Box::new(child), ctx, offsets, map.nulls().cloned())
234                    }
235                    DataType::FixedSizeList(_, size) => {
236                        let list = array.as_fixed_size_list();
237                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
238                        let nulls = list.nulls().cloned();
239                        Self::FixedSizeList(Box::new(child), ctx, *size as _, nulls)
240                    }
241                    DataType::ListView(_) => {
242                        let list = array.as_list_view();
243                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
244                        let offsets = list.offsets().clone();
245                        let sizes = list.sizes().clone();
246                        let nulls = list.nulls().cloned();
247                        Self::ListView(Box::new(child), ctx, offsets, sizes, nulls)
248                    }
249                    DataType::LargeListView(_) => {
250                        let list = array.as_list_view();
251                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
252                        let offsets = list.offsets().clone();
253                        let sizes = list.sizes().clone();
254                        let nulls = list.nulls().cloned();
255                        Self::LargeListView(Box::new(child), ctx, offsets, sizes, nulls)
256                    }
257                    _ => unreachable!(),
258                })
259            }
260            d => Err(nyi_err!("Datatype {} is not yet supported", d)),
261        }
262    }
263
264    /// Finish this [`LevelInfoBuilder`] returning the [`ArrayLevels`] for the leaf columns
265    /// as enumerated by a depth-first search
266    fn finish(self) -> Vec<ArrayLevels> {
267        match self {
268            LevelInfoBuilder::Primitive(v) => vec![v],
269            LevelInfoBuilder::List(v, _, _, _)
270            | LevelInfoBuilder::LargeList(v, _, _, _)
271            | LevelInfoBuilder::FixedSizeList(v, _, _, _)
272            | LevelInfoBuilder::ListView(v, _, _, _, _)
273            | LevelInfoBuilder::LargeListView(v, _, _, _, _) => v.finish(),
274            LevelInfoBuilder::Struct(v, _, _) => v.into_iter().flat_map(|l| l.finish()).collect(),
275        }
276    }
277
278    /// Given an `array`, write the level data for the elements in `range`
279    fn write(&mut self, range: Range<usize>) {
280        match self {
281            LevelInfoBuilder::Primitive(info) => Self::write_leaf(info, range),
282            LevelInfoBuilder::List(child, ctx, offsets, nulls) => {
283                Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
284            }
285            LevelInfoBuilder::LargeList(child, ctx, offsets, nulls) => {
286                Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
287            }
288            LevelInfoBuilder::FixedSizeList(child, ctx, size, nulls) => {
289                Self::write_fixed_size_list(child, ctx, *size, nulls.as_ref(), range)
290            }
291            LevelInfoBuilder::ListView(child, ctx, offsets, sizes, nulls) => {
292                Self::write_list_view(child, ctx, offsets, sizes, nulls.as_ref(), range)
293            }
294            LevelInfoBuilder::LargeListView(child, ctx, offsets, sizes, nulls) => {
295                Self::write_list_view(child, ctx, offsets, sizes, nulls.as_ref(), range)
296            }
297            LevelInfoBuilder::Struct(children, ctx, nulls) => {
298                Self::write_struct(children, ctx, nulls.as_ref(), range)
299            }
300        }
301    }
302
303    /// Write `range` elements from ListArray `array`
304    ///
305    /// Note: MapArrays are `ListArray<i32>` under the hood and so are dispatched to this method
306    fn write_list<O: OffsetSizeTrait>(
307        child: &mut LevelInfoBuilder,
308        ctx: &LevelContext,
309        offsets: &[O],
310        nulls: Option<&NullBuffer>,
311        range: Range<usize>,
312    ) {
313        let offsets = &offsets[range.start..range.end + 1];
314
315        let write_non_null_slice =
316            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
317                child.write(start_idx..end_idx);
318                child.visit_leaves(|leaf| {
319                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
320                    let mut rev = rep_levels.iter_mut().rev();
321                    let mut remaining = end_idx - start_idx;
322
323                    loop {
324                        let next = rev.next().unwrap();
325                        if *next > ctx.rep_level {
326                            // Nested element - ignore
327                            continue;
328                        }
329
330                        remaining -= 1;
331                        if remaining == 0 {
332                            *next = ctx.rep_level - 1;
333                            break;
334                        }
335                    }
336                })
337            };
338
339        let write_empty_slice = |child: &mut LevelInfoBuilder| {
340            child.visit_leaves(|leaf| {
341                let rep_levels = leaf.rep_levels.as_mut().unwrap();
342                rep_levels.push(ctx.rep_level - 1);
343                let def_levels = leaf.def_levels.as_mut().unwrap();
344                def_levels.push(ctx.def_level - 1);
345            })
346        };
347
348        let write_null_slice = |child: &mut LevelInfoBuilder| {
349            child.visit_leaves(|leaf| {
350                let rep_levels = leaf.rep_levels.as_mut().unwrap();
351                rep_levels.push(ctx.rep_level - 1);
352                let def_levels = leaf.def_levels.as_mut().unwrap();
353                def_levels.push(ctx.def_level - 2);
354            })
355        };
356
357        match nulls {
358            Some(nulls) => {
359                let null_offset = range.start;
360                // TODO: Faster bitmask iteration (#1757)
361                for (idx, w) in offsets.windows(2).enumerate() {
362                    let is_valid = nulls.is_valid(idx + null_offset);
363                    let start_idx = w[0].as_usize();
364                    let end_idx = w[1].as_usize();
365                    if !is_valid {
366                        write_null_slice(child)
367                    } else if start_idx == end_idx {
368                        write_empty_slice(child)
369                    } else {
370                        write_non_null_slice(child, start_idx, end_idx)
371                    }
372                }
373            }
374            None => {
375                for w in offsets.windows(2) {
376                    let start_idx = w[0].as_usize();
377                    let end_idx = w[1].as_usize();
378                    if start_idx == end_idx {
379                        write_empty_slice(child)
380                    } else {
381                        write_non_null_slice(child, start_idx, end_idx)
382                    }
383                }
384            }
385        }
386    }
387
388    /// Write `range` elements from ListViewArray `array`
389    fn write_list_view<O: OffsetSizeTrait>(
390        child: &mut LevelInfoBuilder,
391        ctx: &LevelContext,
392        offsets: &[O],
393        sizes: &[O],
394        nulls: Option<&NullBuffer>,
395        range: Range<usize>,
396    ) {
397        let offsets = &offsets[range.start..range.end];
398        let sizes = &sizes[range.start..range.end];
399
400        let write_non_null_slice =
401            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
402                child.write(start_idx..end_idx);
403                child.visit_leaves(|leaf| {
404                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
405                    let mut rev = rep_levels.iter_mut().rev();
406                    let mut remaining = end_idx - start_idx;
407
408                    loop {
409                        let next = rev.next().unwrap();
410                        if *next > ctx.rep_level {
411                            // Nested element - ignore
412                            continue;
413                        }
414
415                        remaining -= 1;
416                        if remaining == 0 {
417                            *next = ctx.rep_level - 1;
418                            break;
419                        }
420                    }
421                })
422            };
423
424        let write_empty_slice = |child: &mut LevelInfoBuilder| {
425            child.visit_leaves(|leaf| {
426                let rep_levels = leaf.rep_levels.as_mut().unwrap();
427                rep_levels.push(ctx.rep_level - 1);
428                let def_levels = leaf.def_levels.as_mut().unwrap();
429                def_levels.push(ctx.def_level - 1);
430            })
431        };
432
433        let write_null_slice = |child: &mut LevelInfoBuilder| {
434            child.visit_leaves(|leaf| {
435                let rep_levels = leaf.rep_levels.as_mut().unwrap();
436                rep_levels.push(ctx.rep_level - 1);
437                let def_levels = leaf.def_levels.as_mut().unwrap();
438                def_levels.push(ctx.def_level - 2);
439            })
440        };
441
442        match nulls {
443            Some(nulls) => {
444                let null_offset = range.start;
445                // TODO: Faster bitmask iteration (#1757)
446                for (idx, (offset, size)) in offsets.iter().zip(sizes.iter()).enumerate() {
447                    let is_valid = nulls.is_valid(idx + null_offset);
448                    let start_idx = offset.as_usize();
449                    let size = size.as_usize();
450                    let end_idx = start_idx + size;
451                    if !is_valid {
452                        write_null_slice(child)
453                    } else if size == 0 {
454                        write_empty_slice(child)
455                    } else {
456                        write_non_null_slice(child, start_idx, end_idx)
457                    }
458                }
459            }
460            None => {
461                for (offset, size) in offsets.iter().zip(sizes.iter()) {
462                    let start_idx = offset.as_usize();
463                    let size = size.as_usize();
464                    let end_idx = start_idx + size;
465                    if size == 0 {
466                        write_empty_slice(child)
467                    } else {
468                        write_non_null_slice(child, start_idx, end_idx)
469                    }
470                }
471            }
472        }
473    }
474
475    /// Write `range` elements from StructArray `array`
476    fn write_struct(
477        children: &mut [LevelInfoBuilder],
478        ctx: &LevelContext,
479        nulls: Option<&NullBuffer>,
480        range: Range<usize>,
481    ) {
482        let write_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
483            for child in children {
484                child.visit_leaves(|info| {
485                    let len = range.end - range.start;
486
487                    let def_levels = info.def_levels.as_mut().unwrap();
488                    def_levels.extend(std::iter::repeat_n(ctx.def_level - 1, len));
489
490                    if let Some(rep_levels) = info.rep_levels.as_mut() {
491                        rep_levels.extend(std::iter::repeat_n(ctx.rep_level, len));
492                    }
493                })
494            }
495        };
496
497        let write_non_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
498            for child in children {
499                child.write(range.clone())
500            }
501        };
502
503        match nulls {
504            Some(validity) => {
505                let mut last_non_null_idx = None;
506                let mut last_null_idx = None;
507
508                // TODO: Faster bitmask iteration (#1757)
509                for i in range.clone() {
510                    match validity.is_valid(i) {
511                        true => {
512                            if let Some(last_idx) = last_null_idx.take() {
513                                write_null(children, last_idx..i)
514                            }
515                            last_non_null_idx.get_or_insert(i);
516                        }
517                        false => {
518                            if let Some(last_idx) = last_non_null_idx.take() {
519                                write_non_null(children, last_idx..i)
520                            }
521                            last_null_idx.get_or_insert(i);
522                        }
523                    }
524                }
525
526                if let Some(last_idx) = last_null_idx.take() {
527                    write_null(children, last_idx..range.end)
528                }
529
530                if let Some(last_idx) = last_non_null_idx.take() {
531                    write_non_null(children, last_idx..range.end)
532                }
533            }
534            None => write_non_null(children, range),
535        }
536    }
537
538    /// Write `range` elements from FixedSizeListArray with child data `values` and null bitmap `nulls`.
539    fn write_fixed_size_list(
540        child: &mut LevelInfoBuilder,
541        ctx: &LevelContext,
542        fixed_size: usize,
543        nulls: Option<&NullBuffer>,
544        range: Range<usize>,
545    ) {
546        let write_non_null = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
547            let values_start = start_idx * fixed_size;
548            let values_end = end_idx * fixed_size;
549            child.write(values_start..values_end);
550
551            child.visit_leaves(|leaf| {
552                let rep_levels = leaf.rep_levels.as_mut().unwrap();
553
554                let row_indices = (0..fixed_size)
555                    .rev()
556                    .cycle()
557                    .take(values_end - values_start);
558
559                // Step backward over the child rep levels and mark the start of each list
560                rep_levels
561                    .iter_mut()
562                    .rev()
563                    // Filter out reps from nested children
564                    .filter(|&&mut r| r == ctx.rep_level)
565                    .zip(row_indices)
566                    .for_each(|(r, idx)| {
567                        if idx == 0 {
568                            *r = ctx.rep_level - 1;
569                        }
570                    });
571            })
572        };
573
574        // If list size is 0, ignore values and just write rep/def levels.
575        let write_empty = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
576            let len = end_idx - start_idx;
577            child.visit_leaves(|leaf| {
578                let rep_levels = leaf.rep_levels.as_mut().unwrap();
579                rep_levels.extend(std::iter::repeat_n(ctx.rep_level - 1, len));
580                let def_levels = leaf.def_levels.as_mut().unwrap();
581                def_levels.extend(std::iter::repeat_n(ctx.def_level - 1, len));
582            })
583        };
584
585        let write_rows = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
586            if fixed_size > 0 {
587                write_non_null(child, start_idx, end_idx)
588            } else {
589                write_empty(child, start_idx, end_idx)
590            }
591        };
592
593        match nulls {
594            Some(nulls) => {
595                let mut start_idx = None;
596                for idx in range.clone() {
597                    if nulls.is_valid(idx) {
598                        // Start a run of valid rows if not already inside of one
599                        start_idx.get_or_insert(idx);
600                    } else {
601                        // Write out any pending valid rows
602                        if let Some(start) = start_idx.take() {
603                            write_rows(child, start, idx);
604                        }
605                        // Add null row
606                        child.visit_leaves(|leaf| {
607                            let rep_levels = leaf.rep_levels.as_mut().unwrap();
608                            rep_levels.push(ctx.rep_level - 1);
609                            let def_levels = leaf.def_levels.as_mut().unwrap();
610                            def_levels.push(ctx.def_level - 2);
611                        })
612                    }
613                }
614                // Write out any remaining valid rows
615                if let Some(start) = start_idx.take() {
616                    write_rows(child, start, range.end);
617                }
618            }
619            // If all rows are valid then write the whole array
620            None => write_rows(child, range.start, range.end),
621        }
622    }
623
624    /// Write a primitive array, as defined by [`is_leaf`]
625    fn write_leaf(info: &mut ArrayLevels, range: Range<usize>) {
626        let len = range.end - range.start;
627
628        match &mut info.def_levels {
629            Some(def_levels) => {
630                def_levels.reserve(len);
631                info.non_null_indices.reserve(len);
632
633                match &info.logical_nulls {
634                    Some(nulls) => {
635                        assert!(range.end <= nulls.len());
636                        let nulls = nulls.inner();
637                        def_levels.extend(range.clone().map(|i| {
638                            // Safety: range.end was asserted to be in bounds earlier
639                            let valid = unsafe { nulls.value_unchecked(i) };
640                            info.max_def_level - (!valid as i16)
641                        }));
642                        info.non_null_indices.extend(
643                            BitIndexIterator::new(nulls.inner(), nulls.offset() + range.start, len)
644                                .map(|i| i + range.start),
645                        );
646                    }
647                    None => {
648                        let iter = std::iter::repeat_n(info.max_def_level, len);
649                        def_levels.extend(iter);
650                        info.non_null_indices.extend(range);
651                    }
652                }
653            }
654            None => info.non_null_indices.extend(range),
655        }
656
657        if let Some(rep_levels) = &mut info.rep_levels {
658            rep_levels.extend(std::iter::repeat_n(info.max_rep_level, len))
659        }
660    }
661
662    /// Visits all children of this node in depth first order
663    fn visit_leaves(&mut self, visit: impl Fn(&mut ArrayLevels) + Copy) {
664        match self {
665            LevelInfoBuilder::Primitive(info) => visit(info),
666            LevelInfoBuilder::List(c, _, _, _)
667            | LevelInfoBuilder::LargeList(c, _, _, _)
668            | LevelInfoBuilder::FixedSizeList(c, _, _, _)
669            | LevelInfoBuilder::ListView(c, _, _, _, _)
670            | LevelInfoBuilder::LargeListView(c, _, _, _, _) => c.visit_leaves(visit),
671            LevelInfoBuilder::Struct(children, _, _) => {
672                for c in children {
673                    c.visit_leaves(visit)
674                }
675            }
676        }
677    }
678
679    /// Determine if the fields are compatible for purposes of constructing `LevelBuilderInfo`.
680    ///
681    /// Fields are compatible if they're the same type. Otherwise if one of them is a dictionary
682    /// and the other is a native array, the dictionary values must have the same type as the
683    /// native array
684    fn types_compatible(a: &DataType, b: &DataType) -> bool {
685        // if the Arrow data types are equal, the types are deemed compatible
686        if a.equals_datatype(b) {
687            return true;
688        }
689
690        // get the values out of the dictionaries
691        let (a, b) = match (a, b) {
692            (DataType::Dictionary(_, va), DataType::Dictionary(_, vb)) => {
693                (va.as_ref(), vb.as_ref())
694            }
695            (DataType::Dictionary(_, v), b) => (v.as_ref(), b),
696            (a, DataType::Dictionary(_, v)) => (a, v.as_ref()),
697            _ => (a, b),
698        };
699
700        // now that we've got the values from one/both dictionaries, if the values
701        // have the same Arrow data type, they're compatible
702        if a == b {
703            return true;
704        }
705
706        // here we have different Arrow data types, but if the array contains the same type of data
707        // then we consider the type compatible
708        match a {
709            // String, StringView and LargeString are compatible
710            DataType::Utf8 => matches!(b, DataType::LargeUtf8 | DataType::Utf8View),
711            DataType::Utf8View => matches!(b, DataType::LargeUtf8 | DataType::Utf8),
712            DataType::LargeUtf8 => matches!(b, DataType::Utf8 | DataType::Utf8View),
713
714            // Binary, BinaryView and LargeBinary are compatible
715            DataType::Binary => matches!(b, DataType::LargeBinary | DataType::BinaryView),
716            DataType::BinaryView => matches!(b, DataType::LargeBinary | DataType::Binary),
717            DataType::LargeBinary => matches!(b, DataType::Binary | DataType::BinaryView),
718
719            // otherwise we have incompatible types
720            _ => false,
721        }
722    }
723}
724
725/// The data necessary to write a primitive Arrow array to parquet, taking into account
726/// any non-primitive parents it may have in the arrow representation
727#[derive(Debug, Clone)]
728pub(crate) struct ArrayLevels {
729    /// Array's definition levels
730    ///
731    /// Present if `max_def_level != 0`
732    def_levels: Option<Vec<i16>>,
733
734    /// Array's optional repetition levels
735    ///
736    /// Present if `max_rep_level != 0`
737    rep_levels: Option<Vec<i16>>,
738
739    /// The corresponding array identifying non-null slices of data
740    /// from the primitive array
741    non_null_indices: Vec<usize>,
742
743    /// The maximum definition level for this leaf column
744    max_def_level: i16,
745
746    /// The maximum repetition for this leaf column
747    max_rep_level: i16,
748
749    /// The arrow array
750    array: ArrayRef,
751
752    /// cached logical nulls of the array.
753    logical_nulls: Option<NullBuffer>,
754}
755
756impl PartialEq for ArrayLevels {
757    fn eq(&self, other: &Self) -> bool {
758        self.def_levels == other.def_levels
759            && self.rep_levels == other.rep_levels
760            && self.non_null_indices == other.non_null_indices
761            && self.max_def_level == other.max_def_level
762            && self.max_rep_level == other.max_rep_level
763            && self.array.as_ref() == other.array.as_ref()
764            && self.logical_nulls.as_ref() == other.logical_nulls.as_ref()
765    }
766}
767impl Eq for ArrayLevels {}
768
769impl ArrayLevels {
770    fn new(ctx: LevelContext, is_nullable: bool, array: ArrayRef) -> Self {
771        let max_rep_level = ctx.rep_level;
772        let max_def_level = match is_nullable {
773            true => ctx.def_level + 1,
774            false => ctx.def_level,
775        };
776
777        let logical_nulls = array.logical_nulls();
778
779        Self {
780            def_levels: (max_def_level != 0).then(Vec::new),
781            rep_levels: (max_rep_level != 0).then(Vec::new),
782            non_null_indices: vec![],
783            max_def_level,
784            max_rep_level,
785            array,
786            logical_nulls,
787        }
788    }
789
790    pub fn array(&self) -> &ArrayRef {
791        &self.array
792    }
793
794    pub fn def_levels(&self) -> Option<&[i16]> {
795        self.def_levels.as_deref()
796    }
797
798    pub fn rep_levels(&self) -> Option<&[i16]> {
799        self.rep_levels.as_deref()
800    }
801
802    pub fn non_null_indices(&self) -> &[usize] {
803        &self.non_null_indices
804    }
805
806    /// Create a sliced view of this `ArrayLevels` for a CDC chunk.
807    ///
808    /// Note: `def_levels`, `rep_levels`, and `non_null_indices` are copied (not zero-copy),
809    /// while `array` is sliced without copying.
810    pub(crate) fn slice_for_chunk(&self, chunk: &CdcChunk) -> Self {
811        let level_offset = chunk.level_offset;
812        let num_levels = chunk.num_levels;
813        let value_offset = chunk.value_offset;
814        let num_values = chunk.num_values;
815        let def_levels = self
816            .def_levels
817            .as_ref()
818            .map(|levels| levels[level_offset..level_offset + num_levels].to_vec());
819        let rep_levels = self
820            .rep_levels
821            .as_ref()
822            .map(|levels| levels[level_offset..level_offset + num_levels].to_vec());
823
824        // Filter non_null_indices to [value_offset, value_offset + num_values)
825        // and shift by -value_offset. Use binary search since the slice is sorted.
826        let value_end = value_offset + num_values;
827        let start = self
828            .non_null_indices
829            .partition_point(|&idx| idx < value_offset);
830        let end = self
831            .non_null_indices
832            .partition_point(|&idx| idx < value_end);
833        let non_null_indices: Vec<usize> = self.non_null_indices[start..end]
834            .iter()
835            .map(|&idx| idx - value_offset)
836            .collect();
837
838        let array = self.array.slice(value_offset, num_values);
839        let logical_nulls = array.logical_nulls();
840
841        Self {
842            def_levels,
843            rep_levels,
844            non_null_indices,
845            max_def_level: self.max_def_level,
846            max_rep_level: self.max_rep_level,
847            array,
848            logical_nulls,
849        }
850    }
851}
852
853#[cfg(test)]
854mod tests {
855    use super::*;
856    use crate::column::chunker::CdcChunk;
857
858    use arrow_array::builder::*;
859    use arrow_array::types::Int32Type;
860    use arrow_array::*;
861    use arrow_buffer::{Buffer, ToByteSlice};
862    use arrow_cast::display::array_value_to_string;
863    use arrow_data::{ArrayData, ArrayDataBuilder};
864    use arrow_schema::{Fields, Schema};
865
866    #[test]
867    fn test_calculate_array_levels_twitter_example() {
868        // based on the example at https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html
869        // [[a, b, c], [d, e, f, g]], [[h], [i,j]]
870
871        let leaf_type = Field::new_list_field(DataType::Int32, false);
872        let inner_type = DataType::List(Arc::new(leaf_type));
873        let inner_field = Field::new("l2", inner_type.clone(), false);
874        let outer_type = DataType::List(Arc::new(inner_field));
875        let outer_field = Field::new("l1", outer_type.clone(), false);
876
877        let primitives = Int32Array::from_iter(0..10);
878
879        // Cannot use from_iter_primitive as always infers nullable
880        let offsets = Buffer::from_iter([0_i32, 3, 7, 8, 10]);
881        let inner_list = ArrayDataBuilder::new(inner_type)
882            .len(4)
883            .add_buffer(offsets)
884            .add_child_data(primitives.to_data())
885            .build()
886            .unwrap();
887
888        let offsets = Buffer::from_iter([0_i32, 2, 4]);
889        let outer_list = ArrayDataBuilder::new(outer_type)
890            .len(2)
891            .add_buffer(offsets)
892            .add_child_data(inner_list)
893            .build()
894            .unwrap();
895        let outer_list = make_array(outer_list);
896
897        let levels = calculate_array_levels(&outer_list, &outer_field).unwrap();
898        assert_eq!(levels.len(), 1);
899
900        let expected = ArrayLevels {
901            def_levels: Some(vec![2; 10]),
902            rep_levels: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
903            non_null_indices: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
904            max_def_level: 2,
905            max_rep_level: 2,
906            array: Arc::new(primitives),
907            logical_nulls: None,
908        };
909        assert_eq!(&levels[0], &expected);
910    }
911
912    #[test]
913    fn test_calculate_one_level_1() {
914        // This test calculates the levels for a non-null primitive array
915        let array = Arc::new(Int32Array::from_iter(0..10)) as ArrayRef;
916        let field = Field::new_list_field(DataType::Int32, false);
917
918        let levels = calculate_array_levels(&array, &field).unwrap();
919        assert_eq!(levels.len(), 1);
920
921        let expected_levels = ArrayLevels {
922            def_levels: None,
923            rep_levels: None,
924            non_null_indices: (0..10).collect(),
925            max_def_level: 0,
926            max_rep_level: 0,
927            array,
928            logical_nulls: None,
929        };
930        assert_eq!(&levels[0], &expected_levels);
931    }
932
933    #[test]
934    fn test_calculate_one_level_2() {
935        // This test calculates the levels for a nullable primitive array
936        let array = Arc::new(Int32Array::from_iter([
937            Some(0),
938            None,
939            Some(0),
940            Some(0),
941            None,
942        ])) as ArrayRef;
943        let field = Field::new_list_field(DataType::Int32, true);
944
945        let levels = calculate_array_levels(&array, &field).unwrap();
946        assert_eq!(levels.len(), 1);
947
948        let logical_nulls = array.logical_nulls();
949        let expected_levels = ArrayLevels {
950            def_levels: Some(vec![1, 0, 1, 1, 0]),
951            rep_levels: None,
952            non_null_indices: vec![0, 2, 3],
953            max_def_level: 1,
954            max_rep_level: 0,
955            array,
956            logical_nulls,
957        };
958        assert_eq!(&levels[0], &expected_levels);
959    }
960
961    #[test]
962    fn test_calculate_array_levels_1() {
963        let leaf_field = Field::new_list_field(DataType::Int32, false);
964        let list_type = DataType::List(Arc::new(leaf_field));
965
966        // if all array values are defined (e.g. batch<list<_>>)
967        // [[0], [1], [2], [3], [4]]
968
969        let leaf_array = Int32Array::from_iter(0..5);
970        // Cannot use from_iter_primitive as always infers nullable
971        let offsets = Buffer::from_iter(0_i32..6);
972        let list = ArrayDataBuilder::new(list_type.clone())
973            .len(5)
974            .add_buffer(offsets)
975            .add_child_data(leaf_array.to_data())
976            .build()
977            .unwrap();
978        let list = make_array(list);
979
980        let list_field = Field::new("list", list_type.clone(), false);
981        let levels = calculate_array_levels(&list, &list_field).unwrap();
982        assert_eq!(levels.len(), 1);
983
984        let expected_levels = ArrayLevels {
985            def_levels: Some(vec![1; 5]),
986            rep_levels: Some(vec![0; 5]),
987            non_null_indices: (0..5).collect(),
988            max_def_level: 1,
989            max_rep_level: 1,
990            array: Arc::new(leaf_array),
991            logical_nulls: None,
992        };
993        assert_eq!(&levels[0], &expected_levels);
994
995        // array: [[0, 0], NULL, [2, 2], [3, 3, 3, 3], [4, 4, 4]]
996        // all values are defined as we do not have nulls on the root (batch)
997        // repetition:
998        //   0: 0, 1
999        //   1: 0
1000        //   2: 0, 1
1001        //   3: 0, 1, 1, 1
1002        //   4: 0, 1, 1
1003        let leaf_array = Int32Array::from_iter([0, 0, 2, 2, 3, 3, 3, 3, 4, 4, 4]);
1004        let offsets = Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]);
1005        let list = ArrayDataBuilder::new(list_type.clone())
1006            .len(5)
1007            .add_buffer(offsets)
1008            .add_child_data(leaf_array.to_data())
1009            .null_bit_buffer(Some(Buffer::from([0b00011101])))
1010            .build()
1011            .unwrap();
1012        let list = make_array(list);
1013
1014        let list_field = Field::new("list", list_type, true);
1015        let levels = calculate_array_levels(&list, &list_field).unwrap();
1016        assert_eq!(levels.len(), 1);
1017
1018        let expected_levels = ArrayLevels {
1019            def_levels: Some(vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2]),
1020            rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
1021            non_null_indices: (0..11).collect(),
1022            max_def_level: 2,
1023            max_rep_level: 1,
1024            array: Arc::new(leaf_array),
1025            logical_nulls: None,
1026        };
1027        assert_eq!(&levels[0], &expected_levels);
1028    }
1029
1030    #[test]
1031    fn test_calculate_array_levels_2() {
1032        // If some values are null
1033        //
1034        // This emulates an array in the form: <struct<list<?>>
1035        // with values:
1036        // - 0: [0, 1], but is null because of the struct
1037        // - 1: []
1038        // - 2: [2, 3], but is null because of the struct
1039        // - 3: [4, 5, 6, 7]
1040        // - 4: [8, 9, 10]
1041        //
1042        // If the first values of a list are null due to a parent, we have to still account for them
1043        // while indexing, because they would affect the way the child is indexed
1044        // i.e. in the above example, we have to know that [0, 1] has to be skipped
1045        let leaf = Int32Array::from_iter(0..11);
1046        let leaf_field = Field::new("leaf", DataType::Int32, false);
1047
1048        let list_type = DataType::List(Arc::new(leaf_field));
1049        let list = ArrayData::builder(list_type.clone())
1050            .len(5)
1051            .add_child_data(leaf.to_data())
1052            .add_buffer(Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]))
1053            .build()
1054            .unwrap();
1055
1056        let list = make_array(list);
1057        let list_field = Arc::new(Field::new("list", list_type, true));
1058
1059        let struct_array =
1060            StructArray::from((vec![(list_field, list)], Buffer::from([0b00011010])));
1061        let array = Arc::new(struct_array) as ArrayRef;
1062
1063        let struct_field = Field::new("struct", array.data_type().clone(), true);
1064
1065        let levels = calculate_array_levels(&array, &struct_field).unwrap();
1066        assert_eq!(levels.len(), 1);
1067
1068        let expected_levels = ArrayLevels {
1069            def_levels: Some(vec![0, 2, 0, 3, 3, 3, 3, 3, 3, 3]),
1070            rep_levels: Some(vec![0, 0, 0, 0, 1, 1, 1, 0, 1, 1]),
1071            non_null_indices: (4..11).collect(),
1072            max_def_level: 3,
1073            max_rep_level: 1,
1074            array: Arc::new(leaf),
1075            logical_nulls: None,
1076        };
1077
1078        assert_eq!(&levels[0], &expected_levels);
1079
1080        // nested lists
1081
1082        // 0: [[100, 101], [102, 103]]
1083        // 1: []
1084        // 2: [[104, 105], [106, 107]]
1085        // 3: [[108, 109], [110, 111], [112, 113], [114, 115]]
1086        // 4: [[116, 117], [118, 119], [120, 121]]
1087
1088        let leaf = Int32Array::from_iter(100..122);
1089        let leaf_field = Field::new("leaf", DataType::Int32, true);
1090
1091        let l1_type = DataType::List(Arc::new(leaf_field));
1092        let offsets = Buffer::from_iter([0_i32, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22]);
1093        let l1 = ArrayData::builder(l1_type.clone())
1094            .len(11)
1095            .add_child_data(leaf.to_data())
1096            .add_buffer(offsets)
1097            .build()
1098            .unwrap();
1099
1100        let l1_field = Field::new("l1", l1_type, true);
1101        let l2_type = DataType::List(Arc::new(l1_field));
1102        let l2 = ArrayData::builder(l2_type)
1103            .len(5)
1104            .add_child_data(l1)
1105            .add_buffer(Buffer::from_iter([0, 2, 2, 4, 8, 11]))
1106            .build()
1107            .unwrap();
1108
1109        let l2 = make_array(l2);
1110        let l2_field = Field::new("l2", l2.data_type().clone(), true);
1111
1112        let levels = calculate_array_levels(&l2, &l2_field).unwrap();
1113        assert_eq!(levels.len(), 1);
1114
1115        let expected_levels = ArrayLevels {
1116            def_levels: Some(vec![
1117                5, 5, 5, 5, 1, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
1118            ]),
1119            rep_levels: Some(vec![
1120                0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2,
1121            ]),
1122            non_null_indices: (0..22).collect(),
1123            max_def_level: 5,
1124            max_rep_level: 2,
1125            array: Arc::new(leaf),
1126            logical_nulls: None,
1127        };
1128
1129        assert_eq!(&levels[0], &expected_levels);
1130    }
1131
1132    #[test]
1133    fn test_calculate_array_levels_nested_list() {
1134        let leaf_field = Field::new("leaf", DataType::Int32, false);
1135        let list_type = DataType::List(Arc::new(leaf_field));
1136
1137        // if all array values are defined (e.g. batch<list<_>>)
1138        // The array at this level looks like:
1139        // 0: [a]
1140        // 1: [a]
1141        // 2: [a]
1142        // 3: [a]
1143
1144        let leaf = Int32Array::from_iter([0; 4]);
1145        let list = ArrayData::builder(list_type.clone())
1146            .len(4)
1147            .add_buffer(Buffer::from_iter(0_i32..5))
1148            .add_child_data(leaf.to_data())
1149            .build()
1150            .unwrap();
1151        let list = make_array(list);
1152
1153        let list_field = Field::new("list", list_type.clone(), false);
1154        let levels = calculate_array_levels(&list, &list_field).unwrap();
1155        assert_eq!(levels.len(), 1);
1156
1157        let expected_levels = ArrayLevels {
1158            def_levels: Some(vec![1; 4]),
1159            rep_levels: Some(vec![0; 4]),
1160            non_null_indices: (0..4).collect(),
1161            max_def_level: 1,
1162            max_rep_level: 1,
1163            array: Arc::new(leaf),
1164            logical_nulls: None,
1165        };
1166        assert_eq!(&levels[0], &expected_levels);
1167
1168        // 0: null
1169        // 1: [1, 2, 3]
1170        // 2: [4, 5]
1171        // 3: [6, 7]
1172        let leaf = Int32Array::from_iter(0..8);
1173        let list = ArrayData::builder(list_type.clone())
1174            .len(4)
1175            .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
1176            .null_bit_buffer(Some(Buffer::from([0b00001110])))
1177            .add_child_data(leaf.to_data())
1178            .build()
1179            .unwrap();
1180        let list = make_array(list);
1181        let list_field = Arc::new(Field::new("list", list_type, true));
1182
1183        let struct_array = StructArray::from(vec![(list_field, list)]);
1184        let array = Arc::new(struct_array) as ArrayRef;
1185
1186        let struct_field = Field::new("struct", array.data_type().clone(), true);
1187        let levels = calculate_array_levels(&array, &struct_field).unwrap();
1188        assert_eq!(levels.len(), 1);
1189
1190        let expected_levels = ArrayLevels {
1191            def_levels: Some(vec![1, 3, 3, 3, 3, 3, 3, 3]),
1192            rep_levels: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]),
1193            non_null_indices: (0..7).collect(),
1194            max_def_level: 3,
1195            max_rep_level: 1,
1196            array: Arc::new(leaf),
1197            logical_nulls: None,
1198        };
1199        assert_eq!(&levels[0], &expected_levels);
1200
1201        // nested lists
1202        // In a JSON syntax with the schema: <struct<list<list<primitive>>>>, this translates into:
1203        // 0: {"struct": null }
1204        // 1: {"struct": [ [201], [202, 203], [] ]}
1205        // 2: {"struct": [ [204, 205, 206], [207, 208, 209, 210] ]}
1206        // 3: {"struct": [ [], [211, 212, 213, 214, 215] ]}
1207
1208        let leaf = Int32Array::from_iter(201..216);
1209        let leaf_field = Field::new("leaf", DataType::Int32, false);
1210        let list_1_type = DataType::List(Arc::new(leaf_field));
1211        let list_1 = ArrayData::builder(list_1_type.clone())
1212            .len(7)
1213            .add_buffer(Buffer::from_iter([0_i32, 1, 3, 3, 6, 10, 10, 15]))
1214            .add_child_data(leaf.to_data())
1215            .build()
1216            .unwrap();
1217
1218        let list_1_field = Field::new("l1", list_1_type, true);
1219        let list_2_type = DataType::List(Arc::new(list_1_field));
1220        let list_2 = ArrayData::builder(list_2_type.clone())
1221            .len(4)
1222            .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
1223            .null_bit_buffer(Some(Buffer::from([0b00001110])))
1224            .add_child_data(list_1)
1225            .build()
1226            .unwrap();
1227
1228        let list_2 = make_array(list_2);
1229        let list_2_field = Arc::new(Field::new("list_2", list_2_type, true));
1230
1231        let struct_array =
1232            StructArray::from((vec![(list_2_field, list_2)], Buffer::from([0b00001111])));
1233        let struct_field = Field::new("struct", struct_array.data_type().clone(), true);
1234
1235        let array = Arc::new(struct_array) as ArrayRef;
1236        let levels = calculate_array_levels(&array, &struct_field).unwrap();
1237        assert_eq!(levels.len(), 1);
1238
1239        let expected_levels = ArrayLevels {
1240            def_levels: Some(vec![1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5, 5, 5, 5]),
1241            rep_levels: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2]),
1242            non_null_indices: (0..15).collect(),
1243            max_def_level: 5,
1244            max_rep_level: 2,
1245            array: Arc::new(leaf),
1246            logical_nulls: None,
1247        };
1248        assert_eq!(&levels[0], &expected_levels);
1249    }
1250
1251    #[test]
1252    fn test_calculate_nested_struct_levels() {
1253        // tests a <struct[a]<struct[b]<int[c]>>
1254        // array:
1255        //  - {a: {b: {c: 1}}}
1256        //  - {a: {b: {c: null}}}
1257        //  - {a: {b: {c: 3}}}
1258        //  - {a: {b: null}}
1259        //  - {a: null}}
1260        //  - {a: {b: {c: 6}}}
1261
1262        let c = Int32Array::from_iter([Some(1), None, Some(3), None, Some(5), Some(6)]);
1263        let leaf = Arc::new(c) as ArrayRef;
1264        let c_field = Arc::new(Field::new("c", DataType::Int32, true));
1265        let b = StructArray::from(((vec![(c_field, leaf.clone())]), Buffer::from([0b00110111])));
1266
1267        let b_field = Arc::new(Field::new("b", b.data_type().clone(), true));
1268        let a = StructArray::from((
1269            (vec![(b_field, Arc::new(b) as ArrayRef)]),
1270            Buffer::from([0b00101111]),
1271        ));
1272
1273        let a_field = Field::new("a", a.data_type().clone(), true);
1274        let a_array = Arc::new(a) as ArrayRef;
1275
1276        let levels = calculate_array_levels(&a_array, &a_field).unwrap();
1277        assert_eq!(levels.len(), 1);
1278
1279        let logical_nulls = leaf.logical_nulls();
1280        let expected_levels = ArrayLevels {
1281            def_levels: Some(vec![3, 2, 3, 1, 0, 3]),
1282            rep_levels: None,
1283            non_null_indices: vec![0, 2, 5],
1284            max_def_level: 3,
1285            max_rep_level: 0,
1286            array: leaf,
1287            logical_nulls,
1288        };
1289        assert_eq!(&levels[0], &expected_levels);
1290    }
1291
1292    #[test]
1293    fn list_single_column() {
1294        // this tests the level generation from the arrow_writer equivalent test
1295
1296        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1297        let a_value_offsets = arrow::buffer::Buffer::from_iter([0_i32, 1, 3, 3, 6, 10]);
1298        let a_list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
1299        let a_list_data = ArrayData::builder(a_list_type.clone())
1300            .len(5)
1301            .add_buffer(a_value_offsets)
1302            .null_bit_buffer(Some(Buffer::from([0b00011011])))
1303            .add_child_data(a_values.to_data())
1304            .build()
1305            .unwrap();
1306
1307        assert_eq!(a_list_data.null_count(), 1);
1308
1309        let a = ListArray::from(a_list_data);
1310
1311        let item_field = Field::new_list_field(a_list_type, true);
1312        let mut builder = levels(&item_field, a);
1313        builder.write(2..4);
1314        let levels = builder.finish();
1315
1316        assert_eq!(levels.len(), 1);
1317
1318        let list_level = &levels[0];
1319
1320        let expected_level = ArrayLevels {
1321            def_levels: Some(vec![0, 3, 3, 3]),
1322            rep_levels: Some(vec![0, 0, 1, 1]),
1323            non_null_indices: vec![3, 4, 5],
1324            max_def_level: 3,
1325            max_rep_level: 1,
1326            array: Arc::new(a_values),
1327            logical_nulls: None,
1328        };
1329        assert_eq!(list_level, &expected_level);
1330    }
1331
1332    #[test]
1333    fn mixed_struct_list() {
1334        // this tests the level generation from the equivalent arrow_writer_complex test
1335
1336        // define schema
1337        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1338        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1339        let struct_field_g = Arc::new(Field::new(
1340            "g",
1341            DataType::List(Arc::new(Field::new("items", DataType::Int16, false))),
1342            false,
1343        ));
1344        let struct_field_e = Arc::new(Field::new(
1345            "e",
1346            DataType::Struct(vec![struct_field_f.clone(), struct_field_g.clone()].into()),
1347            true,
1348        ));
1349        let schema = Schema::new(vec![
1350            Field::new("a", DataType::Int32, false),
1351            Field::new("b", DataType::Int32, true),
1352            Field::new(
1353                "c",
1354                DataType::Struct(vec![struct_field_d.clone(), struct_field_e.clone()].into()),
1355                true, // https://github.com/apache/arrow-rs/issues/245
1356            ),
1357        ]);
1358
1359        // create some data
1360        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1361        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1362        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1363        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1364
1365        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1366
1367        // Construct a buffer for value offsets, for the nested array:
1368        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
1369        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1370
1371        // Construct a list array from the above two
1372        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1373            .len(5)
1374            .add_buffer(g_value_offsets)
1375            .add_child_data(g_value.into_data())
1376            .build()
1377            .unwrap();
1378        let g = ListArray::from(g_list_data);
1379
1380        let e = StructArray::from(vec![
1381            (struct_field_f, Arc::new(f.clone()) as ArrayRef),
1382            (struct_field_g, Arc::new(g) as ArrayRef),
1383        ]);
1384
1385        let c = StructArray::from(vec![
1386            (struct_field_d, Arc::new(d.clone()) as ArrayRef),
1387            (struct_field_e, Arc::new(e) as ArrayRef),
1388        ]);
1389
1390        // build a record batch
1391        let batch = RecordBatch::try_new(
1392            Arc::new(schema),
1393            vec![Arc::new(a.clone()), Arc::new(b.clone()), Arc::new(c)],
1394        )
1395        .unwrap();
1396
1397        //////////////////////////////////////////////
1398        // calculate the list's level
1399        let mut levels = vec![];
1400        batch
1401            .columns()
1402            .iter()
1403            .zip(batch.schema().fields())
1404            .for_each(|(array, field)| {
1405                let mut array_levels = calculate_array_levels(array, field).unwrap();
1406                levels.append(&mut array_levels);
1407            });
1408        assert_eq!(levels.len(), 5);
1409
1410        // test "a" levels
1411        let list_level = &levels[0];
1412
1413        let expected_level = ArrayLevels {
1414            def_levels: None,
1415            rep_levels: None,
1416            non_null_indices: vec![0, 1, 2, 3, 4],
1417            max_def_level: 0,
1418            max_rep_level: 0,
1419            array: Arc::new(a),
1420            logical_nulls: None,
1421        };
1422        assert_eq!(list_level, &expected_level);
1423
1424        // test "b" levels
1425        let list_level = levels.get(1).unwrap();
1426
1427        let b_logical_nulls = b.logical_nulls();
1428        let expected_level = ArrayLevels {
1429            def_levels: Some(vec![1, 0, 0, 1, 1]),
1430            rep_levels: None,
1431            non_null_indices: vec![0, 3, 4],
1432            max_def_level: 1,
1433            max_rep_level: 0,
1434            array: Arc::new(b),
1435            logical_nulls: b_logical_nulls,
1436        };
1437        assert_eq!(list_level, &expected_level);
1438
1439        // test "d" levels
1440        let list_level = levels.get(2).unwrap();
1441
1442        let d_logical_nulls = d.logical_nulls();
1443        let expected_level = ArrayLevels {
1444            def_levels: Some(vec![1, 1, 1, 2, 1]),
1445            rep_levels: None,
1446            non_null_indices: vec![3],
1447            max_def_level: 2,
1448            max_rep_level: 0,
1449            array: Arc::new(d),
1450            logical_nulls: d_logical_nulls,
1451        };
1452        assert_eq!(list_level, &expected_level);
1453
1454        // test "f" levels
1455        let list_level = levels.get(3).unwrap();
1456
1457        let f_logical_nulls = f.logical_nulls();
1458        let expected_level = ArrayLevels {
1459            def_levels: Some(vec![3, 2, 3, 2, 3]),
1460            rep_levels: None,
1461            non_null_indices: vec![0, 2, 4],
1462            max_def_level: 3,
1463            max_rep_level: 0,
1464            array: Arc::new(f),
1465            logical_nulls: f_logical_nulls,
1466        };
1467        assert_eq!(list_level, &expected_level);
1468    }
1469
1470    #[test]
1471    fn test_null_vs_nonnull_struct() {
1472        // define schema
1473        let offset_field = Arc::new(Field::new("offset", DataType::Int32, true));
1474        let schema = Schema::new(vec![Field::new(
1475            "some_nested_object",
1476            DataType::Struct(vec![offset_field.clone()].into()),
1477            false,
1478        )]);
1479
1480        // create some data
1481        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1482
1483        let some_nested_object =
1484            StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
1485
1486        // build a record batch
1487        let batch =
1488            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1489
1490        let struct_null_level =
1491            calculate_array_levels(batch.column(0), batch.schema().field(0)).unwrap();
1492
1493        // create second batch
1494        // define schema
1495        let offset_field = Arc::new(Field::new("offset", DataType::Int32, true));
1496        let schema = Schema::new(vec![Field::new(
1497            "some_nested_object",
1498            DataType::Struct(vec![offset_field.clone()].into()),
1499            true,
1500        )]);
1501
1502        // create some data
1503        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1504
1505        let some_nested_object =
1506            StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
1507
1508        // build a record batch
1509        let batch =
1510            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1511
1512        let struct_non_null_level =
1513            calculate_array_levels(batch.column(0), batch.schema().field(0)).unwrap();
1514
1515        // The 2 levels should not be the same
1516        if struct_non_null_level == struct_null_level {
1517            panic!("Levels should not be equal, to reflect the difference in struct nullness");
1518        }
1519    }
1520
1521    #[test]
1522    fn test_map_array() {
1523        // Note: we are using the JSON Arrow reader for brevity
1524        let json_content = r#"
1525        {"stocks":{"long": "$AAA", "short": "$BBB"}}
1526        {"stocks":{"long": "$CCC", "short": null}}
1527        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1528        "#;
1529        let entries_struct_type = DataType::Struct(Fields::from(vec![
1530            Field::new("key", DataType::Utf8, false),
1531            Field::new("value", DataType::Utf8, true),
1532        ]));
1533        let stocks_field = Field::new(
1534            "stocks",
1535            DataType::Map(
1536                Arc::new(Field::new("entries", entries_struct_type, false)),
1537                false,
1538            ),
1539            // not nullable, so the keys have max level = 1
1540            false,
1541        );
1542        let schema = Arc::new(Schema::new(vec![stocks_field]));
1543        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1544        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1545
1546        let batch = reader.next().unwrap().unwrap();
1547
1548        // calculate the map's level
1549        let mut levels = vec![];
1550        batch
1551            .columns()
1552            .iter()
1553            .zip(batch.schema().fields())
1554            .for_each(|(array, field)| {
1555                let mut array_levels = calculate_array_levels(array, field).unwrap();
1556                levels.append(&mut array_levels);
1557            });
1558        assert_eq!(levels.len(), 2);
1559
1560        let map = batch.column(0).as_map();
1561        let map_keys_logical_nulls = map.keys().logical_nulls();
1562
1563        // test key levels
1564        let list_level = &levels[0];
1565
1566        let expected_level = ArrayLevels {
1567            def_levels: Some(vec![1; 7]),
1568            rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
1569            non_null_indices: vec![0, 1, 2, 3, 4, 5, 6],
1570            max_def_level: 1,
1571            max_rep_level: 1,
1572            array: map.keys().clone(),
1573            logical_nulls: map_keys_logical_nulls,
1574        };
1575        assert_eq!(list_level, &expected_level);
1576
1577        // test values levels
1578        let list_level = levels.get(1).unwrap();
1579        let map_values_logical_nulls = map.values().logical_nulls();
1580
1581        let expected_level = ArrayLevels {
1582            def_levels: Some(vec![2, 2, 2, 1, 2, 1, 2]),
1583            rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
1584            non_null_indices: vec![0, 1, 2, 4, 6],
1585            max_def_level: 2,
1586            max_rep_level: 1,
1587            array: map.values().clone(),
1588            logical_nulls: map_values_logical_nulls,
1589        };
1590        assert_eq!(list_level, &expected_level);
1591    }
1592
1593    #[test]
1594    fn test_list_of_struct() {
1595        // define schema
1596        let int_field = Field::new("a", DataType::Int32, true);
1597        let fields = Fields::from([Arc::new(int_field)]);
1598        let item_field = Field::new_list_field(DataType::Struct(fields.clone()), true);
1599        let list_field = Field::new("list", DataType::List(Arc::new(item_field)), true);
1600
1601        let int_builder = Int32Builder::with_capacity(10);
1602        let struct_builder = StructBuilder::new(fields, vec![Box::new(int_builder)]);
1603        let mut list_builder = ListBuilder::new(struct_builder);
1604
1605        // [{a: 1}], [], null, [null, null], [{a: null}], [{a: 2}]
1606        //
1607        // [{a: 1}]
1608        let values = list_builder.values();
1609        values
1610            .field_builder::<Int32Builder>(0)
1611            .unwrap()
1612            .append_value(1);
1613        values.append(true);
1614        list_builder.append(true);
1615
1616        // []
1617        list_builder.append(true);
1618
1619        // null
1620        list_builder.append(false);
1621
1622        // [null, null]
1623        let values = list_builder.values();
1624        values
1625            .field_builder::<Int32Builder>(0)
1626            .unwrap()
1627            .append_null();
1628        values.append(false);
1629        values
1630            .field_builder::<Int32Builder>(0)
1631            .unwrap()
1632            .append_null();
1633        values.append(false);
1634        list_builder.append(true);
1635
1636        // [{a: null}]
1637        let values = list_builder.values();
1638        values
1639            .field_builder::<Int32Builder>(0)
1640            .unwrap()
1641            .append_null();
1642        values.append(true);
1643        list_builder.append(true);
1644
1645        // [{a: 2}]
1646        let values = list_builder.values();
1647        values
1648            .field_builder::<Int32Builder>(0)
1649            .unwrap()
1650            .append_value(2);
1651        values.append(true);
1652        list_builder.append(true);
1653
1654        let array = Arc::new(list_builder.finish());
1655
1656        let values = array.values().as_struct().column(0).clone();
1657        let values_len = values.len();
1658        assert_eq!(values_len, 5);
1659
1660        let schema = Arc::new(Schema::new(vec![list_field]));
1661
1662        let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
1663
1664        let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
1665        let list_level = &levels[0];
1666
1667        let logical_nulls = values.logical_nulls();
1668        let expected_level = ArrayLevels {
1669            def_levels: Some(vec![4, 1, 0, 2, 2, 3, 4]),
1670            rep_levels: Some(vec![0, 0, 0, 0, 1, 0, 0]),
1671            non_null_indices: vec![0, 4],
1672            max_def_level: 4,
1673            max_rep_level: 1,
1674            array: values,
1675            logical_nulls,
1676        };
1677
1678        assert_eq!(list_level, &expected_level);
1679    }
1680
1681    #[test]
1682    fn test_struct_mask_list() {
1683        // Test the null mask of a struct array masking out non-empty slices of a child ListArray
1684        let inner = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1685            Some(vec![Some(1), Some(2)]),
1686            Some(vec![None]),
1687            Some(vec![]),
1688            Some(vec![Some(3), None]), // Masked by struct array
1689            Some(vec![Some(4), Some(5)]),
1690            None, // Masked by struct array
1691            None,
1692        ]);
1693        let values = inner.values().clone();
1694
1695        // This test assumes that nulls don't take up space
1696        assert_eq!(inner.values().len(), 7);
1697
1698        let field = Arc::new(Field::new("list", inner.data_type().clone(), true));
1699        let array = Arc::new(inner) as ArrayRef;
1700        let nulls = Buffer::from([0b01010111]);
1701        let struct_a = StructArray::from((vec![(field, array)], nulls));
1702
1703        let field = Field::new("struct", struct_a.data_type().clone(), true);
1704        let array = Arc::new(struct_a) as ArrayRef;
1705        let levels = calculate_array_levels(&array, &field).unwrap();
1706
1707        assert_eq!(levels.len(), 1);
1708
1709        let logical_nulls = values.logical_nulls();
1710        let expected_level = ArrayLevels {
1711            def_levels: Some(vec![4, 4, 3, 2, 0, 4, 4, 0, 1]),
1712            rep_levels: Some(vec![0, 1, 0, 0, 0, 0, 1, 0, 0]),
1713            non_null_indices: vec![0, 1, 5, 6],
1714            max_def_level: 4,
1715            max_rep_level: 1,
1716            array: values,
1717            logical_nulls,
1718        };
1719
1720        assert_eq!(&levels[0], &expected_level);
1721    }
1722
1723    #[test]
1724    fn test_list_mask_struct() {
1725        // Test the null mask of a struct array and the null mask of a list array
1726        // masking out non-null elements of their children
1727
1728        let a1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1729            Some(vec![None]), // Masked by list array
1730            Some(vec![]),     // Masked by list array
1731            Some(vec![Some(3), None]),
1732            Some(vec![Some(4), Some(5), None, Some(6)]), // Masked by struct array
1733            None,
1734            None,
1735        ]);
1736        let a1_values = a1.values().clone();
1737        let a1 = Arc::new(a1) as ArrayRef;
1738
1739        let a2 = Arc::new(Int32Array::from_iter(vec![
1740            Some(1), // Masked by list array
1741            Some(2), // Masked by list array
1742            None,
1743            Some(4), // Masked by struct array
1744            Some(5),
1745            None,
1746        ])) as ArrayRef;
1747        let a2_values = a2.clone();
1748
1749        let field_a1 = Arc::new(Field::new("list", a1.data_type().clone(), true));
1750        let field_a2 = Arc::new(Field::new("integers", a2.data_type().clone(), true));
1751
1752        let nulls = Buffer::from([0b00110111]);
1753        let struct_a = Arc::new(StructArray::from((
1754            vec![(field_a1, a1), (field_a2, a2)],
1755            nulls,
1756        ))) as ArrayRef;
1757
1758        let offsets = Buffer::from_iter([0_i32, 0, 2, 2, 3, 5, 5]);
1759        let nulls = Buffer::from([0b00111100]);
1760
1761        let list_type = DataType::List(Arc::new(Field::new(
1762            "struct",
1763            struct_a.data_type().clone(),
1764            true,
1765        )));
1766
1767        let data = ArrayDataBuilder::new(list_type.clone())
1768            .len(6)
1769            .null_bit_buffer(Some(nulls))
1770            .add_buffer(offsets)
1771            .add_child_data(struct_a.into_data())
1772            .build()
1773            .unwrap();
1774
1775        let list = make_array(data);
1776        let list_field = Field::new("col", list_type, true);
1777
1778        let expected = vec![
1779            r#""#.to_string(),
1780            r#""#.to_string(),
1781            r#"[]"#.to_string(),
1782            r#"[{list: [3, ], integers: }]"#.to_string(),
1783            r#"[, {list: , integers: 5}]"#.to_string(),
1784            r#"[]"#.to_string(),
1785        ];
1786
1787        let actual: Vec<_> = (0..6)
1788            .map(|x| array_value_to_string(&list, x).unwrap())
1789            .collect();
1790        assert_eq!(actual, expected);
1791
1792        let levels = calculate_array_levels(&list, &list_field).unwrap();
1793
1794        assert_eq!(levels.len(), 2);
1795
1796        let a1_logical_nulls = a1_values.logical_nulls();
1797        let expected_level = ArrayLevels {
1798            def_levels: Some(vec![0, 0, 1, 6, 5, 2, 3, 1]),
1799            rep_levels: Some(vec![0, 0, 0, 0, 2, 0, 1, 0]),
1800            non_null_indices: vec![1],
1801            max_def_level: 6,
1802            max_rep_level: 2,
1803            array: a1_values,
1804            logical_nulls: a1_logical_nulls,
1805        };
1806
1807        assert_eq!(&levels[0], &expected_level);
1808
1809        let a2_logical_nulls = a2_values.logical_nulls();
1810        let expected_level = ArrayLevels {
1811            def_levels: Some(vec![0, 0, 1, 3, 2, 4, 1]),
1812            rep_levels: Some(vec![0, 0, 0, 0, 0, 1, 0]),
1813            non_null_indices: vec![4],
1814            max_def_level: 4,
1815            max_rep_level: 1,
1816            array: a2_values,
1817            logical_nulls: a2_logical_nulls,
1818        };
1819
1820        assert_eq!(&levels[1], &expected_level);
1821    }
1822
1823    #[test]
1824    fn test_fixed_size_list() {
1825        // [[1, 2], null, null, [7, 8], null]
1826        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 2);
1827        builder.values().append_slice(&[1, 2]);
1828        builder.append(true);
1829        builder.values().append_slice(&[3, 4]);
1830        builder.append(false);
1831        builder.values().append_slice(&[5, 6]);
1832        builder.append(false);
1833        builder.values().append_slice(&[7, 8]);
1834        builder.append(true);
1835        builder.values().append_slice(&[9, 10]);
1836        builder.append(false);
1837        let a = builder.finish();
1838        let values = a.values().clone();
1839
1840        let item_field = Field::new_list_field(a.data_type().clone(), true);
1841        let mut builder = levels(&item_field, a);
1842        builder.write(1..4);
1843        let levels = builder.finish();
1844
1845        assert_eq!(levels.len(), 1);
1846
1847        let list_level = &levels[0];
1848
1849        let logical_nulls = values.logical_nulls();
1850        let expected_level = ArrayLevels {
1851            def_levels: Some(vec![0, 0, 3, 3]),
1852            rep_levels: Some(vec![0, 0, 0, 1]),
1853            non_null_indices: vec![6, 7],
1854            max_def_level: 3,
1855            max_rep_level: 1,
1856            array: values,
1857            logical_nulls,
1858        };
1859        assert_eq!(list_level, &expected_level);
1860    }
1861
1862    #[test]
1863    fn test_fixed_size_list_of_struct() {
1864        // define schema
1865        let field_a = Field::new("a", DataType::Int32, true);
1866        let field_b = Field::new("b", DataType::Int64, false);
1867        let fields = Fields::from([Arc::new(field_a), Arc::new(field_b)]);
1868        let item_field = Field::new_list_field(DataType::Struct(fields.clone()), true);
1869        let list_field = Field::new(
1870            "list",
1871            DataType::FixedSizeList(Arc::new(item_field), 2),
1872            true,
1873        );
1874
1875        let builder_a = Int32Builder::with_capacity(10);
1876        let builder_b = Int64Builder::with_capacity(10);
1877        let struct_builder =
1878            StructBuilder::new(fields, vec![Box::new(builder_a), Box::new(builder_b)]);
1879        let mut list_builder = FixedSizeListBuilder::new(struct_builder, 2);
1880
1881        // [
1882        //   [{a: 1, b: 2}, null],
1883        //   null,
1884        //   [null, null],
1885        //   [{a: null, b: 3}, {a: 2, b: 4}]
1886        // ]
1887
1888        // [{a: 1, b: 2}, null]
1889        let values = list_builder.values();
1890        // {a: 1, b: 2}
1891        values
1892            .field_builder::<Int32Builder>(0)
1893            .unwrap()
1894            .append_value(1);
1895        values
1896            .field_builder::<Int64Builder>(1)
1897            .unwrap()
1898            .append_value(2);
1899        values.append(true);
1900        // null
1901        values
1902            .field_builder::<Int32Builder>(0)
1903            .unwrap()
1904            .append_null();
1905        values
1906            .field_builder::<Int64Builder>(1)
1907            .unwrap()
1908            .append_value(0);
1909        values.append(false);
1910        list_builder.append(true);
1911
1912        // null
1913        let values = list_builder.values();
1914        // null
1915        values
1916            .field_builder::<Int32Builder>(0)
1917            .unwrap()
1918            .append_null();
1919        values
1920            .field_builder::<Int64Builder>(1)
1921            .unwrap()
1922            .append_value(0);
1923        values.append(false);
1924        // null
1925        values
1926            .field_builder::<Int32Builder>(0)
1927            .unwrap()
1928            .append_null();
1929        values
1930            .field_builder::<Int64Builder>(1)
1931            .unwrap()
1932            .append_value(0);
1933        values.append(false);
1934        list_builder.append(false);
1935
1936        // [null, null]
1937        let values = list_builder.values();
1938        // null
1939        values
1940            .field_builder::<Int32Builder>(0)
1941            .unwrap()
1942            .append_null();
1943        values
1944            .field_builder::<Int64Builder>(1)
1945            .unwrap()
1946            .append_value(0);
1947        values.append(false);
1948        // null
1949        values
1950            .field_builder::<Int32Builder>(0)
1951            .unwrap()
1952            .append_null();
1953        values
1954            .field_builder::<Int64Builder>(1)
1955            .unwrap()
1956            .append_value(0);
1957        values.append(false);
1958        list_builder.append(true);
1959
1960        // [{a: null, b: 3}, {a: 2, b: 4}]
1961        let values = list_builder.values();
1962        // {a: null, b: 3}
1963        values
1964            .field_builder::<Int32Builder>(0)
1965            .unwrap()
1966            .append_null();
1967        values
1968            .field_builder::<Int64Builder>(1)
1969            .unwrap()
1970            .append_value(3);
1971        values.append(true);
1972        // {a: 2, b: 4}
1973        values
1974            .field_builder::<Int32Builder>(0)
1975            .unwrap()
1976            .append_value(2);
1977        values
1978            .field_builder::<Int64Builder>(1)
1979            .unwrap()
1980            .append_value(4);
1981        values.append(true);
1982        list_builder.append(true);
1983
1984        let array = Arc::new(list_builder.finish());
1985
1986        assert_eq!(array.values().len(), 8);
1987        assert_eq!(array.len(), 4);
1988
1989        let struct_values = array.values().as_struct();
1990        let values_a = struct_values.column(0).clone();
1991        let values_b = struct_values.column(1).clone();
1992
1993        let schema = Arc::new(Schema::new(vec![list_field]));
1994        let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
1995
1996        let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
1997        let a_levels = &levels[0];
1998        let b_levels = &levels[1];
1999
2000        // [[{a: 1}, null], null, [null, null], [{a: null}, {a: 2}]]
2001        let values_a_logical_nulls = values_a.logical_nulls();
2002        let expected_a = ArrayLevels {
2003            def_levels: Some(vec![4, 2, 0, 2, 2, 3, 4]),
2004            rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
2005            non_null_indices: vec![0, 7],
2006            max_def_level: 4,
2007            max_rep_level: 1,
2008            array: values_a,
2009            logical_nulls: values_a_logical_nulls,
2010        };
2011        // [[{b: 2}, null], null, [null, null], [{b: 3}, {b: 4}]]
2012        let values_b_logical_nulls = values_b.logical_nulls();
2013        let expected_b = ArrayLevels {
2014            def_levels: Some(vec![3, 2, 0, 2, 2, 3, 3]),
2015            rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
2016            non_null_indices: vec![0, 6, 7],
2017            max_def_level: 3,
2018            max_rep_level: 1,
2019            array: values_b,
2020            logical_nulls: values_b_logical_nulls,
2021        };
2022
2023        assert_eq!(a_levels, &expected_a);
2024        assert_eq!(b_levels, &expected_b);
2025    }
2026
2027    #[test]
2028    fn test_fixed_size_list_empty() {
2029        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 0);
2030        builder.append(true);
2031        builder.append(false);
2032        builder.append(true);
2033        let array = builder.finish();
2034        let values = array.values().clone();
2035
2036        let item_field = Field::new_list_field(array.data_type().clone(), true);
2037        let mut builder = levels(&item_field, array);
2038        builder.write(0..3);
2039        let levels = builder.finish();
2040
2041        assert_eq!(levels.len(), 1);
2042
2043        let list_level = &levels[0];
2044
2045        let logical_nulls = values.logical_nulls();
2046        let expected_level = ArrayLevels {
2047            def_levels: Some(vec![1, 0, 1]),
2048            rep_levels: Some(vec![0, 0, 0]),
2049            non_null_indices: vec![],
2050            max_def_level: 3,
2051            max_rep_level: 1,
2052            array: values,
2053            logical_nulls,
2054        };
2055        assert_eq!(list_level, &expected_level);
2056    }
2057
2058    #[test]
2059    fn test_fixed_size_list_of_var_lists() {
2060        // [[[1, null, 3], null], [[4], []], [[5, 6], [null, null]], null]
2061        let mut builder = FixedSizeListBuilder::new(ListBuilder::new(Int32Builder::new()), 2);
2062        builder.values().append_value([Some(1), None, Some(3)]);
2063        builder.values().append_null();
2064        builder.append(true);
2065        builder.values().append_value([Some(4)]);
2066        builder.values().append_value([]);
2067        builder.append(true);
2068        builder.values().append_value([Some(5), Some(6)]);
2069        builder.values().append_value([None, None]);
2070        builder.append(true);
2071        builder.values().append_null();
2072        builder.values().append_null();
2073        builder.append(false);
2074        let a = builder.finish();
2075        let values = a.values().as_list::<i32>().values().clone();
2076
2077        let item_field = Field::new_list_field(a.data_type().clone(), true);
2078        let mut builder = levels(&item_field, a);
2079        builder.write(0..4);
2080        let levels = builder.finish();
2081
2082        let logical_nulls = values.logical_nulls();
2083        let expected_level = ArrayLevels {
2084            def_levels: Some(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]),
2085            rep_levels: Some(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]),
2086            non_null_indices: vec![0, 2, 3, 4, 5],
2087            max_def_level: 5,
2088            max_rep_level: 2,
2089            array: values,
2090            logical_nulls,
2091        };
2092
2093        assert_eq!(levels[0], expected_level);
2094    }
2095
2096    #[test]
2097    fn test_null_dictionary_values() {
2098        let values = Int32Array::new(
2099            vec![1, 2, 3, 4].into(),
2100            Some(NullBuffer::from(vec![true, false, true, true])),
2101        );
2102        let keys = Int32Array::new(
2103            vec![1, 54, 2, 0].into(),
2104            Some(NullBuffer::from(vec![true, false, true, true])),
2105        );
2106        // [NULL, NULL, 3, 0]
2107        let dict = DictionaryArray::new(keys, Arc::new(values));
2108
2109        let item_field = Field::new_list_field(dict.data_type().clone(), true);
2110
2111        let mut builder = levels(&item_field, dict.clone());
2112        builder.write(0..4);
2113        let levels = builder.finish();
2114
2115        let logical_nulls = dict.logical_nulls();
2116        let expected_level = ArrayLevels {
2117            def_levels: Some(vec![0, 0, 1, 1]),
2118            rep_levels: None,
2119            non_null_indices: vec![2, 3],
2120            max_def_level: 1,
2121            max_rep_level: 0,
2122            array: Arc::new(dict),
2123            logical_nulls,
2124        };
2125        assert_eq!(levels[0], expected_level);
2126    }
2127
2128    #[test]
2129    fn mismatched_types() {
2130        let array = Arc::new(Int32Array::from_iter(0..10)) as ArrayRef;
2131        let field = Field::new_list_field(DataType::Float64, false);
2132
2133        let err = LevelInfoBuilder::try_new(&field, Default::default(), &array)
2134            .unwrap_err()
2135            .to_string();
2136
2137        assert_eq!(
2138            err,
2139            "Arrow: Incompatible type. Field 'item' has type Float64, array has type Int32",
2140        );
2141    }
2142
2143    fn levels<T: Array + 'static>(field: &Field, array: T) -> LevelInfoBuilder {
2144        let v = Arc::new(array) as ArrayRef;
2145        LevelInfoBuilder::try_new(field, Default::default(), &v).unwrap()
2146    }
2147
2148    #[test]
2149    fn test_slice_for_chunk_flat() {
2150        // Case 1: required field (max_def_level=0, no def/rep levels stored).
2151        // Array has 6 values; all are non-null so non_null_indices covers every position.
2152        // The chunk selects value_offset=2, num_values=3 → the sub-array [3, 4, 5].
2153        // Since there are no levels, num_levels=0 and level_offset are irrelevant.
2154        // non_null_indices [0,1,2,3,4,5] filtered to [2,4) and shifted by -2 → [0,1,2].
2155        let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]));
2156        let logical_nulls = array.logical_nulls();
2157        let levels = ArrayLevels {
2158            def_levels: None,
2159            rep_levels: None,
2160            non_null_indices: vec![0, 1, 2, 3, 4, 5],
2161            max_def_level: 0,
2162            max_rep_level: 0,
2163            array,
2164            logical_nulls,
2165        };
2166        let sliced = levels.slice_for_chunk(&CdcChunk {
2167            level_offset: 0,
2168            num_levels: 0,
2169            value_offset: 2,
2170            num_values: 3,
2171        });
2172        assert!(sliced.def_levels.is_none());
2173        assert!(sliced.rep_levels.is_none());
2174        assert_eq!(sliced.non_null_indices, vec![0, 1, 2]);
2175        assert_eq!(sliced.array.len(), 3);
2176
2177        // Case 2: optional field (max_def_level=1, def levels present, no rep levels).
2178        // Array: [Some(1), None, Some(3), None, Some(5), Some(6)]
2179        // def_levels: [1, 0, 1, 0, 1, 1]  (1=non-null, 0=null)
2180        // non_null_indices: [0, 2, 4, 5]  (array positions of the four non-null values)
2181        //
2182        // The chunk selects level_offset=1, num_levels=3, value_offset=1, num_values=3:
2183        //   - def_levels[1..4] = [0, 1, 0]  → null, non-null, null
2184        //   - sub-array slice(1, 3) = [None, Some(3), None]
2185        //   - non_null_indices filtered to [value_offset=1, value_end=4): only index 2 qualifies,
2186        //     shifted by -1 → [1]  (position of Some(3) within the sliced sub-array)
2187        let array: ArrayRef = Arc::new(Int32Array::from(vec![
2188            Some(1),
2189            None,
2190            Some(3),
2191            None,
2192            Some(5),
2193            Some(6),
2194        ]));
2195        let logical_nulls = array.logical_nulls();
2196        let levels = ArrayLevels {
2197            def_levels: Some(vec![1, 0, 1, 0, 1, 1]),
2198            rep_levels: None,
2199            non_null_indices: vec![0, 2, 4, 5],
2200            max_def_level: 1,
2201            max_rep_level: 0,
2202            array,
2203            logical_nulls,
2204        };
2205        let sliced = levels.slice_for_chunk(&CdcChunk {
2206            level_offset: 1,
2207            num_levels: 3,
2208            value_offset: 1,
2209            num_values: 3,
2210        });
2211        assert_eq!(sliced.def_levels, Some(vec![0, 1, 0]));
2212        assert!(sliced.rep_levels.is_none());
2213        assert_eq!(sliced.non_null_indices, vec![1]);
2214        assert_eq!(sliced.array.len(), 3);
2215    }
2216
2217    #[test]
2218    fn test_slice_for_chunk_nested() {
2219        // [[1,2],[3],[4,5]]: def=[2,2,2,2,2], rep=[0,1,0,0,1]
2220        // Slice levels 2..5 (def=[2,2,2], rep=[0,0,1]), values 2..5
2221        let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
2222        let logical_nulls = array.logical_nulls();
2223        let levels = ArrayLevels {
2224            def_levels: Some(vec![2, 2, 2, 2, 2]),
2225            rep_levels: Some(vec![0, 1, 0, 0, 1]),
2226            non_null_indices: vec![0, 1, 2, 3, 4],
2227            max_def_level: 2,
2228            max_rep_level: 1,
2229            array,
2230            logical_nulls,
2231        };
2232        let sliced = levels.slice_for_chunk(&CdcChunk {
2233            level_offset: 2,
2234            num_levels: 3,
2235            value_offset: 2,
2236            num_values: 3,
2237        });
2238        assert_eq!(sliced.def_levels, Some(vec![2, 2, 2]));
2239        assert_eq!(sliced.rep_levels, Some(vec![0, 0, 1]));
2240        // [0,1,2,3,4] filtered to [2,5) → [2,3,4] → shifted -2 → [0,1,2]
2241        assert_eq!(sliced.non_null_indices, vec![0, 1, 2]);
2242        assert_eq!(sliced.array.len(), 3);
2243    }
2244
2245    #[test]
2246    fn test_slice_for_chunk_non_null_indices_boundary() {
2247        // [1, null, 3]: non_null_indices=[0, 2]; test inclusive lower / exclusive upper bounds
2248        let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
2249        let logical_nulls = array.logical_nulls();
2250        let levels = ArrayLevels {
2251            def_levels: Some(vec![1, 0, 1]),
2252            rep_levels: None,
2253            non_null_indices: vec![0, 2],
2254            max_def_level: 1,
2255            max_rep_level: 0,
2256            array,
2257            logical_nulls,
2258        };
2259        assert_eq!(
2260            levels
2261                .slice_for_chunk(&CdcChunk {
2262                    level_offset: 0,
2263                    num_levels: 1,
2264                    value_offset: 0,
2265                    num_values: 1
2266                })
2267                .non_null_indices,
2268            vec![0]
2269        );
2270        // idx 2 in range [1,3), shifted -1 → 1
2271        assert_eq!(
2272            levels
2273                .slice_for_chunk(&CdcChunk {
2274                    level_offset: 1,
2275                    num_levels: 2,
2276                    value_offset: 1,
2277                    num_values: 2
2278                })
2279                .non_null_indices,
2280            vec![1]
2281        );
2282        // idx 2 excluded from [1,2)
2283        assert_eq!(
2284            levels
2285                .slice_for_chunk(&CdcChunk {
2286                    level_offset: 1,
2287                    num_levels: 1,
2288                    value_offset: 1,
2289                    num_values: 1
2290                })
2291                .non_null_indices,
2292            Vec::<usize>::new()
2293        );
2294    }
2295}