1use crate::column::chunker::CdcChunk;
44use crate::column::writer::LevelDataRef;
45use crate::errors::{ParquetError, Result};
46use arrow_array::cast::AsArray;
47use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
48use arrow_buffer::bit_iterator::BitIndexIterator;
49use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
50use arrow_schema::{DataType, Field};
51use std::ops::Range;
52use std::sync::Arc;
53
54pub(crate) fn calculate_array_levels(array: &ArrayRef, field: &Field) -> Result<Vec<ArrayLevels>> {
57 let mut builder = LevelInfoBuilder::try_new(field, Default::default(), array)?;
58 builder.write(0..array.len());
59 Ok(builder.finish())
60}
61
62fn is_leaf(data_type: &DataType) -> bool {
65 matches!(
66 data_type,
67 DataType::Null
68 | DataType::Boolean
69 | DataType::Int8
70 | DataType::Int16
71 | DataType::Int32
72 | DataType::Int64
73 | DataType::UInt8
74 | DataType::UInt16
75 | DataType::UInt32
76 | DataType::UInt64
77 | DataType::Float16
78 | DataType::Float32
79 | DataType::Float64
80 | DataType::Utf8
81 | DataType::Utf8View
82 | DataType::LargeUtf8
83 | DataType::Timestamp(_, _)
84 | DataType::Date32
85 | DataType::Date64
86 | DataType::Time32(_)
87 | DataType::Time64(_)
88 | DataType::Duration(_)
89 | DataType::Interval(_)
90 | DataType::Binary
91 | DataType::LargeBinary
92 | DataType::BinaryView
93 | DataType::Decimal32(_, _)
94 | DataType::Decimal64(_, _)
95 | DataType::Decimal128(_, _)
96 | DataType::Decimal256(_, _)
97 | DataType::FixedSizeBinary(_)
98 )
99}
100
101#[derive(Debug, Default, Clone, Copy)]
103struct LevelContext {
104 rep_level: i16,
106 def_level: i16,
108}
109
110#[derive(Debug)]
112enum LevelInfoBuilder {
113 Primitive(ArrayLevels),
115 List(
117 Box<LevelInfoBuilder>, LevelContext, OffsetBuffer<i32>, Option<NullBuffer>, ),
122 LargeList(
124 Box<LevelInfoBuilder>, LevelContext, OffsetBuffer<i64>, Option<NullBuffer>, ),
129 FixedSizeList(
131 Box<LevelInfoBuilder>, LevelContext, usize, Option<NullBuffer>, ),
136 ListView(
138 Box<LevelInfoBuilder>, LevelContext, ScalarBuffer<i32>, ScalarBuffer<i32>, Option<NullBuffer>, ),
144 LargeListView(
146 Box<LevelInfoBuilder>, LevelContext, ScalarBuffer<i64>, ScalarBuffer<i64>, Option<NullBuffer>, ),
152 Struct(Vec<LevelInfoBuilder>, LevelContext, Option<NullBuffer>),
154}
155
156const BULK_FILL_MIN_LEN: usize = 64;
162
163impl LevelInfoBuilder {
164 fn try_new(field: &Field, parent_ctx: LevelContext, array: &ArrayRef) -> Result<Self> {
166 if !Self::types_compatible(field.data_type(), array.data_type()) {
167 return Err(arrow_err!(format!(
168 "Incompatible type. Field '{}' has type {}, array has type {}",
169 field.name(),
170 field.data_type(),
171 array.data_type(),
172 )));
173 }
174
175 let is_nullable = field.is_nullable();
176
177 match array.data_type() {
178 d if is_leaf(d) => {
179 let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
180 Ok(Self::Primitive(levels))
181 }
182 DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => {
183 let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
184 Ok(Self::Primitive(levels))
185 }
186 DataType::Struct(children) => {
187 let array = array.as_struct();
188 let def_level = match is_nullable {
189 true => parent_ctx.def_level + 1,
190 false => parent_ctx.def_level,
191 };
192
193 let ctx = LevelContext {
194 rep_level: parent_ctx.rep_level,
195 def_level,
196 };
197
198 let children = children
199 .iter()
200 .zip(array.columns())
201 .map(|(f, a)| Self::try_new(f, ctx, a))
202 .collect::<Result<_>>()?;
203
204 Ok(Self::Struct(children, ctx, array.nulls().cloned()))
205 }
206 DataType::List(child)
207 | DataType::LargeList(child)
208 | DataType::Map(child, _)
209 | DataType::FixedSizeList(child, _)
210 | DataType::ListView(child)
211 | DataType::LargeListView(child) => {
212 let def_level = match is_nullable {
213 true => parent_ctx.def_level + 2,
214 false => parent_ctx.def_level + 1,
215 };
216
217 let ctx = LevelContext {
218 rep_level: parent_ctx.rep_level + 1,
219 def_level,
220 };
221
222 Ok(match field.data_type() {
223 DataType::List(_) => {
224 let list = array.as_list();
225 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
226 let offsets = list.offsets().clone();
227 Self::List(Box::new(child), ctx, offsets, list.nulls().cloned())
228 }
229 DataType::LargeList(_) => {
230 let list = array.as_list();
231 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
232 let offsets = list.offsets().clone();
233 let nulls = list.nulls().cloned();
234 Self::LargeList(Box::new(child), ctx, offsets, nulls)
235 }
236 DataType::Map(_, _) => {
237 let map = array.as_map();
238 let entries = Arc::new(map.entries().clone()) as ArrayRef;
239 let child = Self::try_new(child.as_ref(), ctx, &entries)?;
240 let offsets = map.offsets().clone();
241 Self::List(Box::new(child), ctx, offsets, map.nulls().cloned())
242 }
243 DataType::FixedSizeList(_, size) => {
244 let list = array.as_fixed_size_list();
245 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
246 let nulls = list.nulls().cloned();
247 Self::FixedSizeList(Box::new(child), ctx, *size as _, nulls)
248 }
249 DataType::ListView(_) => {
250 let list = array.as_list_view();
251 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
252 let offsets = list.offsets().clone();
253 let sizes = list.sizes().clone();
254 let nulls = list.nulls().cloned();
255 Self::ListView(Box::new(child), ctx, offsets, sizes, nulls)
256 }
257 DataType::LargeListView(_) => {
258 let list = array.as_list_view();
259 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
260 let offsets = list.offsets().clone();
261 let sizes = list.sizes().clone();
262 let nulls = list.nulls().cloned();
263 Self::LargeListView(Box::new(child), ctx, offsets, sizes, nulls)
264 }
265 _ => unreachable!(),
266 })
267 }
268 d => Err(nyi_err!("Datatype {} is not yet supported", d)),
269 }
270 }
271
272 fn finish(self) -> Vec<ArrayLevels> {
275 match self {
276 LevelInfoBuilder::Primitive(v) => vec![v],
277 LevelInfoBuilder::List(v, _, _, _)
278 | LevelInfoBuilder::LargeList(v, _, _, _)
279 | LevelInfoBuilder::FixedSizeList(v, _, _, _)
280 | LevelInfoBuilder::ListView(v, _, _, _, _)
281 | LevelInfoBuilder::LargeListView(v, _, _, _, _) => v.finish(),
282 LevelInfoBuilder::Struct(v, _, _) => v.into_iter().flat_map(|l| l.finish()).collect(),
283 }
284 }
285
286 fn write(&mut self, range: Range<usize>) {
288 match self {
289 LevelInfoBuilder::Primitive(info) => Self::write_leaf(info, range),
290 LevelInfoBuilder::List(child, ctx, offsets, nulls) => {
291 Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
292 }
293 LevelInfoBuilder::LargeList(child, ctx, offsets, nulls) => {
294 Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
295 }
296 LevelInfoBuilder::FixedSizeList(child, ctx, size, nulls) => {
297 Self::write_fixed_size_list(child, ctx, *size, nulls.as_ref(), range)
298 }
299 LevelInfoBuilder::ListView(child, ctx, offsets, sizes, nulls) => {
300 Self::write_list_view(child, ctx, offsets, sizes, nulls.as_ref(), range)
301 }
302 LevelInfoBuilder::LargeListView(child, ctx, offsets, sizes, nulls) => {
303 Self::write_list_view(child, ctx, offsets, sizes, nulls.as_ref(), range)
304 }
305 LevelInfoBuilder::Struct(children, ctx, nulls) => {
306 Self::write_struct(children, ctx, nulls.as_ref(), range)
307 }
308 }
309 }
310
311 fn write_list<O: OffsetSizeTrait>(
315 child: &mut LevelInfoBuilder,
316 ctx: &LevelContext,
317 offsets: &[O],
318 nulls: Option<&NullBuffer>,
319 range: Range<usize>,
320 ) {
321 if nulls.is_some_and(|nulls| nulls.null_count() == nulls.len()) {
323 let count = range.end - range.start;
324 child.visit_leaves(|leaf| {
325 leaf.extend_uniform_levels(ctx.def_level - 2, ctx.rep_level - 1, count);
326 });
327 return;
328 }
329
330 let offsets = &offsets[range.start..range.end + 1];
331
332 let write_non_null_slice =
333 |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
334 child.write(start_idx..end_idx);
335 child.visit_leaves(|leaf| {
336 let rep_levels = leaf.rep_levels.materialize_mut().unwrap();
337 let mut rev = rep_levels.iter_mut().rev();
338 let mut remaining = end_idx - start_idx;
339
340 loop {
341 let next = rev.next().unwrap();
342 if *next > ctx.rep_level {
343 continue;
345 }
346
347 remaining -= 1;
348 if remaining == 0 {
349 *next = ctx.rep_level - 1;
350 break;
351 }
352 }
353 })
354 };
355
356 let write_null_run = |child: &mut LevelInfoBuilder, count: usize| {
364 if count > 0 {
365 child.visit_leaves(|leaf| {
366 leaf.append_rep_level_run(ctx.rep_level - 1, count);
367 leaf.append_def_level_run(ctx.def_level - 2, count);
368 });
369 }
370 };
371
372 let write_empty_run = |child: &mut LevelInfoBuilder, count: usize| {
373 if count > 0 {
374 child.visit_leaves(|leaf| {
375 leaf.append_rep_level_run(ctx.rep_level - 1, count);
376 leaf.append_def_level_run(ctx.def_level - 1, count);
377 });
378 }
379 };
380
381 match nulls {
382 Some(nulls) => {
383 let null_offset = range.start;
384 let mut pending_nulls: usize = 0;
385 let mut pending_empties: usize = 0;
386
387 for (idx, w) in offsets.windows(2).enumerate() {
389 let is_valid = nulls.is_valid(idx + null_offset);
390 let start_idx = w[0].as_usize();
391 let end_idx = w[1].as_usize();
392
393 if !is_valid {
394 write_empty_run(child, pending_empties);
395 pending_empties = 0;
396 pending_nulls += 1;
397 } else if start_idx == end_idx {
398 write_null_run(child, pending_nulls);
399 pending_nulls = 0;
400 pending_empties += 1;
401 } else {
402 write_null_run(child, pending_nulls);
403 pending_nulls = 0;
404 write_empty_run(child, pending_empties);
405 pending_empties = 0;
406 write_non_null_slice(child, start_idx, end_idx);
407 }
408 }
409 write_null_run(child, pending_nulls);
410 write_empty_run(child, pending_empties);
411 }
412 None => {
413 let mut pending_empties: usize = 0;
414 for w in offsets.windows(2) {
415 let start_idx = w[0].as_usize();
416 let end_idx = w[1].as_usize();
417 if start_idx == end_idx {
418 pending_empties += 1;
419 } else {
420 write_empty_run(child, pending_empties);
421 pending_empties = 0;
422 write_non_null_slice(child, start_idx, end_idx);
423 }
424 }
425 write_empty_run(child, pending_empties);
426 }
427 }
428 }
429
430 fn write_list_view<O: OffsetSizeTrait>(
432 child: &mut LevelInfoBuilder,
433 ctx: &LevelContext,
434 offsets: &[O],
435 sizes: &[O],
436 nulls: Option<&NullBuffer>,
437 range: Range<usize>,
438 ) {
439 let offsets = &offsets[range.start..range.end];
440 let sizes = &sizes[range.start..range.end];
441
442 let write_non_null_slice =
443 |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
444 child.write(start_idx..end_idx);
445 child.visit_leaves(|leaf| {
446 let rep_levels = leaf.rep_levels.materialize_mut().unwrap();
447 let mut rev = rep_levels.iter_mut().rev();
448 let mut remaining = end_idx - start_idx;
449
450 loop {
451 let next = rev.next().unwrap();
452 if *next > ctx.rep_level {
453 continue;
455 }
456
457 remaining -= 1;
458 if remaining == 0 {
459 *next = ctx.rep_level - 1;
460 break;
461 }
462 }
463 })
464 };
465
466 let write_empty_slice = |child: &mut LevelInfoBuilder| {
467 child.visit_leaves(|leaf| {
468 leaf.append_rep_level_run(ctx.rep_level - 1, 1);
469 leaf.append_def_level_run(ctx.def_level - 1, 1);
470 })
471 };
472
473 let write_null_slice = |child: &mut LevelInfoBuilder| {
474 child.visit_leaves(|leaf| {
475 leaf.append_rep_level_run(ctx.rep_level - 1, 1);
476 leaf.append_def_level_run(ctx.def_level - 2, 1);
477 })
478 };
479
480 match nulls {
481 Some(nulls) => {
482 let null_offset = range.start;
483 for (idx, (offset, size)) in offsets.iter().zip(sizes.iter()).enumerate() {
485 let is_valid = nulls.is_valid(idx + null_offset);
486 let start_idx = offset.as_usize();
487 let size = size.as_usize();
488 let end_idx = start_idx + size;
489 if !is_valid {
490 write_null_slice(child)
491 } else if size == 0 {
492 write_empty_slice(child)
493 } else {
494 write_non_null_slice(child, start_idx, end_idx)
495 }
496 }
497 }
498 None => {
499 for (offset, size) in offsets.iter().zip(sizes.iter()) {
500 let start_idx = offset.as_usize();
501 let size = size.as_usize();
502 let end_idx = start_idx + size;
503 if size == 0 {
504 write_empty_slice(child)
505 } else {
506 write_non_null_slice(child, start_idx, end_idx)
507 }
508 }
509 }
510 }
511 }
512
513 fn write_struct(
515 children: &mut [LevelInfoBuilder],
516 ctx: &LevelContext,
517 nulls: Option<&NullBuffer>,
518 range: Range<usize>,
519 ) {
520 let write_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
521 let len = range.end - range.start;
522 for child in children {
523 child.visit_leaves(|info| {
524 info.extend_uniform_levels(ctx.def_level - 1, ctx.rep_level, len);
525 })
526 }
527 };
528
529 if nulls.is_some_and(|nulls| nulls.null_count() == nulls.len()) {
531 write_null(children, range);
532 return;
533 }
534
535 let write_non_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
536 for child in children {
537 child.write(range.clone())
538 }
539 };
540
541 match nulls {
542 Some(validity) => {
543 let mut last_non_null_idx = None;
544 let mut last_null_idx = None;
545
546 for i in range.clone() {
548 match validity.is_valid(i) {
549 true => {
550 if let Some(last_idx) = last_null_idx.take() {
551 write_null(children, last_idx..i)
552 }
553 last_non_null_idx.get_or_insert(i);
554 }
555 false => {
556 if let Some(last_idx) = last_non_null_idx.take() {
557 write_non_null(children, last_idx..i)
558 }
559 last_null_idx.get_or_insert(i);
560 }
561 }
562 }
563
564 if let Some(last_idx) = last_null_idx.take() {
565 write_null(children, last_idx..range.end)
566 }
567
568 if let Some(last_idx) = last_non_null_idx.take() {
569 write_non_null(children, last_idx..range.end)
570 }
571 }
572 None => write_non_null(children, range),
573 }
574 }
575
576 fn write_fixed_size_list(
578 child: &mut LevelInfoBuilder,
579 ctx: &LevelContext,
580 fixed_size: usize,
581 nulls: Option<&NullBuffer>,
582 range: Range<usize>,
583 ) {
584 if nulls.is_some_and(|nulls| nulls.null_count() == nulls.len()) {
586 let count = range.end - range.start;
587 child.visit_leaves(|leaf| {
588 leaf.extend_uniform_levels(ctx.def_level - 2, ctx.rep_level - 1, count);
589 });
590 return;
591 }
592
593 let write_non_null = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
594 let values_start = start_idx * fixed_size;
595 let values_end = end_idx * fixed_size;
596 child.write(values_start..values_end);
597
598 child.visit_leaves(|leaf| {
599 let rep_levels = leaf.rep_levels.materialize_mut().unwrap();
600
601 let row_indices = (0..fixed_size)
602 .rev()
603 .cycle()
604 .take(values_end - values_start);
605
606 rep_levels
608 .iter_mut()
609 .rev()
610 .filter(|&&mut r| r == ctx.rep_level)
612 .zip(row_indices)
613 .for_each(|(r, idx)| {
614 if idx == 0 {
615 *r = ctx.rep_level - 1;
616 }
617 });
618 })
619 };
620
621 let write_empty = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
623 let len = end_idx - start_idx;
624 child.visit_leaves(|leaf| {
625 leaf.append_rep_level_run(ctx.rep_level - 1, len);
626 leaf.append_def_level_run(ctx.def_level - 1, len);
627 })
628 };
629
630 let write_rows = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
631 if fixed_size > 0 {
632 write_non_null(child, start_idx, end_idx)
633 } else {
634 write_empty(child, start_idx, end_idx)
635 }
636 };
637
638 match nulls {
639 Some(nulls) => {
640 let mut start_idx = None;
641 for idx in range.clone() {
642 if nulls.is_valid(idx) {
643 start_idx.get_or_insert(idx);
645 } else {
646 if let Some(start) = start_idx.take() {
648 write_rows(child, start, idx);
649 }
650 child.visit_leaves(|leaf| {
652 leaf.append_rep_level_run(ctx.rep_level - 1, 1);
653 leaf.append_def_level_run(ctx.def_level - 2, 1);
654 })
655 }
656 }
657 if let Some(start) = start_idx.take() {
659 write_rows(child, start, range.end);
660 }
661 }
662 None => write_rows(child, range.start, range.end),
664 }
665 }
666
667 fn write_leaf(info: &mut ArrayLevels, range: Range<usize>) {
669 let len = range.end - range.start;
670
671 if let Some(nulls) = &info.logical_nulls {
673 if !matches!(info.def_levels, LevelData::Absent) && nulls.null_count() == nulls.len() {
674 info.extend_uniform_levels(info.max_def_level - 1, info.max_rep_level, len);
675 return;
676 }
677 }
678
679 if matches!(info.def_levels, LevelData::Absent) {
680 info.non_null_indices.extend(range.clone());
681 } else {
682 let max_def_level = info.max_def_level;
683 match &info.logical_nulls {
684 Some(nulls) => {
685 assert!(range.end <= nulls.len());
686 if len >= BULK_FILL_MIN_LEN && nulls.null_count() * 2 >= nulls.len() {
691 let range_nulls = nulls.slice(range.start, len);
692 let valid_in_range = len - range_nulls.null_count();
693 let null_def_level = max_def_level - 1;
694 let buf = info
695 .def_levels
696 .materialize_mut()
697 .expect("definition levels present");
698 let base = buf.len();
699 buf.resize(base + len, null_def_level);
700 for i in range_nulls.valid_indices() {
701 buf[base + i] = max_def_level;
702 }
703 info.non_null_indices.reserve(valid_in_range);
704 info.non_null_indices
705 .extend(range_nulls.valid_indices().map(|i| i + range.start));
706 } else {
707 let bits = nulls.inner();
708 info.def_levels.extend_from_iter(range.clone().map(|i| {
709 let valid = unsafe { bits.value_unchecked(i) };
711 max_def_level - (!valid as i16)
712 }));
713 info.non_null_indices.reserve(len);
714 info.non_null_indices.extend(
715 BitIndexIterator::new(bits.inner(), bits.offset() + range.start, len)
716 .map(|i| i + range.start),
717 );
718 }
719 }
720 None => {
721 info.append_def_level_run(max_def_level, len);
722 info.non_null_indices.reserve(len);
723 info.non_null_indices.extend(range.clone());
724 }
725 }
726 }
727
728 if !matches!(info.rep_levels, LevelData::Absent) {
729 info.append_rep_level_run(info.max_rep_level, len);
730 }
731 }
732
733 fn visit_leaves(&mut self, visit: impl Fn(&mut ArrayLevels) + Copy) {
735 match self {
736 LevelInfoBuilder::Primitive(info) => visit(info),
737 LevelInfoBuilder::List(c, _, _, _)
738 | LevelInfoBuilder::LargeList(c, _, _, _)
739 | LevelInfoBuilder::FixedSizeList(c, _, _, _)
740 | LevelInfoBuilder::ListView(c, _, _, _, _)
741 | LevelInfoBuilder::LargeListView(c, _, _, _, _) => c.visit_leaves(visit),
742 LevelInfoBuilder::Struct(children, _, _) => {
743 for c in children {
744 c.visit_leaves(visit)
745 }
746 }
747 }
748 }
749
750 fn types_compatible(a: &DataType, b: &DataType) -> bool {
756 if a.equals_datatype(b) {
758 return true;
759 }
760
761 let (a, b) = match (a, b) {
763 (DataType::Dictionary(_, va), DataType::Dictionary(_, vb)) => {
764 (va.as_ref(), vb.as_ref())
765 }
766 (DataType::Dictionary(_, v), b) => (v.as_ref(), b),
767 (a, DataType::Dictionary(_, v)) => (a, v.as_ref()),
768 _ => (a, b),
769 };
770
771 if a == b {
774 return true;
775 }
776
777 match a {
780 DataType::Utf8 => matches!(b, DataType::LargeUtf8 | DataType::Utf8View),
782 DataType::Utf8View => matches!(b, DataType::LargeUtf8 | DataType::Utf8),
783 DataType::LargeUtf8 => matches!(b, DataType::Utf8 | DataType::Utf8View),
784
785 DataType::Binary => matches!(b, DataType::LargeBinary | DataType::BinaryView),
787 DataType::BinaryView => matches!(b, DataType::LargeBinary | DataType::Binary),
788 DataType::LargeBinary => matches!(b, DataType::Binary | DataType::BinaryView),
789
790 _ => false,
792 }
793 }
794}
795
796#[derive(Debug, Clone)]
799pub(crate) enum LevelData {
800 Absent,
801 Materialized(Vec<i16>),
802 Uniform { value: i16, count: usize },
803}
804
805impl PartialEq for LevelData {
808 fn eq(&self, other: &Self) -> bool {
809 match (self, other) {
810 (Self::Absent, Self::Absent) => true,
811 (Self::Materialized(a), Self::Materialized(b)) => a == b,
812 (Self::Uniform { value: v, count: n }, Self::Materialized(b))
813 | (Self::Materialized(b), Self::Uniform { value: v, count: n }) => {
814 b.len() == *n && b.iter().all(|x| x == v)
815 }
816 (
817 Self::Uniform {
818 value: v1,
819 count: n1,
820 },
821 Self::Uniform {
822 value: v2,
823 count: n2,
824 },
825 ) => v1 == v2 && n1 == n2,
826 _ => false,
827 }
828 }
829}
830
831impl Eq for LevelData {}
832
833impl LevelData {
834 fn new(present: bool) -> Self {
835 match present {
836 true => Self::Materialized(Vec::new()),
837 false => Self::Absent,
838 }
839 }
840
841 pub(crate) fn as_ref(&self) -> LevelDataRef<'_> {
842 match self {
843 Self::Absent => LevelDataRef::Absent,
844 Self::Materialized(values) => LevelDataRef::Materialized(values),
845 Self::Uniform { value, count } => LevelDataRef::Uniform {
846 value: *value,
847 count: *count,
848 },
849 }
850 }
851
852 pub(crate) fn slice(&self, offset: usize, len: usize) -> Self {
853 match self {
854 Self::Absent => Self::Absent,
855 Self::Materialized(values) => Self::Materialized(values[offset..offset + len].to_vec()),
856 Self::Uniform { value, .. } => Self::Uniform {
857 value: *value,
858 count: len,
859 },
860 }
861 }
862
863 fn append_run(&mut self, value: i16, count: usize) {
864 if count == 0 {
865 return;
866 }
867
868 match self {
869 Self::Absent => {}
872 Self::Materialized(values) if values.is_empty() => {
875 *self = Self::Uniform { value, count };
876 }
877 Self::Materialized(values) => values.extend(std::iter::repeat_n(value, count)),
879 Self::Uniform {
882 value: uniform_value,
883 count: uniform_count,
884 } if *uniform_value == value => {
885 *uniform_count += count;
886 }
887 Self::Uniform { .. } => {
890 let values = self.materialize_mut().unwrap();
891 values.extend(std::iter::repeat_n(value, count));
892 }
893 }
894 }
895
896 fn extend_from_iter<I>(&mut self, iter: I)
897 where
898 I: IntoIterator<Item = i16>,
899 {
900 if let Some(values) = self.materialize_mut() {
901 values.extend(iter);
902 }
903 }
904
905 fn materialize_mut(&mut self) -> Option<&mut Vec<i16>> {
908 match self {
909 Self::Absent => None,
910 Self::Materialized(values) => Some(values),
911 Self::Uniform { value, count } => {
912 let values = vec![*value; *count];
913 *self = Self::Materialized(values);
914 match self {
915 Self::Materialized(values) => Some(values),
916 _ => unreachable!(),
917 }
918 }
919 }
920 }
921}
922
923#[derive(Debug, Clone)]
924pub(crate) struct ArrayLevels {
925 def_levels: LevelData,
929
930 rep_levels: LevelData,
934
935 non_null_indices: Vec<usize>,
938
939 max_def_level: i16,
941
942 max_rep_level: i16,
944
945 array: ArrayRef,
947
948 logical_nulls: Option<NullBuffer>,
950}
951
952impl PartialEq for ArrayLevels {
953 fn eq(&self, other: &Self) -> bool {
954 self.def_levels == other.def_levels
955 && self.rep_levels == other.rep_levels
956 && self.non_null_indices == other.non_null_indices
957 && self.max_def_level == other.max_def_level
958 && self.max_rep_level == other.max_rep_level
959 && self.array.as_ref() == other.array.as_ref()
960 && self.logical_nulls.as_ref() == other.logical_nulls.as_ref()
961 }
962}
963impl Eq for ArrayLevels {}
964
965impl ArrayLevels {
966 fn new(ctx: LevelContext, is_nullable: bool, array: ArrayRef) -> Self {
967 let max_rep_level = ctx.rep_level;
968 let max_def_level = match is_nullable {
969 true => ctx.def_level + 1,
970 false => ctx.def_level,
971 };
972
973 let logical_nulls = array.logical_nulls();
974
975 Self {
976 def_levels: LevelData::new(max_def_level != 0),
977 rep_levels: LevelData::new(max_rep_level != 0),
978 non_null_indices: vec![],
979 max_def_level,
980 max_rep_level,
981 array,
982 logical_nulls,
983 }
984 }
985
986 pub fn array(&self) -> &ArrayRef {
987 &self.array
988 }
989
990 pub(crate) fn def_level_data(&self) -> &LevelData {
991 &self.def_levels
992 }
993
994 pub(crate) fn rep_level_data(&self) -> &LevelData {
995 &self.rep_levels
996 }
997
998 pub fn non_null_indices(&self) -> &[usize] {
999 &self.non_null_indices
1000 }
1001
1002 pub(crate) fn slice_for_chunk(&self, chunk: &CdcChunk) -> Self {
1008 let def_levels = self.def_levels.slice(chunk.level_offset, chunk.num_levels);
1009 let rep_levels = self.rep_levels.slice(chunk.level_offset, chunk.num_levels);
1010
1011 let nni = &self.non_null_indices[chunk.value_offset..chunk.value_offset + chunk.num_values];
1013 let start = nni.first().copied().unwrap_or(0);
1018 let end = nni.last().map_or(0, |&i| i + 1);
1019 let non_null_indices = nni.iter().map(|&idx| idx - start).collect();
1021 let array = self.array.slice(start, end - start);
1023 let logical_nulls = array.logical_nulls();
1024
1025 Self {
1026 def_levels,
1027 rep_levels,
1028 non_null_indices,
1029 max_def_level: self.max_def_level,
1030 max_rep_level: self.max_rep_level,
1031 array,
1032 logical_nulls,
1033 }
1034 }
1035
1036 fn extend_uniform_levels(&mut self, def_val: i16, rep_val: i16, count: usize) {
1038 self.def_levels.append_run(def_val, count);
1039 self.rep_levels.append_run(rep_val, count);
1040 }
1041
1042 fn append_def_level_run(&mut self, value: i16, count: usize) {
1043 self.def_levels.append_run(value, count);
1044 }
1045
1046 fn append_rep_level_run(&mut self, value: i16, count: usize) {
1047 self.rep_levels.append_run(value, count);
1048 }
1049}
1050
1051#[cfg(test)]
1052mod tests {
1053 use super::*;
1054 use crate::column::chunker::CdcChunk;
1055
1056 use arrow_array::builder::*;
1057 use arrow_array::types::Int32Type;
1058 use arrow_array::*;
1059 use arrow_buffer::{Buffer, ToByteSlice};
1060 use arrow_cast::display::array_value_to_string;
1061 use arrow_data::{ArrayData, ArrayDataBuilder};
1062 use arrow_schema::{Fields, Schema};
1063
1064 #[test]
1065 fn test_calculate_array_levels_twitter_example() {
1066 let leaf_type = Field::new_list_field(DataType::Int32, false);
1070 let inner_type = DataType::List(Arc::new(leaf_type));
1071 let inner_field = Field::new("l2", inner_type.clone(), false);
1072 let outer_type = DataType::List(Arc::new(inner_field));
1073 let outer_field = Field::new("l1", outer_type.clone(), false);
1074
1075 let primitives = Int32Array::from_iter(0..10);
1076
1077 let offsets = Buffer::from_iter([0_i32, 3, 7, 8, 10]);
1079 let inner_list = ArrayDataBuilder::new(inner_type)
1080 .len(4)
1081 .add_buffer(offsets)
1082 .add_child_data(primitives.to_data())
1083 .build()
1084 .unwrap();
1085
1086 let offsets = Buffer::from_iter([0_i32, 2, 4]);
1087 let outer_list = ArrayDataBuilder::new(outer_type)
1088 .len(2)
1089 .add_buffer(offsets)
1090 .add_child_data(inner_list)
1091 .build()
1092 .unwrap();
1093 let outer_list = make_array(outer_list);
1094
1095 let levels = calculate_array_levels(&outer_list, &outer_field).unwrap();
1096 assert_eq!(levels.len(), 1);
1097
1098 let expected = ArrayLevels {
1099 def_levels: LevelData::Materialized(vec![2; 10]),
1100 rep_levels: LevelData::Materialized(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
1101 non_null_indices: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
1102 max_def_level: 2,
1103 max_rep_level: 2,
1104 array: Arc::new(primitives),
1105 logical_nulls: None,
1106 };
1107 assert_eq!(&levels[0], &expected);
1108 }
1109
1110 #[test]
1111 fn test_calculate_one_level_1() {
1112 let array = Arc::new(Int32Array::from_iter(0..10)) as ArrayRef;
1114 let field = Field::new_list_field(DataType::Int32, false);
1115
1116 let levels = calculate_array_levels(&array, &field).unwrap();
1117 assert_eq!(levels.len(), 1);
1118
1119 let expected_levels = ArrayLevels {
1120 def_levels: LevelData::Absent,
1121 rep_levels: LevelData::Absent,
1122 non_null_indices: (0..10).collect(),
1123 max_def_level: 0,
1124 max_rep_level: 0,
1125 array,
1126 logical_nulls: None,
1127 };
1128 assert_eq!(&levels[0], &expected_levels);
1129 }
1130
1131 #[test]
1132 fn test_calculate_one_level_2() {
1133 let array = Arc::new(Int32Array::from_iter([
1135 Some(0),
1136 None,
1137 Some(0),
1138 Some(0),
1139 None,
1140 ])) as ArrayRef;
1141 let field = Field::new_list_field(DataType::Int32, true);
1142
1143 let levels = calculate_array_levels(&array, &field).unwrap();
1144 assert_eq!(levels.len(), 1);
1145
1146 let logical_nulls = array.logical_nulls();
1147 let expected_levels = ArrayLevels {
1148 def_levels: LevelData::Materialized(vec![1, 0, 1, 1, 0]),
1149 rep_levels: LevelData::Absent,
1150 non_null_indices: vec![0, 2, 3],
1151 max_def_level: 1,
1152 max_rep_level: 0,
1153 array,
1154 logical_nulls,
1155 };
1156 assert_eq!(&levels[0], &expected_levels);
1157 }
1158
1159 #[test]
1160 fn test_calculate_array_levels_1() {
1161 let leaf_field = Field::new_list_field(DataType::Int32, false);
1162 let list_type = DataType::List(Arc::new(leaf_field));
1163
1164 let leaf_array = Int32Array::from_iter(0..5);
1168 let offsets = Buffer::from_iter(0_i32..6);
1170 let list = ArrayDataBuilder::new(list_type.clone())
1171 .len(5)
1172 .add_buffer(offsets)
1173 .add_child_data(leaf_array.to_data())
1174 .build()
1175 .unwrap();
1176 let list = make_array(list);
1177
1178 let list_field = Field::new("list", list_type.clone(), false);
1179 let levels = calculate_array_levels(&list, &list_field).unwrap();
1180 assert_eq!(levels.len(), 1);
1181
1182 let expected_levels = ArrayLevels {
1183 def_levels: LevelData::Materialized(vec![1; 5]),
1184 rep_levels: LevelData::Materialized(vec![0; 5]),
1185 non_null_indices: (0..5).collect(),
1186 max_def_level: 1,
1187 max_rep_level: 1,
1188 array: Arc::new(leaf_array),
1189 logical_nulls: None,
1190 };
1191 assert_eq!(&levels[0], &expected_levels);
1192
1193 let leaf_array = Int32Array::from_iter([0, 0, 2, 2, 3, 3, 3, 3, 4, 4, 4]);
1202 let offsets = Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]);
1203 let list = ArrayDataBuilder::new(list_type.clone())
1204 .len(5)
1205 .add_buffer(offsets)
1206 .add_child_data(leaf_array.to_data())
1207 .null_bit_buffer(Some(Buffer::from([0b00011101])))
1208 .build()
1209 .unwrap();
1210 let list = make_array(list);
1211
1212 let list_field = Field::new("list", list_type, true);
1213 let levels = calculate_array_levels(&list, &list_field).unwrap();
1214 assert_eq!(levels.len(), 1);
1215
1216 let expected_levels = ArrayLevels {
1217 def_levels: LevelData::Materialized(vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2]),
1218 rep_levels: LevelData::Materialized(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
1219 non_null_indices: (0..11).collect(),
1220 max_def_level: 2,
1221 max_rep_level: 1,
1222 array: Arc::new(leaf_array),
1223 logical_nulls: None,
1224 };
1225 assert_eq!(&levels[0], &expected_levels);
1226 }
1227
1228 #[test]
1229 fn test_calculate_array_levels_2() {
1230 let leaf = Int32Array::from_iter(0..11);
1244 let leaf_field = Field::new("leaf", DataType::Int32, false);
1245
1246 let list_type = DataType::List(Arc::new(leaf_field));
1247 let list = ArrayData::builder(list_type.clone())
1248 .len(5)
1249 .add_child_data(leaf.to_data())
1250 .add_buffer(Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]))
1251 .build()
1252 .unwrap();
1253
1254 let list = make_array(list);
1255 let list_field = Arc::new(Field::new("list", list_type, true));
1256
1257 let struct_array =
1258 StructArray::from((vec![(list_field, list)], Buffer::from([0b00011010])));
1259 let array = Arc::new(struct_array) as ArrayRef;
1260
1261 let struct_field = Field::new("struct", array.data_type().clone(), true);
1262
1263 let levels = calculate_array_levels(&array, &struct_field).unwrap();
1264 assert_eq!(levels.len(), 1);
1265
1266 let expected_levels = ArrayLevels {
1267 def_levels: LevelData::Materialized(vec![0, 2, 0, 3, 3, 3, 3, 3, 3, 3]),
1268 rep_levels: LevelData::Materialized(vec![0, 0, 0, 0, 1, 1, 1, 0, 1, 1]),
1269 non_null_indices: (4..11).collect(),
1270 max_def_level: 3,
1271 max_rep_level: 1,
1272 array: Arc::new(leaf),
1273 logical_nulls: None,
1274 };
1275
1276 assert_eq!(&levels[0], &expected_levels);
1277
1278 let leaf = Int32Array::from_iter(100..122);
1287 let leaf_field = Field::new("leaf", DataType::Int32, true);
1288
1289 let l1_type = DataType::List(Arc::new(leaf_field));
1290 let offsets = Buffer::from_iter([0_i32, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22]);
1291 let l1 = ArrayData::builder(l1_type.clone())
1292 .len(11)
1293 .add_child_data(leaf.to_data())
1294 .add_buffer(offsets)
1295 .build()
1296 .unwrap();
1297
1298 let l1_field = Field::new("l1", l1_type, true);
1299 let l2_type = DataType::List(Arc::new(l1_field));
1300 let l2 = ArrayData::builder(l2_type)
1301 .len(5)
1302 .add_child_data(l1)
1303 .add_buffer(Buffer::from_iter([0, 2, 2, 4, 8, 11]))
1304 .build()
1305 .unwrap();
1306
1307 let l2 = make_array(l2);
1308 let l2_field = Field::new("l2", l2.data_type().clone(), true);
1309
1310 let levels = calculate_array_levels(&l2, &l2_field).unwrap();
1311 assert_eq!(levels.len(), 1);
1312
1313 let expected_levels = ArrayLevels {
1314 def_levels: LevelData::Materialized(vec![
1315 5, 5, 5, 5, 1, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
1316 ]),
1317 rep_levels: LevelData::Materialized(vec![
1318 0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2,
1319 ]),
1320 non_null_indices: (0..22).collect(),
1321 max_def_level: 5,
1322 max_rep_level: 2,
1323 array: Arc::new(leaf),
1324 logical_nulls: None,
1325 };
1326
1327 assert_eq!(&levels[0], &expected_levels);
1328 }
1329
1330 #[test]
1331 fn test_calculate_array_levels_nested_list() {
1332 let leaf_field = Field::new("leaf", DataType::Int32, false);
1333 let list_type = DataType::List(Arc::new(leaf_field));
1334
1335 let leaf = Int32Array::from_iter([0; 4]);
1343 let list = ArrayData::builder(list_type.clone())
1344 .len(4)
1345 .add_buffer(Buffer::from_iter(0_i32..5))
1346 .add_child_data(leaf.to_data())
1347 .build()
1348 .unwrap();
1349 let list = make_array(list);
1350
1351 let list_field = Field::new("list", list_type.clone(), false);
1352 let levels = calculate_array_levels(&list, &list_field).unwrap();
1353 assert_eq!(levels.len(), 1);
1354
1355 let expected_levels = ArrayLevels {
1356 def_levels: LevelData::Materialized(vec![1; 4]),
1357 rep_levels: LevelData::Materialized(vec![0; 4]),
1358 non_null_indices: (0..4).collect(),
1359 max_def_level: 1,
1360 max_rep_level: 1,
1361 array: Arc::new(leaf),
1362 logical_nulls: None,
1363 };
1364 assert_eq!(&levels[0], &expected_levels);
1365
1366 let leaf = Int32Array::from_iter(0..8);
1371 let list = ArrayData::builder(list_type.clone())
1372 .len(4)
1373 .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
1374 .null_bit_buffer(Some(Buffer::from([0b00001110])))
1375 .add_child_data(leaf.to_data())
1376 .build()
1377 .unwrap();
1378 let list = make_array(list);
1379 let list_field = Arc::new(Field::new("list", list_type, true));
1380
1381 let struct_array = StructArray::from(vec![(list_field, list)]);
1382 let array = Arc::new(struct_array) as ArrayRef;
1383
1384 let struct_field = Field::new("struct", array.data_type().clone(), true);
1385 let levels = calculate_array_levels(&array, &struct_field).unwrap();
1386 assert_eq!(levels.len(), 1);
1387
1388 let expected_levels = ArrayLevels {
1389 def_levels: LevelData::Materialized(vec![1, 3, 3, 3, 3, 3, 3, 3]),
1390 rep_levels: LevelData::Materialized(vec![0, 0, 1, 1, 0, 1, 0, 1]),
1391 non_null_indices: (0..7).collect(),
1392 max_def_level: 3,
1393 max_rep_level: 1,
1394 array: Arc::new(leaf),
1395 logical_nulls: None,
1396 };
1397 assert_eq!(&levels[0], &expected_levels);
1398
1399 let leaf = Int32Array::from_iter(201..216);
1407 let leaf_field = Field::new("leaf", DataType::Int32, false);
1408 let list_1_type = DataType::List(Arc::new(leaf_field));
1409 let list_1 = ArrayData::builder(list_1_type.clone())
1410 .len(7)
1411 .add_buffer(Buffer::from_iter([0_i32, 1, 3, 3, 6, 10, 10, 15]))
1412 .add_child_data(leaf.to_data())
1413 .build()
1414 .unwrap();
1415
1416 let list_1_field = Field::new("l1", list_1_type, true);
1417 let list_2_type = DataType::List(Arc::new(list_1_field));
1418 let list_2 = ArrayData::builder(list_2_type.clone())
1419 .len(4)
1420 .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
1421 .null_bit_buffer(Some(Buffer::from([0b00001110])))
1422 .add_child_data(list_1)
1423 .build()
1424 .unwrap();
1425
1426 let list_2 = make_array(list_2);
1427 let list_2_field = Arc::new(Field::new("list_2", list_2_type, true));
1428
1429 let struct_array =
1430 StructArray::from((vec![(list_2_field, list_2)], Buffer::from([0b00001111])));
1431 let struct_field = Field::new("struct", struct_array.data_type().clone(), true);
1432
1433 let array = Arc::new(struct_array) as ArrayRef;
1434 let levels = calculate_array_levels(&array, &struct_field).unwrap();
1435 assert_eq!(levels.len(), 1);
1436
1437 let expected_levels = ArrayLevels {
1438 def_levels: LevelData::Materialized(vec![
1439 1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5, 5, 5, 5,
1440 ]),
1441 rep_levels: LevelData::Materialized(vec![
1442 0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2,
1443 ]),
1444 non_null_indices: (0..15).collect(),
1445 max_def_level: 5,
1446 max_rep_level: 2,
1447 array: Arc::new(leaf),
1448 logical_nulls: None,
1449 };
1450 assert_eq!(&levels[0], &expected_levels);
1451 }
1452
1453 #[test]
1454 fn test_calculate_nested_struct_levels() {
1455 let c = Int32Array::from_iter([Some(1), None, Some(3), None, Some(5), Some(6)]);
1465 let leaf = Arc::new(c) as ArrayRef;
1466 let c_field = Arc::new(Field::new("c", DataType::Int32, true));
1467 let b = StructArray::from(((vec![(c_field, leaf.clone())]), Buffer::from([0b00110111])));
1468
1469 let b_field = Arc::new(Field::new("b", b.data_type().clone(), true));
1470 let a = StructArray::from((
1471 (vec![(b_field, Arc::new(b) as ArrayRef)]),
1472 Buffer::from([0b00101111]),
1473 ));
1474
1475 let a_field = Field::new("a", a.data_type().clone(), true);
1476 let a_array = Arc::new(a) as ArrayRef;
1477
1478 let levels = calculate_array_levels(&a_array, &a_field).unwrap();
1479 assert_eq!(levels.len(), 1);
1480
1481 let logical_nulls = leaf.logical_nulls();
1482 let expected_levels = ArrayLevels {
1483 def_levels: LevelData::Materialized(vec![3, 2, 3, 1, 0, 3]),
1484 rep_levels: LevelData::Absent,
1485 non_null_indices: vec![0, 2, 5],
1486 max_def_level: 3,
1487 max_rep_level: 0,
1488 array: leaf,
1489 logical_nulls,
1490 };
1491 assert_eq!(&levels[0], &expected_levels);
1492 }
1493
1494 #[test]
1495 fn list_single_column() {
1496 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1499 let a_value_offsets = arrow::buffer::Buffer::from_iter([0_i32, 1, 3, 3, 6, 10]);
1500 let a_list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
1501 let a_list_data = ArrayData::builder(a_list_type.clone())
1502 .len(5)
1503 .add_buffer(a_value_offsets)
1504 .null_bit_buffer(Some(Buffer::from([0b00011011])))
1505 .add_child_data(a_values.to_data())
1506 .build()
1507 .unwrap();
1508
1509 assert_eq!(a_list_data.null_count(), 1);
1510
1511 let a = ListArray::from(a_list_data);
1512
1513 let item_field = Field::new_list_field(a_list_type, true);
1514 let mut builder = levels(&item_field, a);
1515 builder.write(2..4);
1516 let levels = builder.finish();
1517
1518 assert_eq!(levels.len(), 1);
1519
1520 let list_level = &levels[0];
1521
1522 let expected_level = ArrayLevels {
1523 def_levels: LevelData::Materialized(vec![0, 3, 3, 3]),
1524 rep_levels: LevelData::Materialized(vec![0, 0, 1, 1]),
1525 non_null_indices: vec![3, 4, 5],
1526 max_def_level: 3,
1527 max_rep_level: 1,
1528 array: Arc::new(a_values),
1529 logical_nulls: None,
1530 };
1531 assert_eq!(list_level, &expected_level);
1532 }
1533
1534 #[test]
1535 fn mixed_struct_list() {
1536 let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1540 let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1541 let struct_field_g = Arc::new(Field::new(
1542 "g",
1543 DataType::List(Arc::new(Field::new("items", DataType::Int16, false))),
1544 false,
1545 ));
1546 let struct_field_e = Arc::new(Field::new(
1547 "e",
1548 DataType::Struct(vec![struct_field_f.clone(), struct_field_g.clone()].into()),
1549 true,
1550 ));
1551 let schema = Schema::new(vec![
1552 Field::new("a", DataType::Int32, false),
1553 Field::new("b", DataType::Int32, true),
1554 Field::new(
1555 "c",
1556 DataType::Struct(vec![struct_field_d.clone(), struct_field_e.clone()].into()),
1557 true, ),
1559 ]);
1560
1561 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1563 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1564 let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1565 let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1566
1567 let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1568
1569 let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1572
1573 let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1575 .len(5)
1576 .add_buffer(g_value_offsets)
1577 .add_child_data(g_value.into_data())
1578 .build()
1579 .unwrap();
1580 let g = ListArray::from(g_list_data);
1581
1582 let e = StructArray::from(vec![
1583 (struct_field_f, Arc::new(f.clone()) as ArrayRef),
1584 (struct_field_g, Arc::new(g) as ArrayRef),
1585 ]);
1586
1587 let c = StructArray::from(vec![
1588 (struct_field_d, Arc::new(d.clone()) as ArrayRef),
1589 (struct_field_e, Arc::new(e) as ArrayRef),
1590 ]);
1591
1592 let batch = RecordBatch::try_new(
1594 Arc::new(schema),
1595 vec![Arc::new(a.clone()), Arc::new(b.clone()), Arc::new(c)],
1596 )
1597 .unwrap();
1598
1599 let mut levels = vec![];
1602 batch
1603 .columns()
1604 .iter()
1605 .zip(batch.schema().fields())
1606 .for_each(|(array, field)| {
1607 let mut array_levels = calculate_array_levels(array, field).unwrap();
1608 levels.append(&mut array_levels);
1609 });
1610 assert_eq!(levels.len(), 5);
1611
1612 let list_level = &levels[0];
1614
1615 let expected_level = ArrayLevels {
1616 def_levels: LevelData::Absent,
1617 rep_levels: LevelData::Absent,
1618 non_null_indices: vec![0, 1, 2, 3, 4],
1619 max_def_level: 0,
1620 max_rep_level: 0,
1621 array: Arc::new(a),
1622 logical_nulls: None,
1623 };
1624 assert_eq!(list_level, &expected_level);
1625
1626 let list_level = levels.get(1).unwrap();
1628
1629 let b_logical_nulls = b.logical_nulls();
1630 let expected_level = ArrayLevels {
1631 def_levels: LevelData::Materialized(vec![1, 0, 0, 1, 1]),
1632 rep_levels: LevelData::Absent,
1633 non_null_indices: vec![0, 3, 4],
1634 max_def_level: 1,
1635 max_rep_level: 0,
1636 array: Arc::new(b),
1637 logical_nulls: b_logical_nulls,
1638 };
1639 assert_eq!(list_level, &expected_level);
1640
1641 let list_level = levels.get(2).unwrap();
1643
1644 let d_logical_nulls = d.logical_nulls();
1645 let expected_level = ArrayLevels {
1646 def_levels: LevelData::Materialized(vec![1, 1, 1, 2, 1]),
1647 rep_levels: LevelData::Absent,
1648 non_null_indices: vec![3],
1649 max_def_level: 2,
1650 max_rep_level: 0,
1651 array: Arc::new(d),
1652 logical_nulls: d_logical_nulls,
1653 };
1654 assert_eq!(list_level, &expected_level);
1655
1656 let list_level = levels.get(3).unwrap();
1658
1659 let f_logical_nulls = f.logical_nulls();
1660 let expected_level = ArrayLevels {
1661 def_levels: LevelData::Materialized(vec![3, 2, 3, 2, 3]),
1662 rep_levels: LevelData::Absent,
1663 non_null_indices: vec![0, 2, 4],
1664 max_def_level: 3,
1665 max_rep_level: 0,
1666 array: Arc::new(f),
1667 logical_nulls: f_logical_nulls,
1668 };
1669 assert_eq!(list_level, &expected_level);
1670 }
1671
1672 #[test]
1673 fn test_null_vs_nonnull_struct() {
1674 let offset_field = Arc::new(Field::new("offset", DataType::Int32, true));
1676 let schema = Schema::new(vec![Field::new(
1677 "some_nested_object",
1678 DataType::Struct(vec![offset_field.clone()].into()),
1679 false,
1680 )]);
1681
1682 let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1684
1685 let some_nested_object =
1686 StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
1687
1688 let batch =
1690 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1691
1692 let struct_null_level =
1693 calculate_array_levels(batch.column(0), batch.schema().field(0)).unwrap();
1694
1695 let offset_field = Arc::new(Field::new("offset", DataType::Int32, true));
1698 let schema = Schema::new(vec![Field::new(
1699 "some_nested_object",
1700 DataType::Struct(vec![offset_field.clone()].into()),
1701 true,
1702 )]);
1703
1704 let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1706
1707 let some_nested_object =
1708 StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
1709
1710 let batch =
1712 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1713
1714 let struct_non_null_level =
1715 calculate_array_levels(batch.column(0), batch.schema().field(0)).unwrap();
1716
1717 if struct_non_null_level == struct_null_level {
1719 panic!("Levels should not be equal, to reflect the difference in struct nullness");
1720 }
1721 }
1722
1723 #[test]
1724 fn test_map_array() {
1725 let json_content = r#"
1727 {"stocks":{"long": "$AAA", "short": "$BBB"}}
1728 {"stocks":{"long": "$CCC", "short": null}}
1729 {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1730 "#;
1731 let entries_struct_type = DataType::Struct(Fields::from(vec![
1732 Field::new("key", DataType::Utf8, false),
1733 Field::new("value", DataType::Utf8, true),
1734 ]));
1735 let stocks_field = Field::new(
1736 "stocks",
1737 DataType::Map(
1738 Arc::new(Field::new("entries", entries_struct_type, false)),
1739 false,
1740 ),
1741 false,
1743 );
1744 let schema = Arc::new(Schema::new(vec![stocks_field]));
1745 let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1746 let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1747
1748 let batch = reader.next().unwrap().unwrap();
1749
1750 let mut levels = vec![];
1752 batch
1753 .columns()
1754 .iter()
1755 .zip(batch.schema().fields())
1756 .for_each(|(array, field)| {
1757 let mut array_levels = calculate_array_levels(array, field).unwrap();
1758 levels.append(&mut array_levels);
1759 });
1760 assert_eq!(levels.len(), 2);
1761
1762 let map = batch.column(0).as_map();
1763 let map_keys_logical_nulls = map.keys().logical_nulls();
1764
1765 let list_level = &levels[0];
1767
1768 let expected_level = ArrayLevels {
1769 def_levels: LevelData::Materialized(vec![1; 7]),
1770 rep_levels: LevelData::Materialized(vec![0, 1, 0, 1, 0, 1, 1]),
1771 non_null_indices: vec![0, 1, 2, 3, 4, 5, 6],
1772 max_def_level: 1,
1773 max_rep_level: 1,
1774 array: map.keys().clone(),
1775 logical_nulls: map_keys_logical_nulls,
1776 };
1777 assert_eq!(list_level, &expected_level);
1778
1779 let list_level = levels.get(1).unwrap();
1781 let map_values_logical_nulls = map.values().logical_nulls();
1782
1783 let expected_level = ArrayLevels {
1784 def_levels: LevelData::Materialized(vec![2, 2, 2, 1, 2, 1, 2]),
1785 rep_levels: LevelData::Materialized(vec![0, 1, 0, 1, 0, 1, 1]),
1786 non_null_indices: vec![0, 1, 2, 4, 6],
1787 max_def_level: 2,
1788 max_rep_level: 1,
1789 array: map.values().clone(),
1790 logical_nulls: map_values_logical_nulls,
1791 };
1792 assert_eq!(list_level, &expected_level);
1793 }
1794
1795 #[test]
1796 fn test_list_of_struct() {
1797 let int_field = Field::new("a", DataType::Int32, true);
1799 let fields = Fields::from([Arc::new(int_field)]);
1800 let item_field = Field::new_list_field(DataType::Struct(fields.clone()), true);
1801 let list_field = Field::new("list", DataType::List(Arc::new(item_field)), true);
1802
1803 let int_builder = Int32Builder::with_capacity(10);
1804 let struct_builder = StructBuilder::new(fields, vec![Box::new(int_builder)]);
1805 let mut list_builder = ListBuilder::new(struct_builder);
1806
1807 let values = list_builder.values();
1811 values
1812 .field_builder::<Int32Builder>(0)
1813 .unwrap()
1814 .append_value(1);
1815 values.append(true);
1816 list_builder.append(true);
1817
1818 list_builder.append(true);
1820
1821 list_builder.append(false);
1823
1824 let values = list_builder.values();
1826 values
1827 .field_builder::<Int32Builder>(0)
1828 .unwrap()
1829 .append_null();
1830 values.append(false);
1831 values
1832 .field_builder::<Int32Builder>(0)
1833 .unwrap()
1834 .append_null();
1835 values.append(false);
1836 list_builder.append(true);
1837
1838 let values = list_builder.values();
1840 values
1841 .field_builder::<Int32Builder>(0)
1842 .unwrap()
1843 .append_null();
1844 values.append(true);
1845 list_builder.append(true);
1846
1847 let values = list_builder.values();
1849 values
1850 .field_builder::<Int32Builder>(0)
1851 .unwrap()
1852 .append_value(2);
1853 values.append(true);
1854 list_builder.append(true);
1855
1856 let array = Arc::new(list_builder.finish());
1857
1858 let values = array.values().as_struct().column(0).clone();
1859 let values_len = values.len();
1860 assert_eq!(values_len, 5);
1861
1862 let schema = Arc::new(Schema::new(vec![list_field]));
1863
1864 let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
1865
1866 let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
1867 let list_level = &levels[0];
1868
1869 let logical_nulls = values.logical_nulls();
1870 let expected_level = ArrayLevels {
1871 def_levels: LevelData::Materialized(vec![4, 1, 0, 2, 2, 3, 4]),
1872 rep_levels: LevelData::Materialized(vec![0, 0, 0, 0, 1, 0, 0]),
1873 non_null_indices: vec![0, 4],
1874 max_def_level: 4,
1875 max_rep_level: 1,
1876 array: values,
1877 logical_nulls,
1878 };
1879
1880 assert_eq!(list_level, &expected_level);
1881 }
1882
1883 #[test]
1884 fn test_struct_mask_list() {
1885 let inner = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1887 Some(vec![Some(1), Some(2)]),
1888 Some(vec![None]),
1889 Some(vec![]),
1890 Some(vec![Some(3), None]), Some(vec![Some(4), Some(5)]),
1892 None, None,
1894 ]);
1895 let values = inner.values().clone();
1896
1897 assert_eq!(inner.values().len(), 7);
1899
1900 let field = Arc::new(Field::new("list", inner.data_type().clone(), true));
1901 let array = Arc::new(inner) as ArrayRef;
1902 let nulls = Buffer::from([0b01010111]);
1903 let struct_a = StructArray::from((vec![(field, array)], nulls));
1904
1905 let field = Field::new("struct", struct_a.data_type().clone(), true);
1906 let array = Arc::new(struct_a) as ArrayRef;
1907 let levels = calculate_array_levels(&array, &field).unwrap();
1908
1909 assert_eq!(levels.len(), 1);
1910
1911 let logical_nulls = values.logical_nulls();
1912 let expected_level = ArrayLevels {
1913 def_levels: LevelData::Materialized(vec![4, 4, 3, 2, 0, 4, 4, 0, 1]),
1914 rep_levels: LevelData::Materialized(vec![0, 1, 0, 0, 0, 0, 1, 0, 0]),
1915 non_null_indices: vec![0, 1, 5, 6],
1916 max_def_level: 4,
1917 max_rep_level: 1,
1918 array: values,
1919 logical_nulls,
1920 };
1921
1922 assert_eq!(&levels[0], &expected_level);
1923 }
1924
1925 #[test]
1926 fn test_list_mask_struct() {
1927 let a1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1931 Some(vec![None]), Some(vec![]), Some(vec![Some(3), None]),
1934 Some(vec![Some(4), Some(5), None, Some(6)]), None,
1936 None,
1937 ]);
1938 let a1_values = a1.values().clone();
1939 let a1 = Arc::new(a1) as ArrayRef;
1940
1941 let a2 = Arc::new(Int32Array::from_iter(vec![
1942 Some(1), Some(2), None,
1945 Some(4), Some(5),
1947 None,
1948 ])) as ArrayRef;
1949 let a2_values = a2.clone();
1950
1951 let field_a1 = Arc::new(Field::new("list", a1.data_type().clone(), true));
1952 let field_a2 = Arc::new(Field::new("integers", a2.data_type().clone(), true));
1953
1954 let nulls = Buffer::from([0b00110111]);
1955 let struct_a = Arc::new(StructArray::from((
1956 vec![(field_a1, a1), (field_a2, a2)],
1957 nulls,
1958 ))) as ArrayRef;
1959
1960 let offsets = Buffer::from_iter([0_i32, 0, 2, 2, 3, 5, 5]);
1961 let nulls = Buffer::from([0b00111100]);
1962
1963 let list_type = DataType::List(Arc::new(Field::new(
1964 "struct",
1965 struct_a.data_type().clone(),
1966 true,
1967 )));
1968
1969 let data = ArrayDataBuilder::new(list_type.clone())
1970 .len(6)
1971 .null_bit_buffer(Some(nulls))
1972 .add_buffer(offsets)
1973 .add_child_data(struct_a.into_data())
1974 .build()
1975 .unwrap();
1976
1977 let list = make_array(data);
1978 let list_field = Field::new("col", list_type, true);
1979
1980 let expected = vec![
1981 r#""#.to_string(),
1982 r#""#.to_string(),
1983 r#"[]"#.to_string(),
1984 r#"[{list: [3, ], integers: }]"#.to_string(),
1985 r#"[, {list: , integers: 5}]"#.to_string(),
1986 r#"[]"#.to_string(),
1987 ];
1988
1989 let actual: Vec<_> = (0..6)
1990 .map(|x| array_value_to_string(&list, x).unwrap())
1991 .collect();
1992 assert_eq!(actual, expected);
1993
1994 let levels = calculate_array_levels(&list, &list_field).unwrap();
1995
1996 assert_eq!(levels.len(), 2);
1997
1998 let a1_logical_nulls = a1_values.logical_nulls();
1999 let expected_level = ArrayLevels {
2000 def_levels: LevelData::Materialized(vec![0, 0, 1, 6, 5, 2, 3, 1]),
2001 rep_levels: LevelData::Materialized(vec![0, 0, 0, 0, 2, 0, 1, 0]),
2002 non_null_indices: vec![1],
2003 max_def_level: 6,
2004 max_rep_level: 2,
2005 array: a1_values,
2006 logical_nulls: a1_logical_nulls,
2007 };
2008
2009 assert_eq!(&levels[0], &expected_level);
2010
2011 let a2_logical_nulls = a2_values.logical_nulls();
2012 let expected_level = ArrayLevels {
2013 def_levels: LevelData::Materialized(vec![0, 0, 1, 3, 2, 4, 1]),
2014 rep_levels: LevelData::Materialized(vec![0, 0, 0, 0, 0, 1, 0]),
2015 non_null_indices: vec![4],
2016 max_def_level: 4,
2017 max_rep_level: 1,
2018 array: a2_values,
2019 logical_nulls: a2_logical_nulls,
2020 };
2021
2022 assert_eq!(&levels[1], &expected_level);
2023 }
2024
2025 #[test]
2026 fn test_fixed_size_list() {
2027 let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 2);
2029 builder.values().append_slice(&[1, 2]);
2030 builder.append(true);
2031 builder.values().append_slice(&[3, 4]);
2032 builder.append(false);
2033 builder.values().append_slice(&[5, 6]);
2034 builder.append(false);
2035 builder.values().append_slice(&[7, 8]);
2036 builder.append(true);
2037 builder.values().append_slice(&[9, 10]);
2038 builder.append(false);
2039 let a = builder.finish();
2040 let values = a.values().clone();
2041
2042 let item_field = Field::new_list_field(a.data_type().clone(), true);
2043 let mut builder = levels(&item_field, a);
2044 builder.write(1..4);
2045 let levels = builder.finish();
2046
2047 assert_eq!(levels.len(), 1);
2048
2049 let list_level = &levels[0];
2050
2051 let logical_nulls = values.logical_nulls();
2052 let expected_level = ArrayLevels {
2053 def_levels: LevelData::Materialized(vec![0, 0, 3, 3]),
2054 rep_levels: LevelData::Materialized(vec![0, 0, 0, 1]),
2055 non_null_indices: vec![6, 7],
2056 max_def_level: 3,
2057 max_rep_level: 1,
2058 array: values,
2059 logical_nulls,
2060 };
2061 assert_eq!(list_level, &expected_level);
2062 }
2063
2064 #[test]
2065 fn test_fixed_size_list_of_struct() {
2066 let field_a = Field::new("a", DataType::Int32, true);
2068 let field_b = Field::new("b", DataType::Int64, false);
2069 let fields = Fields::from([Arc::new(field_a), Arc::new(field_b)]);
2070 let item_field = Field::new_list_field(DataType::Struct(fields.clone()), true);
2071 let list_field = Field::new(
2072 "list",
2073 DataType::FixedSizeList(Arc::new(item_field), 2),
2074 true,
2075 );
2076
2077 let builder_a = Int32Builder::with_capacity(10);
2078 let builder_b = Int64Builder::with_capacity(10);
2079 let struct_builder =
2080 StructBuilder::new(fields, vec![Box::new(builder_a), Box::new(builder_b)]);
2081 let mut list_builder = FixedSizeListBuilder::new(struct_builder, 2);
2082
2083 let values = list_builder.values();
2092 values
2094 .field_builder::<Int32Builder>(0)
2095 .unwrap()
2096 .append_value(1);
2097 values
2098 .field_builder::<Int64Builder>(1)
2099 .unwrap()
2100 .append_value(2);
2101 values.append(true);
2102 values
2104 .field_builder::<Int32Builder>(0)
2105 .unwrap()
2106 .append_null();
2107 values
2108 .field_builder::<Int64Builder>(1)
2109 .unwrap()
2110 .append_value(0);
2111 values.append(false);
2112 list_builder.append(true);
2113
2114 let values = list_builder.values();
2116 values
2118 .field_builder::<Int32Builder>(0)
2119 .unwrap()
2120 .append_null();
2121 values
2122 .field_builder::<Int64Builder>(1)
2123 .unwrap()
2124 .append_value(0);
2125 values.append(false);
2126 values
2128 .field_builder::<Int32Builder>(0)
2129 .unwrap()
2130 .append_null();
2131 values
2132 .field_builder::<Int64Builder>(1)
2133 .unwrap()
2134 .append_value(0);
2135 values.append(false);
2136 list_builder.append(false);
2137
2138 let values = list_builder.values();
2140 values
2142 .field_builder::<Int32Builder>(0)
2143 .unwrap()
2144 .append_null();
2145 values
2146 .field_builder::<Int64Builder>(1)
2147 .unwrap()
2148 .append_value(0);
2149 values.append(false);
2150 values
2152 .field_builder::<Int32Builder>(0)
2153 .unwrap()
2154 .append_null();
2155 values
2156 .field_builder::<Int64Builder>(1)
2157 .unwrap()
2158 .append_value(0);
2159 values.append(false);
2160 list_builder.append(true);
2161
2162 let values = list_builder.values();
2164 values
2166 .field_builder::<Int32Builder>(0)
2167 .unwrap()
2168 .append_null();
2169 values
2170 .field_builder::<Int64Builder>(1)
2171 .unwrap()
2172 .append_value(3);
2173 values.append(true);
2174 values
2176 .field_builder::<Int32Builder>(0)
2177 .unwrap()
2178 .append_value(2);
2179 values
2180 .field_builder::<Int64Builder>(1)
2181 .unwrap()
2182 .append_value(4);
2183 values.append(true);
2184 list_builder.append(true);
2185
2186 let array = Arc::new(list_builder.finish());
2187
2188 assert_eq!(array.values().len(), 8);
2189 assert_eq!(array.len(), 4);
2190
2191 let struct_values = array.values().as_struct();
2192 let values_a = struct_values.column(0).clone();
2193 let values_b = struct_values.column(1).clone();
2194
2195 let schema = Arc::new(Schema::new(vec![list_field]));
2196 let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
2197
2198 let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
2199 let a_levels = &levels[0];
2200 let b_levels = &levels[1];
2201
2202 let values_a_logical_nulls = values_a.logical_nulls();
2204 let expected_a = ArrayLevels {
2205 def_levels: LevelData::Materialized(vec![4, 2, 0, 2, 2, 3, 4]),
2206 rep_levels: LevelData::Materialized(vec![0, 1, 0, 0, 1, 0, 1]),
2207 non_null_indices: vec![0, 7],
2208 max_def_level: 4,
2209 max_rep_level: 1,
2210 array: values_a,
2211 logical_nulls: values_a_logical_nulls,
2212 };
2213 let values_b_logical_nulls = values_b.logical_nulls();
2215 let expected_b = ArrayLevels {
2216 def_levels: LevelData::Materialized(vec![3, 2, 0, 2, 2, 3, 3]),
2217 rep_levels: LevelData::Materialized(vec![0, 1, 0, 0, 1, 0, 1]),
2218 non_null_indices: vec![0, 6, 7],
2219 max_def_level: 3,
2220 max_rep_level: 1,
2221 array: values_b,
2222 logical_nulls: values_b_logical_nulls,
2223 };
2224
2225 assert_eq!(a_levels, &expected_a);
2226 assert_eq!(b_levels, &expected_b);
2227 }
2228
2229 #[test]
2230 fn test_fixed_size_list_empty() {
2231 let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 0);
2232 builder.append(true);
2233 builder.append(false);
2234 builder.append(true);
2235 let array = builder.finish();
2236 let values = array.values().clone();
2237
2238 let item_field = Field::new_list_field(array.data_type().clone(), true);
2239 let mut builder = levels(&item_field, array);
2240 builder.write(0..3);
2241 let levels = builder.finish();
2242
2243 assert_eq!(levels.len(), 1);
2244
2245 let list_level = &levels[0];
2246
2247 let logical_nulls = values.logical_nulls();
2248 let expected_level = ArrayLevels {
2249 def_levels: LevelData::Materialized(vec![1, 0, 1]),
2250 rep_levels: LevelData::Materialized(vec![0, 0, 0]),
2251 non_null_indices: vec![],
2252 max_def_level: 3,
2253 max_rep_level: 1,
2254 array: values,
2255 logical_nulls,
2256 };
2257 assert_eq!(list_level, &expected_level);
2258 }
2259
2260 #[test]
2261 fn test_fixed_size_list_of_var_lists() {
2262 let mut builder = FixedSizeListBuilder::new(ListBuilder::new(Int32Builder::new()), 2);
2264 builder.values().append_value([Some(1), None, Some(3)]);
2265 builder.values().append_null();
2266 builder.append(true);
2267 builder.values().append_value([Some(4)]);
2268 builder.values().append_value([]);
2269 builder.append(true);
2270 builder.values().append_value([Some(5), Some(6)]);
2271 builder.values().append_value([None, None]);
2272 builder.append(true);
2273 builder.values().append_null();
2274 builder.values().append_null();
2275 builder.append(false);
2276 let a = builder.finish();
2277 let values = a.values().as_list::<i32>().values().clone();
2278
2279 let item_field = Field::new_list_field(a.data_type().clone(), true);
2280 let mut builder = levels(&item_field, a);
2281 builder.write(0..4);
2282 let levels = builder.finish();
2283
2284 let logical_nulls = values.logical_nulls();
2285 let expected_level = ArrayLevels {
2286 def_levels: LevelData::Materialized(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]),
2287 rep_levels: LevelData::Materialized(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]),
2288 non_null_indices: vec![0, 2, 3, 4, 5],
2289 max_def_level: 5,
2290 max_rep_level: 2,
2291 array: values,
2292 logical_nulls,
2293 };
2294
2295 assert_eq!(levels[0], expected_level);
2296 }
2297
2298 #[test]
2299 fn test_null_dictionary_values() {
2300 let values = Int32Array::new(
2301 vec![1, 2, 3, 4].into(),
2302 Some(NullBuffer::from(vec![true, false, true, true])),
2303 );
2304 let keys = Int32Array::new(
2305 vec![1, 54, 2, 0].into(),
2306 Some(NullBuffer::from(vec![true, false, true, true])),
2307 );
2308 let dict = DictionaryArray::new(keys, Arc::new(values));
2310
2311 let item_field = Field::new_list_field(dict.data_type().clone(), true);
2312
2313 let mut builder = levels(&item_field, dict.clone());
2314 builder.write(0..4);
2315 let levels = builder.finish();
2316
2317 let logical_nulls = dict.logical_nulls();
2318 let expected_level = ArrayLevels {
2319 def_levels: LevelData::Materialized(vec![0, 0, 1, 1]),
2320 rep_levels: LevelData::Absent,
2321 non_null_indices: vec![2, 3],
2322 max_def_level: 1,
2323 max_rep_level: 0,
2324 array: Arc::new(dict),
2325 logical_nulls,
2326 };
2327 assert_eq!(levels[0], expected_level);
2328 }
2329
2330 #[test]
2331 fn mismatched_types() {
2332 let array = Arc::new(Int32Array::from_iter(0..10)) as ArrayRef;
2333 let field = Field::new_list_field(DataType::Float64, false);
2334
2335 let err = LevelInfoBuilder::try_new(&field, Default::default(), &array)
2336 .unwrap_err()
2337 .to_string();
2338
2339 assert_eq!(
2340 err,
2341 "Arrow: Incompatible type. Field 'item' has type Float64, array has type Int32",
2342 );
2343 }
2344
2345 fn levels<T: Array + 'static>(field: &Field, array: T) -> LevelInfoBuilder {
2346 let v = Arc::new(array) as ArrayRef;
2347 LevelInfoBuilder::try_new(field, Default::default(), &v).unwrap()
2348 }
2349
2350 #[test]
2351 fn test_slice_for_chunk_flat() {
2352 let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]));
2357 let logical_nulls = array.logical_nulls();
2358 let levels = ArrayLevels {
2359 def_levels: LevelData::Absent,
2360 rep_levels: LevelData::Absent,
2361 non_null_indices: vec![0, 1, 2, 3, 4, 5],
2362 max_def_level: 0,
2363 max_rep_level: 0,
2364 array,
2365 logical_nulls,
2366 };
2367 let sliced = levels.slice_for_chunk(&CdcChunk {
2368 level_offset: 0,
2369 num_levels: 0,
2370 value_offset: 2,
2371 num_values: 3,
2372 });
2373 assert!(matches!(sliced.def_levels, LevelData::Absent));
2374 assert!(matches!(sliced.rep_levels, LevelData::Absent));
2375 assert_eq!(sliced.non_null_indices, vec![0, 1, 2]);
2376 assert_eq!(sliced.array.len(), 3);
2377
2378 let array: ArrayRef = Arc::new(Int32Array::from(vec![
2384 Some(1),
2385 None,
2386 Some(3),
2387 None,
2388 Some(5),
2389 Some(6),
2390 ]));
2391 let logical_nulls = array.logical_nulls();
2392 let levels = ArrayLevels {
2393 def_levels: LevelData::Materialized(vec![1, 0, 1, 0, 1, 1]),
2394 rep_levels: LevelData::Absent,
2395 non_null_indices: vec![0, 2, 4, 5],
2396 max_def_level: 1,
2397 max_rep_level: 0,
2398 array,
2399 logical_nulls,
2400 };
2401 let sliced = levels.slice_for_chunk(&CdcChunk {
2402 level_offset: 1,
2403 num_levels: 3,
2404 value_offset: 1,
2405 num_values: 1,
2406 });
2407 assert_eq!(sliced.def_levels, LevelData::Materialized(vec![0, 1, 0]));
2408 assert!(matches!(sliced.rep_levels, LevelData::Absent));
2409 assert_eq!(sliced.non_null_indices, vec![0]); assert_eq!(sliced.array.len(), 1);
2411 }
2412
2413 #[test]
2414 fn test_slice_for_chunk_nested_with_nulls() {
2415 let array: ArrayRef = Arc::new(Int32Array::from(vec![
2434 Some(1), None, None, Some(2), None, None, None, None, Some(4), Some(5), ]));
2445 let logical_nulls = array.logical_nulls();
2446 let levels = ArrayLevels {
2447 def_levels: LevelData::Materialized(vec![3, 0, 3, 2, 0, 3, 3]),
2448 rep_levels: LevelData::Materialized(vec![0, 0, 0, 1, 0, 0, 1]),
2449 non_null_indices: vec![0, 3, 8, 9],
2450 max_def_level: 3,
2451 max_rep_level: 1,
2452 array,
2453 logical_nulls,
2454 };
2455
2456 let chunk0 = levels.slice_for_chunk(&CdcChunk {
2458 level_offset: 0,
2459 num_levels: 2,
2460 value_offset: 0,
2461 num_values: 1,
2462 });
2463 assert_eq!(chunk0.non_null_indices, vec![0]);
2464 assert_eq!(chunk0.array.len(), 1);
2465
2466 let chunk1 = levels.slice_for_chunk(&CdcChunk {
2468 level_offset: 2,
2469 num_levels: 3,
2470 value_offset: 1,
2471 num_values: 1,
2472 });
2473 assert_eq!(chunk1.non_null_indices, vec![0]);
2474 assert_eq!(chunk1.array.len(), 1);
2475
2476 let chunk2 = levels.slice_for_chunk(&CdcChunk {
2478 level_offset: 5,
2479 num_levels: 2,
2480 value_offset: 2,
2481 num_values: 2,
2482 });
2483 assert_eq!(chunk2.non_null_indices, vec![0, 1]);
2484 assert_eq!(chunk2.array.len(), 2);
2485 }
2486
2487 #[test]
2488 fn test_slice_for_chunk_all_null() {
2489 let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, None, Some(4)]));
2491 let logical_nulls = array.logical_nulls();
2492 let levels = ArrayLevels {
2493 def_levels: LevelData::Materialized(vec![1, 0, 0, 1]),
2494 rep_levels: LevelData::Absent,
2495 non_null_indices: vec![0, 3],
2496 max_def_level: 1,
2497 max_rep_level: 0,
2498 array,
2499 logical_nulls,
2500 };
2501 let sliced = levels.slice_for_chunk(&CdcChunk {
2503 level_offset: 1,
2504 num_levels: 2,
2505 value_offset: 1,
2506 num_values: 0,
2507 });
2508 assert_eq!(sliced.def_levels, LevelData::Materialized(vec![0, 0]));
2509 assert_eq!(sliced.non_null_indices, Vec::<usize>::new());
2510 assert_eq!(sliced.array.len(), 0);
2511 }
2512
2513 #[test]
2514 fn test_all_null_list() {
2515 let item_field = Arc::new(Field::new_list_field(DataType::Int32, true));
2521 let list = ListArray::new_null(item_field, 4);
2522 let values = list.values().clone();
2523 let field = Field::new("list", list.data_type().clone(), true);
2524 let array = Arc::new(list) as ArrayRef;
2525
2526 let levels = calculate_array_levels(&array, &field).unwrap();
2527 assert_eq!(levels.len(), 1);
2528
2529 let logical_nulls = values.logical_nulls();
2530 let expected = ArrayLevels {
2531 def_levels: LevelData::Uniform { value: 0, count: 4 },
2532 rep_levels: LevelData::Uniform { value: 0, count: 4 },
2533 non_null_indices: vec![],
2534 max_def_level: 3,
2535 max_rep_level: 1,
2536 array: values,
2537 logical_nulls,
2538 };
2539 assert_eq!(&levels[0], &expected);
2540 }
2541
2542 #[test]
2543 fn test_all_null_fixed_size_list() {
2544 let item_field = Arc::new(Field::new_list_field(DataType::Int32, true));
2550 let list = FixedSizeListArray::new_null(item_field, 2, 3);
2551 let values = list.values().clone();
2552 let field = Field::new("list", list.data_type().clone(), true);
2553 let array = Arc::new(list) as ArrayRef;
2554
2555 let levels = calculate_array_levels(&array, &field).unwrap();
2556 assert_eq!(levels.len(), 1);
2557
2558 let logical_nulls = values.logical_nulls();
2559 let expected = ArrayLevels {
2560 def_levels: LevelData::Uniform { value: 0, count: 3 },
2561 rep_levels: LevelData::Uniform { value: 0, count: 3 },
2562 non_null_indices: vec![],
2563 max_def_level: 3,
2564 max_rep_level: 1,
2565 array: values,
2566 logical_nulls,
2567 };
2568 assert_eq!(&levels[0], &expected);
2569 }
2570
2571 #[test]
2572 fn test_all_null_struct() {
2573 let c = Int32Array::from(vec![None::<i32>; 4]);
2580 let leaf = Arc::new(c) as ArrayRef;
2581 let c_field = Arc::new(Field::new("c", DataType::Int32, true));
2582 let a = StructArray::from((vec![(c_field, leaf.clone())], Buffer::from([0b00000000])));
2583 let a_field = Field::new("a", a.data_type().clone(), true);
2584 let a_array = Arc::new(a) as ArrayRef;
2585
2586 let levels = calculate_array_levels(&a_array, &a_field).unwrap();
2587 assert_eq!(levels.len(), 1);
2588
2589 let expected = ArrayLevels {
2590 def_levels: LevelData::Uniform { value: 0, count: 4 },
2591 rep_levels: LevelData::Absent,
2592 non_null_indices: vec![],
2593 max_def_level: 2,
2594 max_rep_level: 0,
2595 array: leaf,
2596 logical_nulls: Some(NullBuffer::new_null(4)),
2597 };
2598 assert_eq!(&levels[0], &expected);
2599 }
2600
2601 #[test]
2602 fn test_all_null_nested_struct() {
2603 let c = Int32Array::from(vec![None::<i32>; 3]);
2609 let leaf = Arc::new(c) as ArrayRef;
2610 let c_field = Arc::new(Field::new("c", DataType::Int32, true));
2611 let b = StructArray::from((vec![(c_field, leaf.clone())], Buffer::from([0b00000000])));
2612 let b_field = Arc::new(Field::new("b", b.data_type().clone(), true));
2613 let a = StructArray::from((
2614 vec![(b_field, Arc::new(b) as ArrayRef)],
2615 Buffer::from([0b00000000]),
2616 ));
2617 let a_field = Field::new("a", a.data_type().clone(), true);
2618 let a_array = Arc::new(a) as ArrayRef;
2619
2620 let levels = calculate_array_levels(&a_array, &a_field).unwrap();
2621 assert_eq!(levels.len(), 1);
2622
2623 let expected = ArrayLevels {
2624 def_levels: LevelData::Uniform { value: 0, count: 3 },
2625 rep_levels: LevelData::Absent,
2626 non_null_indices: vec![],
2627 max_def_level: 3,
2628 max_rep_level: 0,
2629 array: leaf,
2630 logical_nulls: Some(NullBuffer::new_null(3)),
2631 };
2632 assert_eq!(&levels[0], &expected);
2633 }
2634
2635 #[test]
2636 fn test_all_null_struct_multiple_children() {
2637 let c1 = Arc::new(Int32Array::from(vec![None::<i32>; 2])) as ArrayRef;
2643 let c2 = Arc::new(Int32Array::from(vec![None::<i32>; 2])) as ArrayRef;
2644 let c1_field = Arc::new(Field::new("c1", DataType::Int32, true));
2645 let c2_field = Arc::new(Field::new("c2", DataType::Int32, true));
2646 let a = StructArray::from((
2647 vec![(c1_field, c1.clone()), (c2_field, c2.clone())],
2648 Buffer::from([0b00000000]),
2649 ));
2650 let a_field = Field::new("a", a.data_type().clone(), true);
2651 let a_array = Arc::new(a) as ArrayRef;
2652
2653 let levels = calculate_array_levels(&a_array, &a_field).unwrap();
2654 assert_eq!(levels.len(), 2);
2655
2656 for (i, leaf) in [c1, c2].into_iter().enumerate() {
2657 let expected = ArrayLevels {
2658 def_levels: LevelData::Uniform { value: 0, count: 2 },
2659 rep_levels: LevelData::Absent,
2660 non_null_indices: vec![],
2661 max_def_level: 2,
2662 max_rep_level: 0,
2663 array: leaf,
2664 logical_nulls: Some(NullBuffer::new_null(2)),
2665 };
2666 assert_eq!(&levels[i], &expected, "leaf {i} mismatch");
2667 }
2668 }
2669}