Skip to main content

parquet/arrow/arrow_writer/
levels.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Parquet definition and repetition levels
19//!
20//! Contains the algorithm for computing definition and repetition levels.
21//! The algorithm works by tracking the slots of an array that should
22//! ultimately be populated when writing to Parquet.
23//! Parquet achieves nesting through definition levels and repetition levels \[1\].
24//! Definition levels specify how many optional fields in the part for the column
25//! are defined.
26//! Repetition levels specify at what repeated field (list) in the path a column
27//! is defined.
28//!
29//! In a nested data structure such as `a.b.c`, one can see levels as defining
30//! whether a record is defined at `a`, `a.b`, or `a.b.c`.
31//! Optional fields are nullable fields, thus if all 3 fields
32//! are nullable, the maximum definition could be = 3 if there are no lists.
33//!
34//! The algorithm in this module computes the necessary information to enable
35//! the writer to keep track of which columns are at which levels, and to extract
36//! the correct values at the correct slots from Arrow arrays.
37//!
38//! It works by walking a record batch's arrays, keeping track of what values
39//! are non-null, their positions and computing what their levels are.
40//!
41//! \[1\] [parquet-format#nested-encoding](https://github.com/apache/parquet-format#nested-encoding)
42
43use crate::column::chunker::CdcChunk;
44use crate::errors::{ParquetError, Result};
45use arrow_array::cast::AsArray;
46use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
47use arrow_buffer::bit_iterator::BitIndexIterator;
48use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
49use arrow_schema::{DataType, Field};
50use std::ops::Range;
51use std::sync::Arc;
52
53/// Performs a depth-first scan of the children of `array`, constructing [`ArrayLevels`]
54/// for each leaf column encountered
55pub(crate) fn calculate_array_levels(array: &ArrayRef, field: &Field) -> Result<Vec<ArrayLevels>> {
56    let mut builder = LevelInfoBuilder::try_new(field, Default::default(), array)?;
57    builder.write(0..array.len());
58    Ok(builder.finish())
59}
60
61/// Returns true if the DataType can be represented as a primitive parquet column,
62/// i.e. a leaf array with no children
63fn is_leaf(data_type: &DataType) -> bool {
64    matches!(
65        data_type,
66        DataType::Null
67            | DataType::Boolean
68            | DataType::Int8
69            | DataType::Int16
70            | DataType::Int32
71            | DataType::Int64
72            | DataType::UInt8
73            | DataType::UInt16
74            | DataType::UInt32
75            | DataType::UInt64
76            | DataType::Float16
77            | DataType::Float32
78            | DataType::Float64
79            | DataType::Utf8
80            | DataType::Utf8View
81            | DataType::LargeUtf8
82            | DataType::Timestamp(_, _)
83            | DataType::Date32
84            | DataType::Date64
85            | DataType::Time32(_)
86            | DataType::Time64(_)
87            | DataType::Duration(_)
88            | DataType::Interval(_)
89            | DataType::Binary
90            | DataType::LargeBinary
91            | DataType::BinaryView
92            | DataType::Decimal32(_, _)
93            | DataType::Decimal64(_, _)
94            | DataType::Decimal128(_, _)
95            | DataType::Decimal256(_, _)
96            | DataType::FixedSizeBinary(_)
97    )
98}
99
100/// The definition and repetition level of an array within a potentially nested hierarchy
101#[derive(Debug, Default, Clone, Copy)]
102struct LevelContext {
103    /// The current repetition level
104    rep_level: i16,
105    /// The current definition level
106    def_level: i16,
107}
108
109/// A helper to construct [`ArrayLevels`] from a potentially nested [`Field`]
110#[derive(Debug)]
111enum LevelInfoBuilder {
112    /// A primitive, leaf array
113    Primitive(ArrayLevels),
114    /// A list array
115    List(
116        Box<LevelInfoBuilder>, // Child Values
117        LevelContext,          // Context
118        OffsetBuffer<i32>,     // Offsets
119        Option<NullBuffer>,    // Nulls
120    ),
121    /// A large list array
122    LargeList(
123        Box<LevelInfoBuilder>, // Child Values
124        LevelContext,          // Context
125        OffsetBuffer<i64>,     // Offsets
126        Option<NullBuffer>,    // Nulls
127    ),
128    /// A fixed size list array
129    FixedSizeList(
130        Box<LevelInfoBuilder>, // Values
131        LevelContext,          // Context
132        usize,                 // List Size
133        Option<NullBuffer>,    // Nulls
134    ),
135    /// A list view array
136    ListView(
137        Box<LevelInfoBuilder>, // Child Values
138        LevelContext,          // Context
139        ScalarBuffer<i32>,     // Offsets
140        ScalarBuffer<i32>,     // Sizes
141        Option<NullBuffer>,    // Nulls
142    ),
143    /// A large list view array
144    LargeListView(
145        Box<LevelInfoBuilder>, // Child Values
146        LevelContext,          // Context
147        ScalarBuffer<i64>,     // Offsets
148        ScalarBuffer<i64>,     // Sizes
149        Option<NullBuffer>,    // Nulls
150    ),
151    /// A struct array
152    Struct(Vec<LevelInfoBuilder>, LevelContext, Option<NullBuffer>),
153}
154
155impl LevelInfoBuilder {
156    /// Create a new [`LevelInfoBuilder`] for the given [`Field`] and parent [`LevelContext`]
157    fn try_new(field: &Field, parent_ctx: LevelContext, array: &ArrayRef) -> Result<Self> {
158        if !Self::types_compatible(field.data_type(), array.data_type()) {
159            return Err(arrow_err!(format!(
160                "Incompatible type. Field '{}' has type {}, array has type {}",
161                field.name(),
162                field.data_type(),
163                array.data_type(),
164            )));
165        }
166
167        let is_nullable = field.is_nullable();
168
169        match array.data_type() {
170            d if is_leaf(d) => {
171                let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
172                Ok(Self::Primitive(levels))
173            }
174            DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => {
175                let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
176                Ok(Self::Primitive(levels))
177            }
178            DataType::Struct(children) => {
179                let array = array.as_struct();
180                let def_level = match is_nullable {
181                    true => parent_ctx.def_level + 1,
182                    false => parent_ctx.def_level,
183                };
184
185                let ctx = LevelContext {
186                    rep_level: parent_ctx.rep_level,
187                    def_level,
188                };
189
190                let children = children
191                    .iter()
192                    .zip(array.columns())
193                    .map(|(f, a)| Self::try_new(f, ctx, a))
194                    .collect::<Result<_>>()?;
195
196                Ok(Self::Struct(children, ctx, array.nulls().cloned()))
197            }
198            DataType::List(child)
199            | DataType::LargeList(child)
200            | DataType::Map(child, _)
201            | DataType::FixedSizeList(child, _)
202            | DataType::ListView(child)
203            | DataType::LargeListView(child) => {
204                let def_level = match is_nullable {
205                    true => parent_ctx.def_level + 2,
206                    false => parent_ctx.def_level + 1,
207                };
208
209                let ctx = LevelContext {
210                    rep_level: parent_ctx.rep_level + 1,
211                    def_level,
212                };
213
214                Ok(match field.data_type() {
215                    DataType::List(_) => {
216                        let list = array.as_list();
217                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
218                        let offsets = list.offsets().clone();
219                        Self::List(Box::new(child), ctx, offsets, list.nulls().cloned())
220                    }
221                    DataType::LargeList(_) => {
222                        let list = array.as_list();
223                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
224                        let offsets = list.offsets().clone();
225                        let nulls = list.nulls().cloned();
226                        Self::LargeList(Box::new(child), ctx, offsets, nulls)
227                    }
228                    DataType::Map(_, _) => {
229                        let map = array.as_map();
230                        let entries = Arc::new(map.entries().clone()) as ArrayRef;
231                        let child = Self::try_new(child.as_ref(), ctx, &entries)?;
232                        let offsets = map.offsets().clone();
233                        Self::List(Box::new(child), ctx, offsets, map.nulls().cloned())
234                    }
235                    DataType::FixedSizeList(_, size) => {
236                        let list = array.as_fixed_size_list();
237                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
238                        let nulls = list.nulls().cloned();
239                        Self::FixedSizeList(Box::new(child), ctx, *size as _, nulls)
240                    }
241                    DataType::ListView(_) => {
242                        let list = array.as_list_view();
243                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
244                        let offsets = list.offsets().clone();
245                        let sizes = list.sizes().clone();
246                        let nulls = list.nulls().cloned();
247                        Self::ListView(Box::new(child), ctx, offsets, sizes, nulls)
248                    }
249                    DataType::LargeListView(_) => {
250                        let list = array.as_list_view();
251                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
252                        let offsets = list.offsets().clone();
253                        let sizes = list.sizes().clone();
254                        let nulls = list.nulls().cloned();
255                        Self::LargeListView(Box::new(child), ctx, offsets, sizes, nulls)
256                    }
257                    _ => unreachable!(),
258                })
259            }
260            d => Err(nyi_err!("Datatype {} is not yet supported", d)),
261        }
262    }
263
264    /// Finish this [`LevelInfoBuilder`] returning the [`ArrayLevels`] for the leaf columns
265    /// as enumerated by a depth-first search
266    fn finish(self) -> Vec<ArrayLevels> {
267        match self {
268            LevelInfoBuilder::Primitive(v) => vec![v],
269            LevelInfoBuilder::List(v, _, _, _)
270            | LevelInfoBuilder::LargeList(v, _, _, _)
271            | LevelInfoBuilder::FixedSizeList(v, _, _, _)
272            | LevelInfoBuilder::ListView(v, _, _, _, _)
273            | LevelInfoBuilder::LargeListView(v, _, _, _, _) => v.finish(),
274            LevelInfoBuilder::Struct(v, _, _) => v.into_iter().flat_map(|l| l.finish()).collect(),
275        }
276    }
277
278    /// Given an `array`, write the level data for the elements in `range`
279    fn write(&mut self, range: Range<usize>) {
280        match self {
281            LevelInfoBuilder::Primitive(info) => Self::write_leaf(info, range),
282            LevelInfoBuilder::List(child, ctx, offsets, nulls) => {
283                Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
284            }
285            LevelInfoBuilder::LargeList(child, ctx, offsets, nulls) => {
286                Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
287            }
288            LevelInfoBuilder::FixedSizeList(child, ctx, size, nulls) => {
289                Self::write_fixed_size_list(child, ctx, *size, nulls.as_ref(), range)
290            }
291            LevelInfoBuilder::ListView(child, ctx, offsets, sizes, nulls) => {
292                Self::write_list_view(child, ctx, offsets, sizes, nulls.as_ref(), range)
293            }
294            LevelInfoBuilder::LargeListView(child, ctx, offsets, sizes, nulls) => {
295                Self::write_list_view(child, ctx, offsets, sizes, nulls.as_ref(), range)
296            }
297            LevelInfoBuilder::Struct(children, ctx, nulls) => {
298                Self::write_struct(children, ctx, nulls.as_ref(), range)
299            }
300        }
301    }
302
303    /// Write `range` elements from ListArray `array`
304    ///
305    /// Note: MapArrays are `ListArray<i32>` under the hood and so are dispatched to this method
306    fn write_list<O: OffsetSizeTrait>(
307        child: &mut LevelInfoBuilder,
308        ctx: &LevelContext,
309        offsets: &[O],
310        nulls: Option<&NullBuffer>,
311        range: Range<usize>,
312    ) {
313        let offsets = &offsets[range.start..range.end + 1];
314
315        let write_non_null_slice =
316            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
317                child.write(start_idx..end_idx);
318                child.visit_leaves(|leaf| {
319                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
320                    let mut rev = rep_levels.iter_mut().rev();
321                    let mut remaining = end_idx - start_idx;
322
323                    loop {
324                        let next = rev.next().unwrap();
325                        if *next > ctx.rep_level {
326                            // Nested element - ignore
327                            continue;
328                        }
329
330                        remaining -= 1;
331                        if remaining == 0 {
332                            *next = ctx.rep_level - 1;
333                            break;
334                        }
335                    }
336                })
337            };
338
339        let write_null_run = |child: &mut LevelInfoBuilder, count: usize| {
340            if count > 0 {
341                child.visit_leaves(|leaf| {
342                    leaf.rep_levels
343                        .as_mut()
344                        .unwrap()
345                        .extend(std::iter::repeat_n(ctx.rep_level - 1, count));
346                    leaf.def_levels
347                        .as_mut()
348                        .unwrap()
349                        .extend(std::iter::repeat_n(ctx.def_level - 2, count));
350                });
351            }
352        };
353
354        let write_empty_run = |child: &mut LevelInfoBuilder, count: usize| {
355            if count > 0 {
356                child.visit_leaves(|leaf| {
357                    leaf.rep_levels
358                        .as_mut()
359                        .unwrap()
360                        .extend(std::iter::repeat_n(ctx.rep_level - 1, count));
361                    leaf.def_levels
362                        .as_mut()
363                        .unwrap()
364                        .extend(std::iter::repeat_n(ctx.def_level - 1, count));
365                });
366            }
367        };
368
369        match nulls {
370            Some(nulls) => {
371                let null_offset = range.start;
372                let mut pending_nulls: usize = 0;
373                let mut pending_empties: usize = 0;
374
375                // TODO: Faster bitmask iteration (#1757)
376                for (idx, w) in offsets.windows(2).enumerate() {
377                    let is_valid = nulls.is_valid(idx + null_offset);
378                    let start_idx = w[0].as_usize();
379                    let end_idx = w[1].as_usize();
380
381                    if !is_valid {
382                        write_empty_run(child, pending_empties);
383                        pending_empties = 0;
384                        pending_nulls += 1;
385                    } else if start_idx == end_idx {
386                        write_null_run(child, pending_nulls);
387                        pending_nulls = 0;
388                        pending_empties += 1;
389                    } else {
390                        write_null_run(child, pending_nulls);
391                        pending_nulls = 0;
392                        write_empty_run(child, pending_empties);
393                        pending_empties = 0;
394                        write_non_null_slice(child, start_idx, end_idx);
395                    }
396                }
397                write_null_run(child, pending_nulls);
398                write_empty_run(child, pending_empties);
399            }
400            None => {
401                let mut pending_empties: usize = 0;
402                for w in offsets.windows(2) {
403                    let start_idx = w[0].as_usize();
404                    let end_idx = w[1].as_usize();
405                    if start_idx == end_idx {
406                        pending_empties += 1;
407                    } else {
408                        write_empty_run(child, pending_empties);
409                        pending_empties = 0;
410                        write_non_null_slice(child, start_idx, end_idx);
411                    }
412                }
413                write_empty_run(child, pending_empties);
414            }
415        }
416    }
417
418    /// Write `range` elements from ListViewArray `array`
419    fn write_list_view<O: OffsetSizeTrait>(
420        child: &mut LevelInfoBuilder,
421        ctx: &LevelContext,
422        offsets: &[O],
423        sizes: &[O],
424        nulls: Option<&NullBuffer>,
425        range: Range<usize>,
426    ) {
427        let offsets = &offsets[range.start..range.end];
428        let sizes = &sizes[range.start..range.end];
429
430        let write_non_null_slice =
431            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
432                child.write(start_idx..end_idx);
433                child.visit_leaves(|leaf| {
434                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
435                    let mut rev = rep_levels.iter_mut().rev();
436                    let mut remaining = end_idx - start_idx;
437
438                    loop {
439                        let next = rev.next().unwrap();
440                        if *next > ctx.rep_level {
441                            // Nested element - ignore
442                            continue;
443                        }
444
445                        remaining -= 1;
446                        if remaining == 0 {
447                            *next = ctx.rep_level - 1;
448                            break;
449                        }
450                    }
451                })
452            };
453
454        let write_empty_slice = |child: &mut LevelInfoBuilder| {
455            child.visit_leaves(|leaf| {
456                let rep_levels = leaf.rep_levels.as_mut().unwrap();
457                rep_levels.push(ctx.rep_level - 1);
458                let def_levels = leaf.def_levels.as_mut().unwrap();
459                def_levels.push(ctx.def_level - 1);
460            })
461        };
462
463        let write_null_slice = |child: &mut LevelInfoBuilder| {
464            child.visit_leaves(|leaf| {
465                let rep_levels = leaf.rep_levels.as_mut().unwrap();
466                rep_levels.push(ctx.rep_level - 1);
467                let def_levels = leaf.def_levels.as_mut().unwrap();
468                def_levels.push(ctx.def_level - 2);
469            })
470        };
471
472        match nulls {
473            Some(nulls) => {
474                let null_offset = range.start;
475                // TODO: Faster bitmask iteration (#1757)
476                for (idx, (offset, size)) in offsets.iter().zip(sizes.iter()).enumerate() {
477                    let is_valid = nulls.is_valid(idx + null_offset);
478                    let start_idx = offset.as_usize();
479                    let size = size.as_usize();
480                    let end_idx = start_idx + size;
481                    if !is_valid {
482                        write_null_slice(child)
483                    } else if size == 0 {
484                        write_empty_slice(child)
485                    } else {
486                        write_non_null_slice(child, start_idx, end_idx)
487                    }
488                }
489            }
490            None => {
491                for (offset, size) in offsets.iter().zip(sizes.iter()) {
492                    let start_idx = offset.as_usize();
493                    let size = size.as_usize();
494                    let end_idx = start_idx + size;
495                    if size == 0 {
496                        write_empty_slice(child)
497                    } else {
498                        write_non_null_slice(child, start_idx, end_idx)
499                    }
500                }
501            }
502        }
503    }
504
505    /// Write `range` elements from StructArray `array`
506    fn write_struct(
507        children: &mut [LevelInfoBuilder],
508        ctx: &LevelContext,
509        nulls: Option<&NullBuffer>,
510        range: Range<usize>,
511    ) {
512        let write_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
513            for child in children {
514                child.visit_leaves(|info| {
515                    let len = range.end - range.start;
516
517                    let def_levels = info.def_levels.as_mut().unwrap();
518                    def_levels.extend(std::iter::repeat_n(ctx.def_level - 1, len));
519
520                    if let Some(rep_levels) = info.rep_levels.as_mut() {
521                        rep_levels.extend(std::iter::repeat_n(ctx.rep_level, len));
522                    }
523                })
524            }
525        };
526
527        let write_non_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
528            for child in children {
529                child.write(range.clone())
530            }
531        };
532
533        match nulls {
534            Some(validity) => {
535                let mut last_non_null_idx = None;
536                let mut last_null_idx = None;
537
538                // TODO: Faster bitmask iteration (#1757)
539                for i in range.clone() {
540                    match validity.is_valid(i) {
541                        true => {
542                            if let Some(last_idx) = last_null_idx.take() {
543                                write_null(children, last_idx..i)
544                            }
545                            last_non_null_idx.get_or_insert(i);
546                        }
547                        false => {
548                            if let Some(last_idx) = last_non_null_idx.take() {
549                                write_non_null(children, last_idx..i)
550                            }
551                            last_null_idx.get_or_insert(i);
552                        }
553                    }
554                }
555
556                if let Some(last_idx) = last_null_idx.take() {
557                    write_null(children, last_idx..range.end)
558                }
559
560                if let Some(last_idx) = last_non_null_idx.take() {
561                    write_non_null(children, last_idx..range.end)
562                }
563            }
564            None => write_non_null(children, range),
565        }
566    }
567
568    /// Write `range` elements from FixedSizeListArray with child data `values` and null bitmap `nulls`.
569    fn write_fixed_size_list(
570        child: &mut LevelInfoBuilder,
571        ctx: &LevelContext,
572        fixed_size: usize,
573        nulls: Option<&NullBuffer>,
574        range: Range<usize>,
575    ) {
576        let write_non_null = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
577            let values_start = start_idx * fixed_size;
578            let values_end = end_idx * fixed_size;
579            child.write(values_start..values_end);
580
581            child.visit_leaves(|leaf| {
582                let rep_levels = leaf.rep_levels.as_mut().unwrap();
583
584                let row_indices = (0..fixed_size)
585                    .rev()
586                    .cycle()
587                    .take(values_end - values_start);
588
589                // Step backward over the child rep levels and mark the start of each list
590                rep_levels
591                    .iter_mut()
592                    .rev()
593                    // Filter out reps from nested children
594                    .filter(|&&mut r| r == ctx.rep_level)
595                    .zip(row_indices)
596                    .for_each(|(r, idx)| {
597                        if idx == 0 {
598                            *r = ctx.rep_level - 1;
599                        }
600                    });
601            })
602        };
603
604        // If list size is 0, ignore values and just write rep/def levels.
605        let write_empty = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
606            let len = end_idx - start_idx;
607            child.visit_leaves(|leaf| {
608                let rep_levels = leaf.rep_levels.as_mut().unwrap();
609                rep_levels.extend(std::iter::repeat_n(ctx.rep_level - 1, len));
610                let def_levels = leaf.def_levels.as_mut().unwrap();
611                def_levels.extend(std::iter::repeat_n(ctx.def_level - 1, len));
612            })
613        };
614
615        let write_rows = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
616            if fixed_size > 0 {
617                write_non_null(child, start_idx, end_idx)
618            } else {
619                write_empty(child, start_idx, end_idx)
620            }
621        };
622
623        match nulls {
624            Some(nulls) => {
625                let mut start_idx = None;
626                for idx in range.clone() {
627                    if nulls.is_valid(idx) {
628                        // Start a run of valid rows if not already inside of one
629                        start_idx.get_or_insert(idx);
630                    } else {
631                        // Write out any pending valid rows
632                        if let Some(start) = start_idx.take() {
633                            write_rows(child, start, idx);
634                        }
635                        // Add null row
636                        child.visit_leaves(|leaf| {
637                            let rep_levels = leaf.rep_levels.as_mut().unwrap();
638                            rep_levels.push(ctx.rep_level - 1);
639                            let def_levels = leaf.def_levels.as_mut().unwrap();
640                            def_levels.push(ctx.def_level - 2);
641                        })
642                    }
643                }
644                // Write out any remaining valid rows
645                if let Some(start) = start_idx.take() {
646                    write_rows(child, start, range.end);
647                }
648            }
649            // If all rows are valid then write the whole array
650            None => write_rows(child, range.start, range.end),
651        }
652    }
653
654    /// Write a primitive array, as defined by [`is_leaf`]
655    fn write_leaf(info: &mut ArrayLevels, range: Range<usize>) {
656        let len = range.end - range.start;
657
658        match &mut info.def_levels {
659            Some(def_levels) => {
660                def_levels.reserve(len);
661                info.non_null_indices.reserve(len);
662
663                match &info.logical_nulls {
664                    Some(nulls) => {
665                        assert!(range.end <= nulls.len());
666                        let nulls = nulls.inner();
667                        def_levels.extend(range.clone().map(|i| {
668                            // Safety: range.end was asserted to be in bounds earlier
669                            let valid = unsafe { nulls.value_unchecked(i) };
670                            info.max_def_level - (!valid as i16)
671                        }));
672                        info.non_null_indices.extend(
673                            BitIndexIterator::new(nulls.inner(), nulls.offset() + range.start, len)
674                                .map(|i| i + range.start),
675                        );
676                    }
677                    None => {
678                        let iter = std::iter::repeat_n(info.max_def_level, len);
679                        def_levels.extend(iter);
680                        info.non_null_indices.extend(range);
681                    }
682                }
683            }
684            None => info.non_null_indices.extend(range),
685        }
686
687        if let Some(rep_levels) = &mut info.rep_levels {
688            rep_levels.extend(std::iter::repeat_n(info.max_rep_level, len))
689        }
690    }
691
692    /// Visits all children of this node in depth first order
693    fn visit_leaves(&mut self, visit: impl Fn(&mut ArrayLevels) + Copy) {
694        match self {
695            LevelInfoBuilder::Primitive(info) => visit(info),
696            LevelInfoBuilder::List(c, _, _, _)
697            | LevelInfoBuilder::LargeList(c, _, _, _)
698            | LevelInfoBuilder::FixedSizeList(c, _, _, _)
699            | LevelInfoBuilder::ListView(c, _, _, _, _)
700            | LevelInfoBuilder::LargeListView(c, _, _, _, _) => c.visit_leaves(visit),
701            LevelInfoBuilder::Struct(children, _, _) => {
702                for c in children {
703                    c.visit_leaves(visit)
704                }
705            }
706        }
707    }
708
709    /// Determine if the fields are compatible for purposes of constructing `LevelBuilderInfo`.
710    ///
711    /// Fields are compatible if they're the same type. Otherwise if one of them is a dictionary
712    /// and the other is a native array, the dictionary values must have the same type as the
713    /// native array
714    fn types_compatible(a: &DataType, b: &DataType) -> bool {
715        // if the Arrow data types are equal, the types are deemed compatible
716        if a.equals_datatype(b) {
717            return true;
718        }
719
720        // get the values out of the dictionaries
721        let (a, b) = match (a, b) {
722            (DataType::Dictionary(_, va), DataType::Dictionary(_, vb)) => {
723                (va.as_ref(), vb.as_ref())
724            }
725            (DataType::Dictionary(_, v), b) => (v.as_ref(), b),
726            (a, DataType::Dictionary(_, v)) => (a, v.as_ref()),
727            _ => (a, b),
728        };
729
730        // now that we've got the values from one/both dictionaries, if the values
731        // have the same Arrow data type, they're compatible
732        if a == b {
733            return true;
734        }
735
736        // here we have different Arrow data types, but if the array contains the same type of data
737        // then we consider the type compatible
738        match a {
739            // String, StringView and LargeString are compatible
740            DataType::Utf8 => matches!(b, DataType::LargeUtf8 | DataType::Utf8View),
741            DataType::Utf8View => matches!(b, DataType::LargeUtf8 | DataType::Utf8),
742            DataType::LargeUtf8 => matches!(b, DataType::Utf8 | DataType::Utf8View),
743
744            // Binary, BinaryView and LargeBinary are compatible
745            DataType::Binary => matches!(b, DataType::LargeBinary | DataType::BinaryView),
746            DataType::BinaryView => matches!(b, DataType::LargeBinary | DataType::Binary),
747            DataType::LargeBinary => matches!(b, DataType::Binary | DataType::BinaryView),
748
749            // otherwise we have incompatible types
750            _ => false,
751        }
752    }
753}
754
755/// The data necessary to write a primitive Arrow array to parquet, taking into account
756/// any non-primitive parents it may have in the arrow representation
757#[derive(Debug, Clone)]
758pub(crate) struct ArrayLevels {
759    /// Array's definition levels
760    ///
761    /// Present if `max_def_level != 0`
762    def_levels: Option<Vec<i16>>,
763
764    /// Array's optional repetition levels
765    ///
766    /// Present if `max_rep_level != 0`
767    rep_levels: Option<Vec<i16>>,
768
769    /// The corresponding array identifying non-null slices of data
770    /// from the primitive array
771    non_null_indices: Vec<usize>,
772
773    /// The maximum definition level for this leaf column
774    max_def_level: i16,
775
776    /// The maximum repetition for this leaf column
777    max_rep_level: i16,
778
779    /// The arrow array
780    array: ArrayRef,
781
782    /// cached logical nulls of the array.
783    logical_nulls: Option<NullBuffer>,
784}
785
786impl PartialEq for ArrayLevels {
787    fn eq(&self, other: &Self) -> bool {
788        self.def_levels == other.def_levels
789            && self.rep_levels == other.rep_levels
790            && self.non_null_indices == other.non_null_indices
791            && self.max_def_level == other.max_def_level
792            && self.max_rep_level == other.max_rep_level
793            && self.array.as_ref() == other.array.as_ref()
794            && self.logical_nulls.as_ref() == other.logical_nulls.as_ref()
795    }
796}
797impl Eq for ArrayLevels {}
798
799impl ArrayLevels {
800    fn new(ctx: LevelContext, is_nullable: bool, array: ArrayRef) -> Self {
801        let max_rep_level = ctx.rep_level;
802        let max_def_level = match is_nullable {
803            true => ctx.def_level + 1,
804            false => ctx.def_level,
805        };
806
807        let logical_nulls = array.logical_nulls();
808
809        Self {
810            def_levels: (max_def_level != 0).then(Vec::new),
811            rep_levels: (max_rep_level != 0).then(Vec::new),
812            non_null_indices: vec![],
813            max_def_level,
814            max_rep_level,
815            array,
816            logical_nulls,
817        }
818    }
819
820    pub fn array(&self) -> &ArrayRef {
821        &self.array
822    }
823
824    pub fn def_levels(&self) -> Option<&[i16]> {
825        self.def_levels.as_deref()
826    }
827
828    pub fn rep_levels(&self) -> Option<&[i16]> {
829        self.rep_levels.as_deref()
830    }
831
832    pub fn non_null_indices(&self) -> &[usize] {
833        &self.non_null_indices
834    }
835
836    /// Create a sliced view of this `ArrayLevels` for a CDC chunk.
837    ///
838    /// The chunk's `value_offset`/`num_values` select the relevant slice of
839    /// `non_null_indices`. The array is sliced to the range covered by
840    /// those indices, and they are shifted to be relative to the slice.
841    pub(crate) fn slice_for_chunk(&self, chunk: &CdcChunk) -> Self {
842        let def_levels = self.def_levels.as_ref().map(|levels| {
843            levels[chunk.level_offset..chunk.level_offset + chunk.num_levels].to_vec()
844        });
845        let rep_levels = self.rep_levels.as_ref().map(|levels| {
846            levels[chunk.level_offset..chunk.level_offset + chunk.num_levels].to_vec()
847        });
848
849        // Select the non-null indices for this chunk.
850        let nni = &self.non_null_indices[chunk.value_offset..chunk.value_offset + chunk.num_values];
851        // Compute the array range spanned by the non-null indices.
852        // When nni is empty (all-null chunk), start=0, end=0 → zero-length
853        // array slice; write_batch_internal will process only the def/rep
854        // levels and write no values.
855        let start = nni.first().copied().unwrap_or(0);
856        let end = nni.last().map_or(0, |&i| i + 1);
857        // Shift indices to be relative to the sliced array.
858        let non_null_indices = nni.iter().map(|&idx| idx - start).collect();
859        // Slice the array to the computed range.
860        let array = self.array.slice(start, end - start);
861        let logical_nulls = array.logical_nulls();
862
863        Self {
864            def_levels,
865            rep_levels,
866            non_null_indices,
867            max_def_level: self.max_def_level,
868            max_rep_level: self.max_rep_level,
869            array,
870            logical_nulls,
871        }
872    }
873}
874
875#[cfg(test)]
876mod tests {
877    use super::*;
878    use crate::column::chunker::CdcChunk;
879
880    use arrow_array::builder::*;
881    use arrow_array::types::Int32Type;
882    use arrow_array::*;
883    use arrow_buffer::{Buffer, ToByteSlice};
884    use arrow_cast::display::array_value_to_string;
885    use arrow_data::{ArrayData, ArrayDataBuilder};
886    use arrow_schema::{Fields, Schema};
887
888    #[test]
889    fn test_calculate_array_levels_twitter_example() {
890        // based on the example at https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html
891        // [[a, b, c], [d, e, f, g]], [[h], [i,j]]
892
893        let leaf_type = Field::new_list_field(DataType::Int32, false);
894        let inner_type = DataType::List(Arc::new(leaf_type));
895        let inner_field = Field::new("l2", inner_type.clone(), false);
896        let outer_type = DataType::List(Arc::new(inner_field));
897        let outer_field = Field::new("l1", outer_type.clone(), false);
898
899        let primitives = Int32Array::from_iter(0..10);
900
901        // Cannot use from_iter_primitive as always infers nullable
902        let offsets = Buffer::from_iter([0_i32, 3, 7, 8, 10]);
903        let inner_list = ArrayDataBuilder::new(inner_type)
904            .len(4)
905            .add_buffer(offsets)
906            .add_child_data(primitives.to_data())
907            .build()
908            .unwrap();
909
910        let offsets = Buffer::from_iter([0_i32, 2, 4]);
911        let outer_list = ArrayDataBuilder::new(outer_type)
912            .len(2)
913            .add_buffer(offsets)
914            .add_child_data(inner_list)
915            .build()
916            .unwrap();
917        let outer_list = make_array(outer_list);
918
919        let levels = calculate_array_levels(&outer_list, &outer_field).unwrap();
920        assert_eq!(levels.len(), 1);
921
922        let expected = ArrayLevels {
923            def_levels: Some(vec![2; 10]),
924            rep_levels: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
925            non_null_indices: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
926            max_def_level: 2,
927            max_rep_level: 2,
928            array: Arc::new(primitives),
929            logical_nulls: None,
930        };
931        assert_eq!(&levels[0], &expected);
932    }
933
934    #[test]
935    fn test_calculate_one_level_1() {
936        // This test calculates the levels for a non-null primitive array
937        let array = Arc::new(Int32Array::from_iter(0..10)) as ArrayRef;
938        let field = Field::new_list_field(DataType::Int32, false);
939
940        let levels = calculate_array_levels(&array, &field).unwrap();
941        assert_eq!(levels.len(), 1);
942
943        let expected_levels = ArrayLevels {
944            def_levels: None,
945            rep_levels: None,
946            non_null_indices: (0..10).collect(),
947            max_def_level: 0,
948            max_rep_level: 0,
949            array,
950            logical_nulls: None,
951        };
952        assert_eq!(&levels[0], &expected_levels);
953    }
954
955    #[test]
956    fn test_calculate_one_level_2() {
957        // This test calculates the levels for a nullable primitive array
958        let array = Arc::new(Int32Array::from_iter([
959            Some(0),
960            None,
961            Some(0),
962            Some(0),
963            None,
964        ])) as ArrayRef;
965        let field = Field::new_list_field(DataType::Int32, true);
966
967        let levels = calculate_array_levels(&array, &field).unwrap();
968        assert_eq!(levels.len(), 1);
969
970        let logical_nulls = array.logical_nulls();
971        let expected_levels = ArrayLevels {
972            def_levels: Some(vec![1, 0, 1, 1, 0]),
973            rep_levels: None,
974            non_null_indices: vec![0, 2, 3],
975            max_def_level: 1,
976            max_rep_level: 0,
977            array,
978            logical_nulls,
979        };
980        assert_eq!(&levels[0], &expected_levels);
981    }
982
983    #[test]
984    fn test_calculate_array_levels_1() {
985        let leaf_field = Field::new_list_field(DataType::Int32, false);
986        let list_type = DataType::List(Arc::new(leaf_field));
987
988        // if all array values are defined (e.g. batch<list<_>>)
989        // [[0], [1], [2], [3], [4]]
990
991        let leaf_array = Int32Array::from_iter(0..5);
992        // Cannot use from_iter_primitive as always infers nullable
993        let offsets = Buffer::from_iter(0_i32..6);
994        let list = ArrayDataBuilder::new(list_type.clone())
995            .len(5)
996            .add_buffer(offsets)
997            .add_child_data(leaf_array.to_data())
998            .build()
999            .unwrap();
1000        let list = make_array(list);
1001
1002        let list_field = Field::new("list", list_type.clone(), false);
1003        let levels = calculate_array_levels(&list, &list_field).unwrap();
1004        assert_eq!(levels.len(), 1);
1005
1006        let expected_levels = ArrayLevels {
1007            def_levels: Some(vec![1; 5]),
1008            rep_levels: Some(vec![0; 5]),
1009            non_null_indices: (0..5).collect(),
1010            max_def_level: 1,
1011            max_rep_level: 1,
1012            array: Arc::new(leaf_array),
1013            logical_nulls: None,
1014        };
1015        assert_eq!(&levels[0], &expected_levels);
1016
1017        // array: [[0, 0], NULL, [2, 2], [3, 3, 3, 3], [4, 4, 4]]
1018        // all values are defined as we do not have nulls on the root (batch)
1019        // repetition:
1020        //   0: 0, 1
1021        //   1: 0
1022        //   2: 0, 1
1023        //   3: 0, 1, 1, 1
1024        //   4: 0, 1, 1
1025        let leaf_array = Int32Array::from_iter([0, 0, 2, 2, 3, 3, 3, 3, 4, 4, 4]);
1026        let offsets = Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]);
1027        let list = ArrayDataBuilder::new(list_type.clone())
1028            .len(5)
1029            .add_buffer(offsets)
1030            .add_child_data(leaf_array.to_data())
1031            .null_bit_buffer(Some(Buffer::from([0b00011101])))
1032            .build()
1033            .unwrap();
1034        let list = make_array(list);
1035
1036        let list_field = Field::new("list", list_type, true);
1037        let levels = calculate_array_levels(&list, &list_field).unwrap();
1038        assert_eq!(levels.len(), 1);
1039
1040        let expected_levels = ArrayLevels {
1041            def_levels: Some(vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2]),
1042            rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
1043            non_null_indices: (0..11).collect(),
1044            max_def_level: 2,
1045            max_rep_level: 1,
1046            array: Arc::new(leaf_array),
1047            logical_nulls: None,
1048        };
1049        assert_eq!(&levels[0], &expected_levels);
1050    }
1051
1052    #[test]
1053    fn test_calculate_array_levels_2() {
1054        // If some values are null
1055        //
1056        // This emulates an array in the form: <struct<list<?>>
1057        // with values:
1058        // - 0: [0, 1], but is null because of the struct
1059        // - 1: []
1060        // - 2: [2, 3], but is null because of the struct
1061        // - 3: [4, 5, 6, 7]
1062        // - 4: [8, 9, 10]
1063        //
1064        // If the first values of a list are null due to a parent, we have to still account for them
1065        // while indexing, because they would affect the way the child is indexed
1066        // i.e. in the above example, we have to know that [0, 1] has to be skipped
1067        let leaf = Int32Array::from_iter(0..11);
1068        let leaf_field = Field::new("leaf", DataType::Int32, false);
1069
1070        let list_type = DataType::List(Arc::new(leaf_field));
1071        let list = ArrayData::builder(list_type.clone())
1072            .len(5)
1073            .add_child_data(leaf.to_data())
1074            .add_buffer(Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]))
1075            .build()
1076            .unwrap();
1077
1078        let list = make_array(list);
1079        let list_field = Arc::new(Field::new("list", list_type, true));
1080
1081        let struct_array =
1082            StructArray::from((vec![(list_field, list)], Buffer::from([0b00011010])));
1083        let array = Arc::new(struct_array) as ArrayRef;
1084
1085        let struct_field = Field::new("struct", array.data_type().clone(), true);
1086
1087        let levels = calculate_array_levels(&array, &struct_field).unwrap();
1088        assert_eq!(levels.len(), 1);
1089
1090        let expected_levels = ArrayLevels {
1091            def_levels: Some(vec![0, 2, 0, 3, 3, 3, 3, 3, 3, 3]),
1092            rep_levels: Some(vec![0, 0, 0, 0, 1, 1, 1, 0, 1, 1]),
1093            non_null_indices: (4..11).collect(),
1094            max_def_level: 3,
1095            max_rep_level: 1,
1096            array: Arc::new(leaf),
1097            logical_nulls: None,
1098        };
1099
1100        assert_eq!(&levels[0], &expected_levels);
1101
1102        // nested lists
1103
1104        // 0: [[100, 101], [102, 103]]
1105        // 1: []
1106        // 2: [[104, 105], [106, 107]]
1107        // 3: [[108, 109], [110, 111], [112, 113], [114, 115]]
1108        // 4: [[116, 117], [118, 119], [120, 121]]
1109
1110        let leaf = Int32Array::from_iter(100..122);
1111        let leaf_field = Field::new("leaf", DataType::Int32, true);
1112
1113        let l1_type = DataType::List(Arc::new(leaf_field));
1114        let offsets = Buffer::from_iter([0_i32, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22]);
1115        let l1 = ArrayData::builder(l1_type.clone())
1116            .len(11)
1117            .add_child_data(leaf.to_data())
1118            .add_buffer(offsets)
1119            .build()
1120            .unwrap();
1121
1122        let l1_field = Field::new("l1", l1_type, true);
1123        let l2_type = DataType::List(Arc::new(l1_field));
1124        let l2 = ArrayData::builder(l2_type)
1125            .len(5)
1126            .add_child_data(l1)
1127            .add_buffer(Buffer::from_iter([0, 2, 2, 4, 8, 11]))
1128            .build()
1129            .unwrap();
1130
1131        let l2 = make_array(l2);
1132        let l2_field = Field::new("l2", l2.data_type().clone(), true);
1133
1134        let levels = calculate_array_levels(&l2, &l2_field).unwrap();
1135        assert_eq!(levels.len(), 1);
1136
1137        let expected_levels = ArrayLevels {
1138            def_levels: Some(vec![
1139                5, 5, 5, 5, 1, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
1140            ]),
1141            rep_levels: Some(vec![
1142                0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2,
1143            ]),
1144            non_null_indices: (0..22).collect(),
1145            max_def_level: 5,
1146            max_rep_level: 2,
1147            array: Arc::new(leaf),
1148            logical_nulls: None,
1149        };
1150
1151        assert_eq!(&levels[0], &expected_levels);
1152    }
1153
1154    #[test]
1155    fn test_calculate_array_levels_nested_list() {
1156        let leaf_field = Field::new("leaf", DataType::Int32, false);
1157        let list_type = DataType::List(Arc::new(leaf_field));
1158
1159        // if all array values are defined (e.g. batch<list<_>>)
1160        // The array at this level looks like:
1161        // 0: [a]
1162        // 1: [a]
1163        // 2: [a]
1164        // 3: [a]
1165
1166        let leaf = Int32Array::from_iter([0; 4]);
1167        let list = ArrayData::builder(list_type.clone())
1168            .len(4)
1169            .add_buffer(Buffer::from_iter(0_i32..5))
1170            .add_child_data(leaf.to_data())
1171            .build()
1172            .unwrap();
1173        let list = make_array(list);
1174
1175        let list_field = Field::new("list", list_type.clone(), false);
1176        let levels = calculate_array_levels(&list, &list_field).unwrap();
1177        assert_eq!(levels.len(), 1);
1178
1179        let expected_levels = ArrayLevels {
1180            def_levels: Some(vec![1; 4]),
1181            rep_levels: Some(vec![0; 4]),
1182            non_null_indices: (0..4).collect(),
1183            max_def_level: 1,
1184            max_rep_level: 1,
1185            array: Arc::new(leaf),
1186            logical_nulls: None,
1187        };
1188        assert_eq!(&levels[0], &expected_levels);
1189
1190        // 0: null
1191        // 1: [1, 2, 3]
1192        // 2: [4, 5]
1193        // 3: [6, 7]
1194        let leaf = Int32Array::from_iter(0..8);
1195        let list = ArrayData::builder(list_type.clone())
1196            .len(4)
1197            .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
1198            .null_bit_buffer(Some(Buffer::from([0b00001110])))
1199            .add_child_data(leaf.to_data())
1200            .build()
1201            .unwrap();
1202        let list = make_array(list);
1203        let list_field = Arc::new(Field::new("list", list_type, true));
1204
1205        let struct_array = StructArray::from(vec![(list_field, list)]);
1206        let array = Arc::new(struct_array) as ArrayRef;
1207
1208        let struct_field = Field::new("struct", array.data_type().clone(), true);
1209        let levels = calculate_array_levels(&array, &struct_field).unwrap();
1210        assert_eq!(levels.len(), 1);
1211
1212        let expected_levels = ArrayLevels {
1213            def_levels: Some(vec![1, 3, 3, 3, 3, 3, 3, 3]),
1214            rep_levels: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]),
1215            non_null_indices: (0..7).collect(),
1216            max_def_level: 3,
1217            max_rep_level: 1,
1218            array: Arc::new(leaf),
1219            logical_nulls: None,
1220        };
1221        assert_eq!(&levels[0], &expected_levels);
1222
1223        // nested lists
1224        // In a JSON syntax with the schema: <struct<list<list<primitive>>>>, this translates into:
1225        // 0: {"struct": null }
1226        // 1: {"struct": [ [201], [202, 203], [] ]}
1227        // 2: {"struct": [ [204, 205, 206], [207, 208, 209, 210] ]}
1228        // 3: {"struct": [ [], [211, 212, 213, 214, 215] ]}
1229
1230        let leaf = Int32Array::from_iter(201..216);
1231        let leaf_field = Field::new("leaf", DataType::Int32, false);
1232        let list_1_type = DataType::List(Arc::new(leaf_field));
1233        let list_1 = ArrayData::builder(list_1_type.clone())
1234            .len(7)
1235            .add_buffer(Buffer::from_iter([0_i32, 1, 3, 3, 6, 10, 10, 15]))
1236            .add_child_data(leaf.to_data())
1237            .build()
1238            .unwrap();
1239
1240        let list_1_field = Field::new("l1", list_1_type, true);
1241        let list_2_type = DataType::List(Arc::new(list_1_field));
1242        let list_2 = ArrayData::builder(list_2_type.clone())
1243            .len(4)
1244            .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
1245            .null_bit_buffer(Some(Buffer::from([0b00001110])))
1246            .add_child_data(list_1)
1247            .build()
1248            .unwrap();
1249
1250        let list_2 = make_array(list_2);
1251        let list_2_field = Arc::new(Field::new("list_2", list_2_type, true));
1252
1253        let struct_array =
1254            StructArray::from((vec![(list_2_field, list_2)], Buffer::from([0b00001111])));
1255        let struct_field = Field::new("struct", struct_array.data_type().clone(), true);
1256
1257        let array = Arc::new(struct_array) as ArrayRef;
1258        let levels = calculate_array_levels(&array, &struct_field).unwrap();
1259        assert_eq!(levels.len(), 1);
1260
1261        let expected_levels = ArrayLevels {
1262            def_levels: Some(vec![1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5, 5, 5, 5]),
1263            rep_levels: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2]),
1264            non_null_indices: (0..15).collect(),
1265            max_def_level: 5,
1266            max_rep_level: 2,
1267            array: Arc::new(leaf),
1268            logical_nulls: None,
1269        };
1270        assert_eq!(&levels[0], &expected_levels);
1271    }
1272
1273    #[test]
1274    fn test_calculate_nested_struct_levels() {
1275        // tests a <struct[a]<struct[b]<int[c]>>
1276        // array:
1277        //  - {a: {b: {c: 1}}}
1278        //  - {a: {b: {c: null}}}
1279        //  - {a: {b: {c: 3}}}
1280        //  - {a: {b: null}}
1281        //  - {a: null}}
1282        //  - {a: {b: {c: 6}}}
1283
1284        let c = Int32Array::from_iter([Some(1), None, Some(3), None, Some(5), Some(6)]);
1285        let leaf = Arc::new(c) as ArrayRef;
1286        let c_field = Arc::new(Field::new("c", DataType::Int32, true));
1287        let b = StructArray::from(((vec![(c_field, leaf.clone())]), Buffer::from([0b00110111])));
1288
1289        let b_field = Arc::new(Field::new("b", b.data_type().clone(), true));
1290        let a = StructArray::from((
1291            (vec![(b_field, Arc::new(b) as ArrayRef)]),
1292            Buffer::from([0b00101111]),
1293        ));
1294
1295        let a_field = Field::new("a", a.data_type().clone(), true);
1296        let a_array = Arc::new(a) as ArrayRef;
1297
1298        let levels = calculate_array_levels(&a_array, &a_field).unwrap();
1299        assert_eq!(levels.len(), 1);
1300
1301        let logical_nulls = leaf.logical_nulls();
1302        let expected_levels = ArrayLevels {
1303            def_levels: Some(vec![3, 2, 3, 1, 0, 3]),
1304            rep_levels: None,
1305            non_null_indices: vec![0, 2, 5],
1306            max_def_level: 3,
1307            max_rep_level: 0,
1308            array: leaf,
1309            logical_nulls,
1310        };
1311        assert_eq!(&levels[0], &expected_levels);
1312    }
1313
1314    #[test]
1315    fn list_single_column() {
1316        // this tests the level generation from the arrow_writer equivalent test
1317
1318        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1319        let a_value_offsets = arrow::buffer::Buffer::from_iter([0_i32, 1, 3, 3, 6, 10]);
1320        let a_list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
1321        let a_list_data = ArrayData::builder(a_list_type.clone())
1322            .len(5)
1323            .add_buffer(a_value_offsets)
1324            .null_bit_buffer(Some(Buffer::from([0b00011011])))
1325            .add_child_data(a_values.to_data())
1326            .build()
1327            .unwrap();
1328
1329        assert_eq!(a_list_data.null_count(), 1);
1330
1331        let a = ListArray::from(a_list_data);
1332
1333        let item_field = Field::new_list_field(a_list_type, true);
1334        let mut builder = levels(&item_field, a);
1335        builder.write(2..4);
1336        let levels = builder.finish();
1337
1338        assert_eq!(levels.len(), 1);
1339
1340        let list_level = &levels[0];
1341
1342        let expected_level = ArrayLevels {
1343            def_levels: Some(vec![0, 3, 3, 3]),
1344            rep_levels: Some(vec![0, 0, 1, 1]),
1345            non_null_indices: vec![3, 4, 5],
1346            max_def_level: 3,
1347            max_rep_level: 1,
1348            array: Arc::new(a_values),
1349            logical_nulls: None,
1350        };
1351        assert_eq!(list_level, &expected_level);
1352    }
1353
1354    #[test]
1355    fn mixed_struct_list() {
1356        // this tests the level generation from the equivalent arrow_writer_complex test
1357
1358        // define schema
1359        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1360        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1361        let struct_field_g = Arc::new(Field::new(
1362            "g",
1363            DataType::List(Arc::new(Field::new("items", DataType::Int16, false))),
1364            false,
1365        ));
1366        let struct_field_e = Arc::new(Field::new(
1367            "e",
1368            DataType::Struct(vec![struct_field_f.clone(), struct_field_g.clone()].into()),
1369            true,
1370        ));
1371        let schema = Schema::new(vec![
1372            Field::new("a", DataType::Int32, false),
1373            Field::new("b", DataType::Int32, true),
1374            Field::new(
1375                "c",
1376                DataType::Struct(vec![struct_field_d.clone(), struct_field_e.clone()].into()),
1377                true, // https://github.com/apache/arrow-rs/issues/245
1378            ),
1379        ]);
1380
1381        // create some data
1382        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1383        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1384        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1385        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1386
1387        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1388
1389        // Construct a buffer for value offsets, for the nested array:
1390        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
1391        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1392
1393        // Construct a list array from the above two
1394        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1395            .len(5)
1396            .add_buffer(g_value_offsets)
1397            .add_child_data(g_value.into_data())
1398            .build()
1399            .unwrap();
1400        let g = ListArray::from(g_list_data);
1401
1402        let e = StructArray::from(vec![
1403            (struct_field_f, Arc::new(f.clone()) as ArrayRef),
1404            (struct_field_g, Arc::new(g) as ArrayRef),
1405        ]);
1406
1407        let c = StructArray::from(vec![
1408            (struct_field_d, Arc::new(d.clone()) as ArrayRef),
1409            (struct_field_e, Arc::new(e) as ArrayRef),
1410        ]);
1411
1412        // build a record batch
1413        let batch = RecordBatch::try_new(
1414            Arc::new(schema),
1415            vec![Arc::new(a.clone()), Arc::new(b.clone()), Arc::new(c)],
1416        )
1417        .unwrap();
1418
1419        //////////////////////////////////////////////
1420        // calculate the list's level
1421        let mut levels = vec![];
1422        batch
1423            .columns()
1424            .iter()
1425            .zip(batch.schema().fields())
1426            .for_each(|(array, field)| {
1427                let mut array_levels = calculate_array_levels(array, field).unwrap();
1428                levels.append(&mut array_levels);
1429            });
1430        assert_eq!(levels.len(), 5);
1431
1432        // test "a" levels
1433        let list_level = &levels[0];
1434
1435        let expected_level = ArrayLevels {
1436            def_levels: None,
1437            rep_levels: None,
1438            non_null_indices: vec![0, 1, 2, 3, 4],
1439            max_def_level: 0,
1440            max_rep_level: 0,
1441            array: Arc::new(a),
1442            logical_nulls: None,
1443        };
1444        assert_eq!(list_level, &expected_level);
1445
1446        // test "b" levels
1447        let list_level = levels.get(1).unwrap();
1448
1449        let b_logical_nulls = b.logical_nulls();
1450        let expected_level = ArrayLevels {
1451            def_levels: Some(vec![1, 0, 0, 1, 1]),
1452            rep_levels: None,
1453            non_null_indices: vec![0, 3, 4],
1454            max_def_level: 1,
1455            max_rep_level: 0,
1456            array: Arc::new(b),
1457            logical_nulls: b_logical_nulls,
1458        };
1459        assert_eq!(list_level, &expected_level);
1460
1461        // test "d" levels
1462        let list_level = levels.get(2).unwrap();
1463
1464        let d_logical_nulls = d.logical_nulls();
1465        let expected_level = ArrayLevels {
1466            def_levels: Some(vec![1, 1, 1, 2, 1]),
1467            rep_levels: None,
1468            non_null_indices: vec![3],
1469            max_def_level: 2,
1470            max_rep_level: 0,
1471            array: Arc::new(d),
1472            logical_nulls: d_logical_nulls,
1473        };
1474        assert_eq!(list_level, &expected_level);
1475
1476        // test "f" levels
1477        let list_level = levels.get(3).unwrap();
1478
1479        let f_logical_nulls = f.logical_nulls();
1480        let expected_level = ArrayLevels {
1481            def_levels: Some(vec![3, 2, 3, 2, 3]),
1482            rep_levels: None,
1483            non_null_indices: vec![0, 2, 4],
1484            max_def_level: 3,
1485            max_rep_level: 0,
1486            array: Arc::new(f),
1487            logical_nulls: f_logical_nulls,
1488        };
1489        assert_eq!(list_level, &expected_level);
1490    }
1491
1492    #[test]
1493    fn test_null_vs_nonnull_struct() {
1494        // define schema
1495        let offset_field = Arc::new(Field::new("offset", DataType::Int32, true));
1496        let schema = Schema::new(vec![Field::new(
1497            "some_nested_object",
1498            DataType::Struct(vec![offset_field.clone()].into()),
1499            false,
1500        )]);
1501
1502        // create some data
1503        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1504
1505        let some_nested_object =
1506            StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
1507
1508        // build a record batch
1509        let batch =
1510            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1511
1512        let struct_null_level =
1513            calculate_array_levels(batch.column(0), batch.schema().field(0)).unwrap();
1514
1515        // create second batch
1516        // define schema
1517        let offset_field = Arc::new(Field::new("offset", DataType::Int32, true));
1518        let schema = Schema::new(vec![Field::new(
1519            "some_nested_object",
1520            DataType::Struct(vec![offset_field.clone()].into()),
1521            true,
1522        )]);
1523
1524        // create some data
1525        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1526
1527        let some_nested_object =
1528            StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
1529
1530        // build a record batch
1531        let batch =
1532            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1533
1534        let struct_non_null_level =
1535            calculate_array_levels(batch.column(0), batch.schema().field(0)).unwrap();
1536
1537        // The 2 levels should not be the same
1538        if struct_non_null_level == struct_null_level {
1539            panic!("Levels should not be equal, to reflect the difference in struct nullness");
1540        }
1541    }
1542
1543    #[test]
1544    fn test_map_array() {
1545        // Note: we are using the JSON Arrow reader for brevity
1546        let json_content = r#"
1547        {"stocks":{"long": "$AAA", "short": "$BBB"}}
1548        {"stocks":{"long": "$CCC", "short": null}}
1549        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1550        "#;
1551        let entries_struct_type = DataType::Struct(Fields::from(vec![
1552            Field::new("key", DataType::Utf8, false),
1553            Field::new("value", DataType::Utf8, true),
1554        ]));
1555        let stocks_field = Field::new(
1556            "stocks",
1557            DataType::Map(
1558                Arc::new(Field::new("entries", entries_struct_type, false)),
1559                false,
1560            ),
1561            // not nullable, so the keys have max level = 1
1562            false,
1563        );
1564        let schema = Arc::new(Schema::new(vec![stocks_field]));
1565        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1566        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1567
1568        let batch = reader.next().unwrap().unwrap();
1569
1570        // calculate the map's level
1571        let mut levels = vec![];
1572        batch
1573            .columns()
1574            .iter()
1575            .zip(batch.schema().fields())
1576            .for_each(|(array, field)| {
1577                let mut array_levels = calculate_array_levels(array, field).unwrap();
1578                levels.append(&mut array_levels);
1579            });
1580        assert_eq!(levels.len(), 2);
1581
1582        let map = batch.column(0).as_map();
1583        let map_keys_logical_nulls = map.keys().logical_nulls();
1584
1585        // test key levels
1586        let list_level = &levels[0];
1587
1588        let expected_level = ArrayLevels {
1589            def_levels: Some(vec![1; 7]),
1590            rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
1591            non_null_indices: vec![0, 1, 2, 3, 4, 5, 6],
1592            max_def_level: 1,
1593            max_rep_level: 1,
1594            array: map.keys().clone(),
1595            logical_nulls: map_keys_logical_nulls,
1596        };
1597        assert_eq!(list_level, &expected_level);
1598
1599        // test values levels
1600        let list_level = levels.get(1).unwrap();
1601        let map_values_logical_nulls = map.values().logical_nulls();
1602
1603        let expected_level = ArrayLevels {
1604            def_levels: Some(vec![2, 2, 2, 1, 2, 1, 2]),
1605            rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
1606            non_null_indices: vec![0, 1, 2, 4, 6],
1607            max_def_level: 2,
1608            max_rep_level: 1,
1609            array: map.values().clone(),
1610            logical_nulls: map_values_logical_nulls,
1611        };
1612        assert_eq!(list_level, &expected_level);
1613    }
1614
1615    #[test]
1616    fn test_list_of_struct() {
1617        // define schema
1618        let int_field = Field::new("a", DataType::Int32, true);
1619        let fields = Fields::from([Arc::new(int_field)]);
1620        let item_field = Field::new_list_field(DataType::Struct(fields.clone()), true);
1621        let list_field = Field::new("list", DataType::List(Arc::new(item_field)), true);
1622
1623        let int_builder = Int32Builder::with_capacity(10);
1624        let struct_builder = StructBuilder::new(fields, vec![Box::new(int_builder)]);
1625        let mut list_builder = ListBuilder::new(struct_builder);
1626
1627        // [{a: 1}], [], null, [null, null], [{a: null}], [{a: 2}]
1628        //
1629        // [{a: 1}]
1630        let values = list_builder.values();
1631        values
1632            .field_builder::<Int32Builder>(0)
1633            .unwrap()
1634            .append_value(1);
1635        values.append(true);
1636        list_builder.append(true);
1637
1638        // []
1639        list_builder.append(true);
1640
1641        // null
1642        list_builder.append(false);
1643
1644        // [null, null]
1645        let values = list_builder.values();
1646        values
1647            .field_builder::<Int32Builder>(0)
1648            .unwrap()
1649            .append_null();
1650        values.append(false);
1651        values
1652            .field_builder::<Int32Builder>(0)
1653            .unwrap()
1654            .append_null();
1655        values.append(false);
1656        list_builder.append(true);
1657
1658        // [{a: null}]
1659        let values = list_builder.values();
1660        values
1661            .field_builder::<Int32Builder>(0)
1662            .unwrap()
1663            .append_null();
1664        values.append(true);
1665        list_builder.append(true);
1666
1667        // [{a: 2}]
1668        let values = list_builder.values();
1669        values
1670            .field_builder::<Int32Builder>(0)
1671            .unwrap()
1672            .append_value(2);
1673        values.append(true);
1674        list_builder.append(true);
1675
1676        let array = Arc::new(list_builder.finish());
1677
1678        let values = array.values().as_struct().column(0).clone();
1679        let values_len = values.len();
1680        assert_eq!(values_len, 5);
1681
1682        let schema = Arc::new(Schema::new(vec![list_field]));
1683
1684        let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
1685
1686        let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
1687        let list_level = &levels[0];
1688
1689        let logical_nulls = values.logical_nulls();
1690        let expected_level = ArrayLevels {
1691            def_levels: Some(vec![4, 1, 0, 2, 2, 3, 4]),
1692            rep_levels: Some(vec![0, 0, 0, 0, 1, 0, 0]),
1693            non_null_indices: vec![0, 4],
1694            max_def_level: 4,
1695            max_rep_level: 1,
1696            array: values,
1697            logical_nulls,
1698        };
1699
1700        assert_eq!(list_level, &expected_level);
1701    }
1702
1703    #[test]
1704    fn test_struct_mask_list() {
1705        // Test the null mask of a struct array masking out non-empty slices of a child ListArray
1706        let inner = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1707            Some(vec![Some(1), Some(2)]),
1708            Some(vec![None]),
1709            Some(vec![]),
1710            Some(vec![Some(3), None]), // Masked by struct array
1711            Some(vec![Some(4), Some(5)]),
1712            None, // Masked by struct array
1713            None,
1714        ]);
1715        let values = inner.values().clone();
1716
1717        // This test assumes that nulls don't take up space
1718        assert_eq!(inner.values().len(), 7);
1719
1720        let field = Arc::new(Field::new("list", inner.data_type().clone(), true));
1721        let array = Arc::new(inner) as ArrayRef;
1722        let nulls = Buffer::from([0b01010111]);
1723        let struct_a = StructArray::from((vec![(field, array)], nulls));
1724
1725        let field = Field::new("struct", struct_a.data_type().clone(), true);
1726        let array = Arc::new(struct_a) as ArrayRef;
1727        let levels = calculate_array_levels(&array, &field).unwrap();
1728
1729        assert_eq!(levels.len(), 1);
1730
1731        let logical_nulls = values.logical_nulls();
1732        let expected_level = ArrayLevels {
1733            def_levels: Some(vec![4, 4, 3, 2, 0, 4, 4, 0, 1]),
1734            rep_levels: Some(vec![0, 1, 0, 0, 0, 0, 1, 0, 0]),
1735            non_null_indices: vec![0, 1, 5, 6],
1736            max_def_level: 4,
1737            max_rep_level: 1,
1738            array: values,
1739            logical_nulls,
1740        };
1741
1742        assert_eq!(&levels[0], &expected_level);
1743    }
1744
1745    #[test]
1746    fn test_list_mask_struct() {
1747        // Test the null mask of a struct array and the null mask of a list array
1748        // masking out non-null elements of their children
1749
1750        let a1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1751            Some(vec![None]), // Masked by list array
1752            Some(vec![]),     // Masked by list array
1753            Some(vec![Some(3), None]),
1754            Some(vec![Some(4), Some(5), None, Some(6)]), // Masked by struct array
1755            None,
1756            None,
1757        ]);
1758        let a1_values = a1.values().clone();
1759        let a1 = Arc::new(a1) as ArrayRef;
1760
1761        let a2 = Arc::new(Int32Array::from_iter(vec![
1762            Some(1), // Masked by list array
1763            Some(2), // Masked by list array
1764            None,
1765            Some(4), // Masked by struct array
1766            Some(5),
1767            None,
1768        ])) as ArrayRef;
1769        let a2_values = a2.clone();
1770
1771        let field_a1 = Arc::new(Field::new("list", a1.data_type().clone(), true));
1772        let field_a2 = Arc::new(Field::new("integers", a2.data_type().clone(), true));
1773
1774        let nulls = Buffer::from([0b00110111]);
1775        let struct_a = Arc::new(StructArray::from((
1776            vec![(field_a1, a1), (field_a2, a2)],
1777            nulls,
1778        ))) as ArrayRef;
1779
1780        let offsets = Buffer::from_iter([0_i32, 0, 2, 2, 3, 5, 5]);
1781        let nulls = Buffer::from([0b00111100]);
1782
1783        let list_type = DataType::List(Arc::new(Field::new(
1784            "struct",
1785            struct_a.data_type().clone(),
1786            true,
1787        )));
1788
1789        let data = ArrayDataBuilder::new(list_type.clone())
1790            .len(6)
1791            .null_bit_buffer(Some(nulls))
1792            .add_buffer(offsets)
1793            .add_child_data(struct_a.into_data())
1794            .build()
1795            .unwrap();
1796
1797        let list = make_array(data);
1798        let list_field = Field::new("col", list_type, true);
1799
1800        let expected = vec![
1801            r#""#.to_string(),
1802            r#""#.to_string(),
1803            r#"[]"#.to_string(),
1804            r#"[{list: [3, ], integers: }]"#.to_string(),
1805            r#"[, {list: , integers: 5}]"#.to_string(),
1806            r#"[]"#.to_string(),
1807        ];
1808
1809        let actual: Vec<_> = (0..6)
1810            .map(|x| array_value_to_string(&list, x).unwrap())
1811            .collect();
1812        assert_eq!(actual, expected);
1813
1814        let levels = calculate_array_levels(&list, &list_field).unwrap();
1815
1816        assert_eq!(levels.len(), 2);
1817
1818        let a1_logical_nulls = a1_values.logical_nulls();
1819        let expected_level = ArrayLevels {
1820            def_levels: Some(vec![0, 0, 1, 6, 5, 2, 3, 1]),
1821            rep_levels: Some(vec![0, 0, 0, 0, 2, 0, 1, 0]),
1822            non_null_indices: vec![1],
1823            max_def_level: 6,
1824            max_rep_level: 2,
1825            array: a1_values,
1826            logical_nulls: a1_logical_nulls,
1827        };
1828
1829        assert_eq!(&levels[0], &expected_level);
1830
1831        let a2_logical_nulls = a2_values.logical_nulls();
1832        let expected_level = ArrayLevels {
1833            def_levels: Some(vec![0, 0, 1, 3, 2, 4, 1]),
1834            rep_levels: Some(vec![0, 0, 0, 0, 0, 1, 0]),
1835            non_null_indices: vec![4],
1836            max_def_level: 4,
1837            max_rep_level: 1,
1838            array: a2_values,
1839            logical_nulls: a2_logical_nulls,
1840        };
1841
1842        assert_eq!(&levels[1], &expected_level);
1843    }
1844
1845    #[test]
1846    fn test_fixed_size_list() {
1847        // [[1, 2], null, null, [7, 8], null]
1848        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 2);
1849        builder.values().append_slice(&[1, 2]);
1850        builder.append(true);
1851        builder.values().append_slice(&[3, 4]);
1852        builder.append(false);
1853        builder.values().append_slice(&[5, 6]);
1854        builder.append(false);
1855        builder.values().append_slice(&[7, 8]);
1856        builder.append(true);
1857        builder.values().append_slice(&[9, 10]);
1858        builder.append(false);
1859        let a = builder.finish();
1860        let values = a.values().clone();
1861
1862        let item_field = Field::new_list_field(a.data_type().clone(), true);
1863        let mut builder = levels(&item_field, a);
1864        builder.write(1..4);
1865        let levels = builder.finish();
1866
1867        assert_eq!(levels.len(), 1);
1868
1869        let list_level = &levels[0];
1870
1871        let logical_nulls = values.logical_nulls();
1872        let expected_level = ArrayLevels {
1873            def_levels: Some(vec![0, 0, 3, 3]),
1874            rep_levels: Some(vec![0, 0, 0, 1]),
1875            non_null_indices: vec![6, 7],
1876            max_def_level: 3,
1877            max_rep_level: 1,
1878            array: values,
1879            logical_nulls,
1880        };
1881        assert_eq!(list_level, &expected_level);
1882    }
1883
1884    #[test]
1885    fn test_fixed_size_list_of_struct() {
1886        // define schema
1887        let field_a = Field::new("a", DataType::Int32, true);
1888        let field_b = Field::new("b", DataType::Int64, false);
1889        let fields = Fields::from([Arc::new(field_a), Arc::new(field_b)]);
1890        let item_field = Field::new_list_field(DataType::Struct(fields.clone()), true);
1891        let list_field = Field::new(
1892            "list",
1893            DataType::FixedSizeList(Arc::new(item_field), 2),
1894            true,
1895        );
1896
1897        let builder_a = Int32Builder::with_capacity(10);
1898        let builder_b = Int64Builder::with_capacity(10);
1899        let struct_builder =
1900            StructBuilder::new(fields, vec![Box::new(builder_a), Box::new(builder_b)]);
1901        let mut list_builder = FixedSizeListBuilder::new(struct_builder, 2);
1902
1903        // [
1904        //   [{a: 1, b: 2}, null],
1905        //   null,
1906        //   [null, null],
1907        //   [{a: null, b: 3}, {a: 2, b: 4}]
1908        // ]
1909
1910        // [{a: 1, b: 2}, null]
1911        let values = list_builder.values();
1912        // {a: 1, b: 2}
1913        values
1914            .field_builder::<Int32Builder>(0)
1915            .unwrap()
1916            .append_value(1);
1917        values
1918            .field_builder::<Int64Builder>(1)
1919            .unwrap()
1920            .append_value(2);
1921        values.append(true);
1922        // null
1923        values
1924            .field_builder::<Int32Builder>(0)
1925            .unwrap()
1926            .append_null();
1927        values
1928            .field_builder::<Int64Builder>(1)
1929            .unwrap()
1930            .append_value(0);
1931        values.append(false);
1932        list_builder.append(true);
1933
1934        // null
1935        let values = list_builder.values();
1936        // null
1937        values
1938            .field_builder::<Int32Builder>(0)
1939            .unwrap()
1940            .append_null();
1941        values
1942            .field_builder::<Int64Builder>(1)
1943            .unwrap()
1944            .append_value(0);
1945        values.append(false);
1946        // null
1947        values
1948            .field_builder::<Int32Builder>(0)
1949            .unwrap()
1950            .append_null();
1951        values
1952            .field_builder::<Int64Builder>(1)
1953            .unwrap()
1954            .append_value(0);
1955        values.append(false);
1956        list_builder.append(false);
1957
1958        // [null, null]
1959        let values = list_builder.values();
1960        // null
1961        values
1962            .field_builder::<Int32Builder>(0)
1963            .unwrap()
1964            .append_null();
1965        values
1966            .field_builder::<Int64Builder>(1)
1967            .unwrap()
1968            .append_value(0);
1969        values.append(false);
1970        // null
1971        values
1972            .field_builder::<Int32Builder>(0)
1973            .unwrap()
1974            .append_null();
1975        values
1976            .field_builder::<Int64Builder>(1)
1977            .unwrap()
1978            .append_value(0);
1979        values.append(false);
1980        list_builder.append(true);
1981
1982        // [{a: null, b: 3}, {a: 2, b: 4}]
1983        let values = list_builder.values();
1984        // {a: null, b: 3}
1985        values
1986            .field_builder::<Int32Builder>(0)
1987            .unwrap()
1988            .append_null();
1989        values
1990            .field_builder::<Int64Builder>(1)
1991            .unwrap()
1992            .append_value(3);
1993        values.append(true);
1994        // {a: 2, b: 4}
1995        values
1996            .field_builder::<Int32Builder>(0)
1997            .unwrap()
1998            .append_value(2);
1999        values
2000            .field_builder::<Int64Builder>(1)
2001            .unwrap()
2002            .append_value(4);
2003        values.append(true);
2004        list_builder.append(true);
2005
2006        let array = Arc::new(list_builder.finish());
2007
2008        assert_eq!(array.values().len(), 8);
2009        assert_eq!(array.len(), 4);
2010
2011        let struct_values = array.values().as_struct();
2012        let values_a = struct_values.column(0).clone();
2013        let values_b = struct_values.column(1).clone();
2014
2015        let schema = Arc::new(Schema::new(vec![list_field]));
2016        let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
2017
2018        let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
2019        let a_levels = &levels[0];
2020        let b_levels = &levels[1];
2021
2022        // [[{a: 1}, null], null, [null, null], [{a: null}, {a: 2}]]
2023        let values_a_logical_nulls = values_a.logical_nulls();
2024        let expected_a = ArrayLevels {
2025            def_levels: Some(vec![4, 2, 0, 2, 2, 3, 4]),
2026            rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
2027            non_null_indices: vec![0, 7],
2028            max_def_level: 4,
2029            max_rep_level: 1,
2030            array: values_a,
2031            logical_nulls: values_a_logical_nulls,
2032        };
2033        // [[{b: 2}, null], null, [null, null], [{b: 3}, {b: 4}]]
2034        let values_b_logical_nulls = values_b.logical_nulls();
2035        let expected_b = ArrayLevels {
2036            def_levels: Some(vec![3, 2, 0, 2, 2, 3, 3]),
2037            rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
2038            non_null_indices: vec![0, 6, 7],
2039            max_def_level: 3,
2040            max_rep_level: 1,
2041            array: values_b,
2042            logical_nulls: values_b_logical_nulls,
2043        };
2044
2045        assert_eq!(a_levels, &expected_a);
2046        assert_eq!(b_levels, &expected_b);
2047    }
2048
2049    #[test]
2050    fn test_fixed_size_list_empty() {
2051        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 0);
2052        builder.append(true);
2053        builder.append(false);
2054        builder.append(true);
2055        let array = builder.finish();
2056        let values = array.values().clone();
2057
2058        let item_field = Field::new_list_field(array.data_type().clone(), true);
2059        let mut builder = levels(&item_field, array);
2060        builder.write(0..3);
2061        let levels = builder.finish();
2062
2063        assert_eq!(levels.len(), 1);
2064
2065        let list_level = &levels[0];
2066
2067        let logical_nulls = values.logical_nulls();
2068        let expected_level = ArrayLevels {
2069            def_levels: Some(vec![1, 0, 1]),
2070            rep_levels: Some(vec![0, 0, 0]),
2071            non_null_indices: vec![],
2072            max_def_level: 3,
2073            max_rep_level: 1,
2074            array: values,
2075            logical_nulls,
2076        };
2077        assert_eq!(list_level, &expected_level);
2078    }
2079
2080    #[test]
2081    fn test_fixed_size_list_of_var_lists() {
2082        // [[[1, null, 3], null], [[4], []], [[5, 6], [null, null]], null]
2083        let mut builder = FixedSizeListBuilder::new(ListBuilder::new(Int32Builder::new()), 2);
2084        builder.values().append_value([Some(1), None, Some(3)]);
2085        builder.values().append_null();
2086        builder.append(true);
2087        builder.values().append_value([Some(4)]);
2088        builder.values().append_value([]);
2089        builder.append(true);
2090        builder.values().append_value([Some(5), Some(6)]);
2091        builder.values().append_value([None, None]);
2092        builder.append(true);
2093        builder.values().append_null();
2094        builder.values().append_null();
2095        builder.append(false);
2096        let a = builder.finish();
2097        let values = a.values().as_list::<i32>().values().clone();
2098
2099        let item_field = Field::new_list_field(a.data_type().clone(), true);
2100        let mut builder = levels(&item_field, a);
2101        builder.write(0..4);
2102        let levels = builder.finish();
2103
2104        let logical_nulls = values.logical_nulls();
2105        let expected_level = ArrayLevels {
2106            def_levels: Some(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]),
2107            rep_levels: Some(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]),
2108            non_null_indices: vec![0, 2, 3, 4, 5],
2109            max_def_level: 5,
2110            max_rep_level: 2,
2111            array: values,
2112            logical_nulls,
2113        };
2114
2115        assert_eq!(levels[0], expected_level);
2116    }
2117
2118    #[test]
2119    fn test_null_dictionary_values() {
2120        let values = Int32Array::new(
2121            vec![1, 2, 3, 4].into(),
2122            Some(NullBuffer::from(vec![true, false, true, true])),
2123        );
2124        let keys = Int32Array::new(
2125            vec![1, 54, 2, 0].into(),
2126            Some(NullBuffer::from(vec![true, false, true, true])),
2127        );
2128        // [NULL, NULL, 3, 0]
2129        let dict = DictionaryArray::new(keys, Arc::new(values));
2130
2131        let item_field = Field::new_list_field(dict.data_type().clone(), true);
2132
2133        let mut builder = levels(&item_field, dict.clone());
2134        builder.write(0..4);
2135        let levels = builder.finish();
2136
2137        let logical_nulls = dict.logical_nulls();
2138        let expected_level = ArrayLevels {
2139            def_levels: Some(vec![0, 0, 1, 1]),
2140            rep_levels: None,
2141            non_null_indices: vec![2, 3],
2142            max_def_level: 1,
2143            max_rep_level: 0,
2144            array: Arc::new(dict),
2145            logical_nulls,
2146        };
2147        assert_eq!(levels[0], expected_level);
2148    }
2149
2150    #[test]
2151    fn mismatched_types() {
2152        let array = Arc::new(Int32Array::from_iter(0..10)) as ArrayRef;
2153        let field = Field::new_list_field(DataType::Float64, false);
2154
2155        let err = LevelInfoBuilder::try_new(&field, Default::default(), &array)
2156            .unwrap_err()
2157            .to_string();
2158
2159        assert_eq!(
2160            err,
2161            "Arrow: Incompatible type. Field 'item' has type Float64, array has type Int32",
2162        );
2163    }
2164
2165    fn levels<T: Array + 'static>(field: &Field, array: T) -> LevelInfoBuilder {
2166        let v = Arc::new(array) as ArrayRef;
2167        LevelInfoBuilder::try_new(field, Default::default(), &v).unwrap()
2168    }
2169
2170    #[test]
2171    fn test_slice_for_chunk_flat() {
2172        // Case 1: required field (max_def_level=0, no def/rep levels stored).
2173        // Array has 6 values; all are non-null so non_null_indices covers every position.
2174        // value_offset=2, num_values=3 → non_null_indices[2..5] = [2,3,4].
2175        // Array is sliced (no def_levels → write_batch_internal uses values.len()).
2176        let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]));
2177        let logical_nulls = array.logical_nulls();
2178        let levels = ArrayLevels {
2179            def_levels: None,
2180            rep_levels: None,
2181            non_null_indices: vec![0, 1, 2, 3, 4, 5],
2182            max_def_level: 0,
2183            max_rep_level: 0,
2184            array,
2185            logical_nulls,
2186        };
2187        let sliced = levels.slice_for_chunk(&CdcChunk {
2188            level_offset: 0,
2189            num_levels: 0,
2190            value_offset: 2,
2191            num_values: 3,
2192        });
2193        assert!(sliced.def_levels.is_none());
2194        assert!(sliced.rep_levels.is_none());
2195        assert_eq!(sliced.non_null_indices, vec![0, 1, 2]);
2196        assert_eq!(sliced.array.len(), 3);
2197
2198        // Case 2: optional field (max_def_level=1, def levels present, no rep levels).
2199        // Array: [Some(1), None, Some(3), None, Some(5), Some(6)]
2200        // non_null_indices: [0, 2, 4, 5]
2201        // value_offset=1, num_values=1 → non_null_indices[1..2] = [2].
2202        // Array is not sliced (def_levels present → num_levels from def_levels.len()).
2203        let array: ArrayRef = Arc::new(Int32Array::from(vec![
2204            Some(1),
2205            None,
2206            Some(3),
2207            None,
2208            Some(5),
2209            Some(6),
2210        ]));
2211        let logical_nulls = array.logical_nulls();
2212        let levels = ArrayLevels {
2213            def_levels: Some(vec![1, 0, 1, 0, 1, 1]),
2214            rep_levels: None,
2215            non_null_indices: vec![0, 2, 4, 5],
2216            max_def_level: 1,
2217            max_rep_level: 0,
2218            array,
2219            logical_nulls,
2220        };
2221        let sliced = levels.slice_for_chunk(&CdcChunk {
2222            level_offset: 1,
2223            num_levels: 3,
2224            value_offset: 1,
2225            num_values: 1,
2226        });
2227        assert_eq!(sliced.def_levels, Some(vec![0, 1, 0]));
2228        assert!(sliced.rep_levels.is_none());
2229        assert_eq!(sliced.non_null_indices, vec![0]); // [2] shifted by -2 (nni[0])
2230        assert_eq!(sliced.array.len(), 1);
2231    }
2232
2233    #[test]
2234    fn test_slice_for_chunk_nested_with_nulls() {
2235        // Regression test for https://github.com/apache/arrow-rs/issues/9637
2236        //
2237        // Simulates a List<Int32?> where null list entries have non-zero child
2238        // ranges (valid per Arrow spec: "a null value may correspond to a
2239        // non-empty segment in the child array"). This creates gaps in the
2240        // leaf array that don't correspond to any levels.
2241        //
2242        // 5 rows with 2 null list entries owning non-empty child ranges:
2243        //   row 0: [1]       → leaf[0]
2244        //   row 1: null list → owns leaf[1..3] (gap of 2)
2245        //   row 2: [2, null] → leaf[3], leaf[4]=null element
2246        //   row 3: null list → owns leaf[5..8] (gap of 3)
2247        //   row 4: [4, 5]   → leaf[8], leaf[9]
2248        //
2249        // def_levels: [3,  0,  3, 2,  0,  3, 3]
2250        // rep_levels: [0,  0,  0, 1,  0,  0, 1]
2251        // non_null_indices: [0, 3, 8, 9]
2252        //   gaps in array: 0→3 (skip 1,2), 3→8 (skip 5,6,7)
2253        let array: ArrayRef = Arc::new(Int32Array::from(vec![
2254            Some(1), // 0: row 0
2255            None,    // 1: gap (null list row 1)
2256            None,    // 2: gap (null list row 1)
2257            Some(2), // 3: row 2
2258            None,    // 4: row 2, null element
2259            None,    // 5: gap (null list row 3)
2260            None,    // 6: gap (null list row 3)
2261            None,    // 7: gap (null list row 3)
2262            Some(4), // 8: row 4
2263            Some(5), // 9: row 4
2264        ]));
2265        let logical_nulls = array.logical_nulls();
2266        let levels = ArrayLevels {
2267            def_levels: Some(vec![3, 0, 3, 2, 0, 3, 3]),
2268            rep_levels: Some(vec![0, 0, 0, 1, 0, 0, 1]),
2269            non_null_indices: vec![0, 3, 8, 9],
2270            max_def_level: 3,
2271            max_rep_level: 1,
2272            array,
2273            logical_nulls,
2274        };
2275
2276        // Chunk 0: rows 0-1, nni=[0] → array sliced to [0..1]
2277        let chunk0 = levels.slice_for_chunk(&CdcChunk {
2278            level_offset: 0,
2279            num_levels: 2,
2280            value_offset: 0,
2281            num_values: 1,
2282        });
2283        assert_eq!(chunk0.non_null_indices, vec![0]);
2284        assert_eq!(chunk0.array.len(), 1);
2285
2286        // Chunk 1: rows 2-3, nni=[3] → array sliced to [3..4]
2287        let chunk1 = levels.slice_for_chunk(&CdcChunk {
2288            level_offset: 2,
2289            num_levels: 3,
2290            value_offset: 1,
2291            num_values: 1,
2292        });
2293        assert_eq!(chunk1.non_null_indices, vec![0]);
2294        assert_eq!(chunk1.array.len(), 1);
2295
2296        // Chunk 2: row 4, nni=[8, 9] → array sliced to [8..10]
2297        let chunk2 = levels.slice_for_chunk(&CdcChunk {
2298            level_offset: 5,
2299            num_levels: 2,
2300            value_offset: 2,
2301            num_values: 2,
2302        });
2303        assert_eq!(chunk2.non_null_indices, vec![0, 1]);
2304        assert_eq!(chunk2.array.len(), 2);
2305    }
2306
2307    #[test]
2308    fn test_slice_for_chunk_all_null() {
2309        // All-null chunk: num_values=0 → empty nni slice → zero-length array.
2310        let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, None, Some(4)]));
2311        let logical_nulls = array.logical_nulls();
2312        let levels = ArrayLevels {
2313            def_levels: Some(vec![1, 0, 0, 1]),
2314            rep_levels: None,
2315            non_null_indices: vec![0, 3],
2316            max_def_level: 1,
2317            max_rep_level: 0,
2318            array,
2319            logical_nulls,
2320        };
2321        // Chunk covering only the two null rows (levels 1..3), zero non-null values.
2322        let sliced = levels.slice_for_chunk(&CdcChunk {
2323            level_offset: 1,
2324            num_levels: 2,
2325            value_offset: 1,
2326            num_values: 0,
2327        });
2328        assert_eq!(sliced.def_levels, Some(vec![0, 0]));
2329        assert_eq!(sliced.non_null_indices, Vec::<usize>::new());
2330        assert_eq!(sliced.array.len(), 0);
2331    }
2332}