1use crate::column::chunker::CdcChunk;
44use crate::errors::{ParquetError, Result};
45use arrow_array::cast::AsArray;
46use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
47use arrow_buffer::bit_iterator::BitIndexIterator;
48use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
49use arrow_schema::{DataType, Field};
50use std::ops::Range;
51use std::sync::Arc;
52
53pub(crate) fn calculate_array_levels(array: &ArrayRef, field: &Field) -> Result<Vec<ArrayLevels>> {
56 let mut builder = LevelInfoBuilder::try_new(field, Default::default(), array)?;
57 builder.write(0..array.len());
58 Ok(builder.finish())
59}
60
61fn is_leaf(data_type: &DataType) -> bool {
64 matches!(
65 data_type,
66 DataType::Null
67 | DataType::Boolean
68 | DataType::Int8
69 | DataType::Int16
70 | DataType::Int32
71 | DataType::Int64
72 | DataType::UInt8
73 | DataType::UInt16
74 | DataType::UInt32
75 | DataType::UInt64
76 | DataType::Float16
77 | DataType::Float32
78 | DataType::Float64
79 | DataType::Utf8
80 | DataType::Utf8View
81 | DataType::LargeUtf8
82 | DataType::Timestamp(_, _)
83 | DataType::Date32
84 | DataType::Date64
85 | DataType::Time32(_)
86 | DataType::Time64(_)
87 | DataType::Duration(_)
88 | DataType::Interval(_)
89 | DataType::Binary
90 | DataType::LargeBinary
91 | DataType::BinaryView
92 | DataType::Decimal32(_, _)
93 | DataType::Decimal64(_, _)
94 | DataType::Decimal128(_, _)
95 | DataType::Decimal256(_, _)
96 | DataType::FixedSizeBinary(_)
97 )
98}
99
/// The repetition and definition level of an array within a potentially nested hierarchy.
///
/// The default value (both levels zero) is the context of a top-level array.
#[derive(Debug, Default, Clone, Copy)]
struct LevelContext {
    /// The current repetition level
    rep_level: i16,
    /// The current definition level
    def_level: i16,
}
108
/// A helper to construct [`ArrayLevels`] from a potentially nested [`Field`]
#[derive(Debug)]
enum LevelInfoBuilder {
    /// A primitive, leaf array
    Primitive(ArrayLevels),
    /// A list array with `i32` offsets (also used for map arrays)
    List(
        Box<LevelInfoBuilder>, // Child Values
        LevelContext,          // Context
        OffsetBuffer<i32>,     // Offsets
        Option<NullBuffer>,    // Nulls
    ),
    /// A list array with `i64` offsets
    LargeList(
        Box<LevelInfoBuilder>, // Child Values
        LevelContext,          // Context
        OffsetBuffer<i64>,     // Offsets
        Option<NullBuffer>,    // Nulls
    ),
    /// A fixed size list array
    FixedSizeList(
        Box<LevelInfoBuilder>, // Child Values
        LevelContext,          // Context
        usize,                 // List Size
        Option<NullBuffer>,    // Nulls
    ),
    /// A list view array with `i32` offsets and sizes
    ListView(
        Box<LevelInfoBuilder>, // Child Values
        LevelContext,          // Context
        ScalarBuffer<i32>,     // Offsets
        ScalarBuffer<i32>,     // Sizes
        Option<NullBuffer>,    // Nulls
    ),
    /// A list view array with `i64` offsets and sizes
    LargeListView(
        Box<LevelInfoBuilder>, // Child Values
        LevelContext,          // Context
        ScalarBuffer<i64>,     // Offsets
        ScalarBuffer<i64>,     // Sizes
        Option<NullBuffer>,    // Nulls
    ),
    /// A struct array: one builder per child, the struct's context and its nulls
    Struct(Vec<LevelInfoBuilder>, LevelContext, Option<NullBuffer>),
}
154
155impl LevelInfoBuilder {
156 fn try_new(field: &Field, parent_ctx: LevelContext, array: &ArrayRef) -> Result<Self> {
158 if !Self::types_compatible(field.data_type(), array.data_type()) {
159 return Err(arrow_err!(format!(
160 "Incompatible type. Field '{}' has type {}, array has type {}",
161 field.name(),
162 field.data_type(),
163 array.data_type(),
164 )));
165 }
166
167 let is_nullable = field.is_nullable();
168
169 match array.data_type() {
170 d if is_leaf(d) => {
171 let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
172 Ok(Self::Primitive(levels))
173 }
174 DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => {
175 let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
176 Ok(Self::Primitive(levels))
177 }
178 DataType::Struct(children) => {
179 let array = array.as_struct();
180 let def_level = match is_nullable {
181 true => parent_ctx.def_level + 1,
182 false => parent_ctx.def_level,
183 };
184
185 let ctx = LevelContext {
186 rep_level: parent_ctx.rep_level,
187 def_level,
188 };
189
190 let children = children
191 .iter()
192 .zip(array.columns())
193 .map(|(f, a)| Self::try_new(f, ctx, a))
194 .collect::<Result<_>>()?;
195
196 Ok(Self::Struct(children, ctx, array.nulls().cloned()))
197 }
198 DataType::List(child)
199 | DataType::LargeList(child)
200 | DataType::Map(child, _)
201 | DataType::FixedSizeList(child, _)
202 | DataType::ListView(child)
203 | DataType::LargeListView(child) => {
204 let def_level = match is_nullable {
205 true => parent_ctx.def_level + 2,
206 false => parent_ctx.def_level + 1,
207 };
208
209 let ctx = LevelContext {
210 rep_level: parent_ctx.rep_level + 1,
211 def_level,
212 };
213
214 Ok(match field.data_type() {
215 DataType::List(_) => {
216 let list = array.as_list();
217 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
218 let offsets = list.offsets().clone();
219 Self::List(Box::new(child), ctx, offsets, list.nulls().cloned())
220 }
221 DataType::LargeList(_) => {
222 let list = array.as_list();
223 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
224 let offsets = list.offsets().clone();
225 let nulls = list.nulls().cloned();
226 Self::LargeList(Box::new(child), ctx, offsets, nulls)
227 }
228 DataType::Map(_, _) => {
229 let map = array.as_map();
230 let entries = Arc::new(map.entries().clone()) as ArrayRef;
231 let child = Self::try_new(child.as_ref(), ctx, &entries)?;
232 let offsets = map.offsets().clone();
233 Self::List(Box::new(child), ctx, offsets, map.nulls().cloned())
234 }
235 DataType::FixedSizeList(_, size) => {
236 let list = array.as_fixed_size_list();
237 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
238 let nulls = list.nulls().cloned();
239 Self::FixedSizeList(Box::new(child), ctx, *size as _, nulls)
240 }
241 DataType::ListView(_) => {
242 let list = array.as_list_view();
243 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
244 let offsets = list.offsets().clone();
245 let sizes = list.sizes().clone();
246 let nulls = list.nulls().cloned();
247 Self::ListView(Box::new(child), ctx, offsets, sizes, nulls)
248 }
249 DataType::LargeListView(_) => {
250 let list = array.as_list_view();
251 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
252 let offsets = list.offsets().clone();
253 let sizes = list.sizes().clone();
254 let nulls = list.nulls().cloned();
255 Self::LargeListView(Box::new(child), ctx, offsets, sizes, nulls)
256 }
257 _ => unreachable!(),
258 })
259 }
260 d => Err(nyi_err!("Datatype {} is not yet supported", d)),
261 }
262 }
263
264 fn finish(self) -> Vec<ArrayLevels> {
267 match self {
268 LevelInfoBuilder::Primitive(v) => vec![v],
269 LevelInfoBuilder::List(v, _, _, _)
270 | LevelInfoBuilder::LargeList(v, _, _, _)
271 | LevelInfoBuilder::FixedSizeList(v, _, _, _)
272 | LevelInfoBuilder::ListView(v, _, _, _, _)
273 | LevelInfoBuilder::LargeListView(v, _, _, _, _) => v.finish(),
274 LevelInfoBuilder::Struct(v, _, _) => v.into_iter().flat_map(|l| l.finish()).collect(),
275 }
276 }
277
278 fn write(&mut self, range: Range<usize>) {
280 match self {
281 LevelInfoBuilder::Primitive(info) => Self::write_leaf(info, range),
282 LevelInfoBuilder::List(child, ctx, offsets, nulls) => {
283 Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
284 }
285 LevelInfoBuilder::LargeList(child, ctx, offsets, nulls) => {
286 Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
287 }
288 LevelInfoBuilder::FixedSizeList(child, ctx, size, nulls) => {
289 Self::write_fixed_size_list(child, ctx, *size, nulls.as_ref(), range)
290 }
291 LevelInfoBuilder::ListView(child, ctx, offsets, sizes, nulls) => {
292 Self::write_list_view(child, ctx, offsets, sizes, nulls.as_ref(), range)
293 }
294 LevelInfoBuilder::LargeListView(child, ctx, offsets, sizes, nulls) => {
295 Self::write_list_view(child, ctx, offsets, sizes, nulls.as_ref(), range)
296 }
297 LevelInfoBuilder::Struct(children, ctx, nulls) => {
298 Self::write_struct(children, ctx, nulls.as_ref(), range)
299 }
300 }
301 }
302
303 fn write_list<O: OffsetSizeTrait>(
307 child: &mut LevelInfoBuilder,
308 ctx: &LevelContext,
309 offsets: &[O],
310 nulls: Option<&NullBuffer>,
311 range: Range<usize>,
312 ) {
313 let offsets = &offsets[range.start..range.end + 1];
314
315 let write_non_null_slice =
316 |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
317 child.write(start_idx..end_idx);
318 child.visit_leaves(|leaf| {
319 let rep_levels = leaf.rep_levels.as_mut().unwrap();
320 let mut rev = rep_levels.iter_mut().rev();
321 let mut remaining = end_idx - start_idx;
322
323 loop {
324 let next = rev.next().unwrap();
325 if *next > ctx.rep_level {
326 continue;
328 }
329
330 remaining -= 1;
331 if remaining == 0 {
332 *next = ctx.rep_level - 1;
333 break;
334 }
335 }
336 })
337 };
338
339 let write_empty_slice = |child: &mut LevelInfoBuilder| {
340 child.visit_leaves(|leaf| {
341 let rep_levels = leaf.rep_levels.as_mut().unwrap();
342 rep_levels.push(ctx.rep_level - 1);
343 let def_levels = leaf.def_levels.as_mut().unwrap();
344 def_levels.push(ctx.def_level - 1);
345 })
346 };
347
348 let write_null_slice = |child: &mut LevelInfoBuilder| {
349 child.visit_leaves(|leaf| {
350 let rep_levels = leaf.rep_levels.as_mut().unwrap();
351 rep_levels.push(ctx.rep_level - 1);
352 let def_levels = leaf.def_levels.as_mut().unwrap();
353 def_levels.push(ctx.def_level - 2);
354 })
355 };
356
357 match nulls {
358 Some(nulls) => {
359 let null_offset = range.start;
360 for (idx, w) in offsets.windows(2).enumerate() {
362 let is_valid = nulls.is_valid(idx + null_offset);
363 let start_idx = w[0].as_usize();
364 let end_idx = w[1].as_usize();
365 if !is_valid {
366 write_null_slice(child)
367 } else if start_idx == end_idx {
368 write_empty_slice(child)
369 } else {
370 write_non_null_slice(child, start_idx, end_idx)
371 }
372 }
373 }
374 None => {
375 for w in offsets.windows(2) {
376 let start_idx = w[0].as_usize();
377 let end_idx = w[1].as_usize();
378 if start_idx == end_idx {
379 write_empty_slice(child)
380 } else {
381 write_non_null_slice(child, start_idx, end_idx)
382 }
383 }
384 }
385 }
386 }
387
388 fn write_list_view<O: OffsetSizeTrait>(
390 child: &mut LevelInfoBuilder,
391 ctx: &LevelContext,
392 offsets: &[O],
393 sizes: &[O],
394 nulls: Option<&NullBuffer>,
395 range: Range<usize>,
396 ) {
397 let offsets = &offsets[range.start..range.end];
398 let sizes = &sizes[range.start..range.end];
399
400 let write_non_null_slice =
401 |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
402 child.write(start_idx..end_idx);
403 child.visit_leaves(|leaf| {
404 let rep_levels = leaf.rep_levels.as_mut().unwrap();
405 let mut rev = rep_levels.iter_mut().rev();
406 let mut remaining = end_idx - start_idx;
407
408 loop {
409 let next = rev.next().unwrap();
410 if *next > ctx.rep_level {
411 continue;
413 }
414
415 remaining -= 1;
416 if remaining == 0 {
417 *next = ctx.rep_level - 1;
418 break;
419 }
420 }
421 })
422 };
423
424 let write_empty_slice = |child: &mut LevelInfoBuilder| {
425 child.visit_leaves(|leaf| {
426 let rep_levels = leaf.rep_levels.as_mut().unwrap();
427 rep_levels.push(ctx.rep_level - 1);
428 let def_levels = leaf.def_levels.as_mut().unwrap();
429 def_levels.push(ctx.def_level - 1);
430 })
431 };
432
433 let write_null_slice = |child: &mut LevelInfoBuilder| {
434 child.visit_leaves(|leaf| {
435 let rep_levels = leaf.rep_levels.as_mut().unwrap();
436 rep_levels.push(ctx.rep_level - 1);
437 let def_levels = leaf.def_levels.as_mut().unwrap();
438 def_levels.push(ctx.def_level - 2);
439 })
440 };
441
442 match nulls {
443 Some(nulls) => {
444 let null_offset = range.start;
445 for (idx, (offset, size)) in offsets.iter().zip(sizes.iter()).enumerate() {
447 let is_valid = nulls.is_valid(idx + null_offset);
448 let start_idx = offset.as_usize();
449 let size = size.as_usize();
450 let end_idx = start_idx + size;
451 if !is_valid {
452 write_null_slice(child)
453 } else if size == 0 {
454 write_empty_slice(child)
455 } else {
456 write_non_null_slice(child, start_idx, end_idx)
457 }
458 }
459 }
460 None => {
461 for (offset, size) in offsets.iter().zip(sizes.iter()) {
462 let start_idx = offset.as_usize();
463 let size = size.as_usize();
464 let end_idx = start_idx + size;
465 if size == 0 {
466 write_empty_slice(child)
467 } else {
468 write_non_null_slice(child, start_idx, end_idx)
469 }
470 }
471 }
472 }
473 }
474
475 fn write_struct(
477 children: &mut [LevelInfoBuilder],
478 ctx: &LevelContext,
479 nulls: Option<&NullBuffer>,
480 range: Range<usize>,
481 ) {
482 let write_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
483 for child in children {
484 child.visit_leaves(|info| {
485 let len = range.end - range.start;
486
487 let def_levels = info.def_levels.as_mut().unwrap();
488 def_levels.extend(std::iter::repeat_n(ctx.def_level - 1, len));
489
490 if let Some(rep_levels) = info.rep_levels.as_mut() {
491 rep_levels.extend(std::iter::repeat_n(ctx.rep_level, len));
492 }
493 })
494 }
495 };
496
497 let write_non_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
498 for child in children {
499 child.write(range.clone())
500 }
501 };
502
503 match nulls {
504 Some(validity) => {
505 let mut last_non_null_idx = None;
506 let mut last_null_idx = None;
507
508 for i in range.clone() {
510 match validity.is_valid(i) {
511 true => {
512 if let Some(last_idx) = last_null_idx.take() {
513 write_null(children, last_idx..i)
514 }
515 last_non_null_idx.get_or_insert(i);
516 }
517 false => {
518 if let Some(last_idx) = last_non_null_idx.take() {
519 write_non_null(children, last_idx..i)
520 }
521 last_null_idx.get_or_insert(i);
522 }
523 }
524 }
525
526 if let Some(last_idx) = last_null_idx.take() {
527 write_null(children, last_idx..range.end)
528 }
529
530 if let Some(last_idx) = last_non_null_idx.take() {
531 write_non_null(children, last_idx..range.end)
532 }
533 }
534 None => write_non_null(children, range),
535 }
536 }
537
538 fn write_fixed_size_list(
540 child: &mut LevelInfoBuilder,
541 ctx: &LevelContext,
542 fixed_size: usize,
543 nulls: Option<&NullBuffer>,
544 range: Range<usize>,
545 ) {
546 let write_non_null = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
547 let values_start = start_idx * fixed_size;
548 let values_end = end_idx * fixed_size;
549 child.write(values_start..values_end);
550
551 child.visit_leaves(|leaf| {
552 let rep_levels = leaf.rep_levels.as_mut().unwrap();
553
554 let row_indices = (0..fixed_size)
555 .rev()
556 .cycle()
557 .take(values_end - values_start);
558
559 rep_levels
561 .iter_mut()
562 .rev()
563 .filter(|&&mut r| r == ctx.rep_level)
565 .zip(row_indices)
566 .for_each(|(r, idx)| {
567 if idx == 0 {
568 *r = ctx.rep_level - 1;
569 }
570 });
571 })
572 };
573
574 let write_empty = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
576 let len = end_idx - start_idx;
577 child.visit_leaves(|leaf| {
578 let rep_levels = leaf.rep_levels.as_mut().unwrap();
579 rep_levels.extend(std::iter::repeat_n(ctx.rep_level - 1, len));
580 let def_levels = leaf.def_levels.as_mut().unwrap();
581 def_levels.extend(std::iter::repeat_n(ctx.def_level - 1, len));
582 })
583 };
584
585 let write_rows = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
586 if fixed_size > 0 {
587 write_non_null(child, start_idx, end_idx)
588 } else {
589 write_empty(child, start_idx, end_idx)
590 }
591 };
592
593 match nulls {
594 Some(nulls) => {
595 let mut start_idx = None;
596 for idx in range.clone() {
597 if nulls.is_valid(idx) {
598 start_idx.get_or_insert(idx);
600 } else {
601 if let Some(start) = start_idx.take() {
603 write_rows(child, start, idx);
604 }
605 child.visit_leaves(|leaf| {
607 let rep_levels = leaf.rep_levels.as_mut().unwrap();
608 rep_levels.push(ctx.rep_level - 1);
609 let def_levels = leaf.def_levels.as_mut().unwrap();
610 def_levels.push(ctx.def_level - 2);
611 })
612 }
613 }
614 if let Some(start) = start_idx.take() {
616 write_rows(child, start, range.end);
617 }
618 }
619 None => write_rows(child, range.start, range.end),
621 }
622 }
623
624 fn write_leaf(info: &mut ArrayLevels, range: Range<usize>) {
626 let len = range.end - range.start;
627
628 match &mut info.def_levels {
629 Some(def_levels) => {
630 def_levels.reserve(len);
631 info.non_null_indices.reserve(len);
632
633 match &info.logical_nulls {
634 Some(nulls) => {
635 assert!(range.end <= nulls.len());
636 let nulls = nulls.inner();
637 def_levels.extend(range.clone().map(|i| {
638 let valid = unsafe { nulls.value_unchecked(i) };
640 info.max_def_level - (!valid as i16)
641 }));
642 info.non_null_indices.extend(
643 BitIndexIterator::new(nulls.inner(), nulls.offset() + range.start, len)
644 .map(|i| i + range.start),
645 );
646 }
647 None => {
648 let iter = std::iter::repeat_n(info.max_def_level, len);
649 def_levels.extend(iter);
650 info.non_null_indices.extend(range);
651 }
652 }
653 }
654 None => info.non_null_indices.extend(range),
655 }
656
657 if let Some(rep_levels) = &mut info.rep_levels {
658 rep_levels.extend(std::iter::repeat_n(info.max_rep_level, len))
659 }
660 }
661
662 fn visit_leaves(&mut self, visit: impl Fn(&mut ArrayLevels) + Copy) {
664 match self {
665 LevelInfoBuilder::Primitive(info) => visit(info),
666 LevelInfoBuilder::List(c, _, _, _)
667 | LevelInfoBuilder::LargeList(c, _, _, _)
668 | LevelInfoBuilder::FixedSizeList(c, _, _, _)
669 | LevelInfoBuilder::ListView(c, _, _, _, _)
670 | LevelInfoBuilder::LargeListView(c, _, _, _, _) => c.visit_leaves(visit),
671 LevelInfoBuilder::Struct(children, _, _) => {
672 for c in children {
673 c.visit_leaves(visit)
674 }
675 }
676 }
677 }
678
679 fn types_compatible(a: &DataType, b: &DataType) -> bool {
685 if a.equals_datatype(b) {
687 return true;
688 }
689
690 let (a, b) = match (a, b) {
692 (DataType::Dictionary(_, va), DataType::Dictionary(_, vb)) => {
693 (va.as_ref(), vb.as_ref())
694 }
695 (DataType::Dictionary(_, v), b) => (v.as_ref(), b),
696 (a, DataType::Dictionary(_, v)) => (a, v.as_ref()),
697 _ => (a, b),
698 };
699
700 if a == b {
703 return true;
704 }
705
706 match a {
709 DataType::Utf8 => matches!(b, DataType::LargeUtf8 | DataType::Utf8View),
711 DataType::Utf8View => matches!(b, DataType::LargeUtf8 | DataType::Utf8),
712 DataType::LargeUtf8 => matches!(b, DataType::Utf8 | DataType::Utf8View),
713
714 DataType::Binary => matches!(b, DataType::LargeBinary | DataType::BinaryView),
716 DataType::BinaryView => matches!(b, DataType::LargeBinary | DataType::Binary),
717 DataType::LargeBinary => matches!(b, DataType::Binary | DataType::BinaryView),
718
719 _ => false,
721 }
722 }
723}
724
/// The data necessary to write a primitive Arrow array to parquet, taking into account
/// any non-primitive parents it may have in the arrow representation
#[derive(Debug, Clone)]
pub(crate) struct ArrayLevels {
    /// Array's definition levels
    ///
    /// Present if `max_def_level != 0`
    def_levels: Option<Vec<i16>>,

    /// Array's optional repetition levels
    ///
    /// Present if `max_rep_level != 0`
    rep_levels: Option<Vec<i16>>,

    /// The indices into `array` of the non-null values that were written
    non_null_indices: Vec<usize>,

    /// The maximum definition level for this leaf column
    max_def_level: i16,

    /// The maximum repetition level for this leaf column
    max_rep_level: i16,

    /// The arrow array
    array: ArrayRef,

    /// Cached logical nulls of the array
    logical_nulls: Option<NullBuffer>,
}
755
756impl PartialEq for ArrayLevels {
757 fn eq(&self, other: &Self) -> bool {
758 self.def_levels == other.def_levels
759 && self.rep_levels == other.rep_levels
760 && self.non_null_indices == other.non_null_indices
761 && self.max_def_level == other.max_def_level
762 && self.max_rep_level == other.max_rep_level
763 && self.array.as_ref() == other.array.as_ref()
764 && self.logical_nulls.as_ref() == other.logical_nulls.as_ref()
765 }
766}
767impl Eq for ArrayLevels {}
768
769impl ArrayLevels {
770 fn new(ctx: LevelContext, is_nullable: bool, array: ArrayRef) -> Self {
771 let max_rep_level = ctx.rep_level;
772 let max_def_level = match is_nullable {
773 true => ctx.def_level + 1,
774 false => ctx.def_level,
775 };
776
777 let logical_nulls = array.logical_nulls();
778
779 Self {
780 def_levels: (max_def_level != 0).then(Vec::new),
781 rep_levels: (max_rep_level != 0).then(Vec::new),
782 non_null_indices: vec![],
783 max_def_level,
784 max_rep_level,
785 array,
786 logical_nulls,
787 }
788 }
789
    /// Returns the leaf arrow array these levels were computed for
    pub fn array(&self) -> &ArrayRef {
        &self.array
    }
793
    /// Returns the definition levels, or `None` if this leaf does not track them
    pub fn def_levels(&self) -> Option<&[i16]> {
        self.def_levels.as_deref()
    }
797
    /// Returns the repetition levels, or `None` if this leaf does not track them
    pub fn rep_levels(&self) -> Option<&[i16]> {
        self.rep_levels.as_deref()
    }
801
    /// Returns the indices into the array of the non-null values that were written
    pub fn non_null_indices(&self) -> &[usize] {
        &self.non_null_indices
    }
805
806 pub(crate) fn slice_for_chunk(&self, chunk: &CdcChunk) -> Self {
812 let def_levels = self.def_levels.as_ref().map(|levels| {
813 levels[chunk.level_offset..chunk.level_offset + chunk.num_levels].to_vec()
814 });
815 let rep_levels = self.rep_levels.as_ref().map(|levels| {
816 levels[chunk.level_offset..chunk.level_offset + chunk.num_levels].to_vec()
817 });
818
819 let nni = &self.non_null_indices[chunk.value_offset..chunk.value_offset + chunk.num_values];
821 let start = nni.first().copied().unwrap_or(0);
826 let end = nni.last().map_or(0, |&i| i + 1);
827 let non_null_indices = nni.iter().map(|&idx| idx - start).collect();
829 let array = self.array.slice(start, end - start);
831 let logical_nulls = array.logical_nulls();
832
833 Self {
834 def_levels,
835 rep_levels,
836 non_null_indices,
837 max_def_level: self.max_def_level,
838 max_rep_level: self.max_rep_level,
839 array,
840 logical_nulls,
841 }
842 }
843}
844
845#[cfg(test)]
846mod tests {
847 use super::*;
848 use crate::column::chunker::CdcChunk;
849
850 use arrow_array::builder::*;
851 use arrow_array::types::Int32Type;
852 use arrow_array::*;
853 use arrow_buffer::{Buffer, ToByteSlice};
854 use arrow_cast::display::array_value_to_string;
855 use arrow_data::{ArrayData, ArrayDataBuilder};
856 use arrow_schema::{Fields, Schema};
857
858 #[test]
859 fn test_calculate_array_levels_twitter_example() {
860 let leaf_type = Field::new_list_field(DataType::Int32, false);
864 let inner_type = DataType::List(Arc::new(leaf_type));
865 let inner_field = Field::new("l2", inner_type.clone(), false);
866 let outer_type = DataType::List(Arc::new(inner_field));
867 let outer_field = Field::new("l1", outer_type.clone(), false);
868
869 let primitives = Int32Array::from_iter(0..10);
870
871 let offsets = Buffer::from_iter([0_i32, 3, 7, 8, 10]);
873 let inner_list = ArrayDataBuilder::new(inner_type)
874 .len(4)
875 .add_buffer(offsets)
876 .add_child_data(primitives.to_data())
877 .build()
878 .unwrap();
879
880 let offsets = Buffer::from_iter([0_i32, 2, 4]);
881 let outer_list = ArrayDataBuilder::new(outer_type)
882 .len(2)
883 .add_buffer(offsets)
884 .add_child_data(inner_list)
885 .build()
886 .unwrap();
887 let outer_list = make_array(outer_list);
888
889 let levels = calculate_array_levels(&outer_list, &outer_field).unwrap();
890 assert_eq!(levels.len(), 1);
891
892 let expected = ArrayLevels {
893 def_levels: Some(vec![2; 10]),
894 rep_levels: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
895 non_null_indices: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
896 max_def_level: 2,
897 max_rep_level: 2,
898 array: Arc::new(primitives),
899 logical_nulls: None,
900 };
901 assert_eq!(&levels[0], &expected);
902 }
903
904 #[test]
905 fn test_calculate_one_level_1() {
906 let array = Arc::new(Int32Array::from_iter(0..10)) as ArrayRef;
908 let field = Field::new_list_field(DataType::Int32, false);
909
910 let levels = calculate_array_levels(&array, &field).unwrap();
911 assert_eq!(levels.len(), 1);
912
913 let expected_levels = ArrayLevels {
914 def_levels: None,
915 rep_levels: None,
916 non_null_indices: (0..10).collect(),
917 max_def_level: 0,
918 max_rep_level: 0,
919 array,
920 logical_nulls: None,
921 };
922 assert_eq!(&levels[0], &expected_levels);
923 }
924
925 #[test]
926 fn test_calculate_one_level_2() {
927 let array = Arc::new(Int32Array::from_iter([
929 Some(0),
930 None,
931 Some(0),
932 Some(0),
933 None,
934 ])) as ArrayRef;
935 let field = Field::new_list_field(DataType::Int32, true);
936
937 let levels = calculate_array_levels(&array, &field).unwrap();
938 assert_eq!(levels.len(), 1);
939
940 let logical_nulls = array.logical_nulls();
941 let expected_levels = ArrayLevels {
942 def_levels: Some(vec![1, 0, 1, 1, 0]),
943 rep_levels: None,
944 non_null_indices: vec![0, 2, 3],
945 max_def_level: 1,
946 max_rep_level: 0,
947 array,
948 logical_nulls,
949 };
950 assert_eq!(&levels[0], &expected_levels);
951 }
952
    #[test]
    fn test_calculate_array_levels_1() {
        let leaf_field = Field::new_list_field(DataType::Int32, false);
        let list_type = DataType::List(Arc::new(leaf_field));

        // Case 1: five non-null lists of one value each
        // [[0], [1], [2], [3], [4]]
        let leaf_array = Int32Array::from_iter(0..5);
        // Offsets 0, 1, 2, 3, 4, 5
        let offsets = Buffer::from_iter(0_i32..6);
        let list = ArrayDataBuilder::new(list_type.clone())
            .len(5)
            .add_buffer(offsets)
            .add_child_data(leaf_array.to_data())
            .build()
            .unwrap();
        let list = make_array(list);

        let list_field = Field::new("list", list_type.clone(), false);
        let levels = calculate_array_levels(&list, &list_field).unwrap();
        assert_eq!(levels.len(), 1);

        // Every value starts its own list, so all repetition levels are 0
        let expected_levels = ArrayLevels {
            def_levels: Some(vec![1; 5]),
            rep_levels: Some(vec![0; 5]),
            non_null_indices: (0..5).collect(),
            max_def_level: 1,
            max_rep_level: 1,
            array: Arc::new(leaf_array),
            logical_nulls: None,
        };
        assert_eq!(&levels[0], &expected_levels);

        // Case 2: a nullable list column whose second row is null
        // [[0, 0], null, [2, 2], [3, 3, 3, 3], [4, 4, 4]]
        let leaf_array = Int32Array::from_iter([0, 0, 2, 2, 3, 3, 3, 3, 4, 4, 4]);
        let offsets = Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]);
        let list = ArrayDataBuilder::new(list_type.clone())
            .len(5)
            .add_buffer(offsets)
            .add_child_data(leaf_array.to_data())
            // Bit 1 is unset: row 1 is null
            .null_bit_buffer(Some(Buffer::from([0b00011101])))
            .build()
            .unwrap();
        let list = make_array(list);

        let list_field = Field::new("list", list_type, true);
        let levels = calculate_array_levels(&list, &list_field).unwrap();
        assert_eq!(levels.len(), 1);

        // The null list contributes a single level pair with definition level 0
        let expected_levels = ArrayLevels {
            def_levels: Some(vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2]),
            rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
            non_null_indices: (0..11).collect(),
            max_def_level: 2,
            max_rep_level: 1,
            array: Arc::new(leaf_array),
            logical_nulls: None,
        };
        assert_eq!(&levels[0], &expected_levels);
    }
1021
    #[test]
    fn test_calculate_array_levels_2() {
        // Case 1: a nullable struct holding a nullable list of non-nullable values.
        // The list rows cover the leaf ranges 0..2, 2..2 (empty), 2..4, 4..8 and 8..11.
        let leaf = Int32Array::from_iter(0..11);
        let leaf_field = Field::new("leaf", DataType::Int32, false);

        let list_type = DataType::List(Arc::new(leaf_field));
        let list = ArrayData::builder(list_type.clone())
            .len(5)
            .add_child_data(leaf.to_data())
            .add_buffer(Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]))
            .build()
            .unwrap();

        let list = make_array(list);
        let list_field = Arc::new(Field::new("list", list_type, true));

        // Bits 0 and 2 are unset: struct rows 0 and 2 are null
        let struct_array =
            StructArray::from((vec![(list_field, list)], Buffer::from([0b00011010])));
        let array = Arc::new(struct_array) as ArrayRef;

        let struct_field = Field::new("struct", array.data_type().clone(), true);

        let levels = calculate_array_levels(&array, &struct_field).unwrap();
        assert_eq!(levels.len(), 1);

        // Rows 0 and 2 are null structs (definition level 0) and row 1 is an empty list
        // (definition level 2), so only the values of rows 3 and 4 are written
        let expected_levels = ArrayLevels {
            def_levels: Some(vec![0, 2, 0, 3, 3, 3, 3, 3, 3, 3]),
            rep_levels: Some(vec![0, 0, 0, 0, 1, 1, 1, 0, 1, 1]),
            non_null_indices: (4..11).collect(),
            max_def_level: 3,
            max_rep_level: 1,
            array: Arc::new(leaf),
            logical_nulls: None,
        };

        assert_eq!(&levels[0], &expected_levels);

        // Case 2: a nullable list of nullable lists of nullable values, without any nulls.
        // The 22 values form 11 inner lists of two values each.
        let leaf = Int32Array::from_iter(100..122);
        let leaf_field = Field::new("leaf", DataType::Int32, true);

        let l1_type = DataType::List(Arc::new(leaf_field));
        let offsets = Buffer::from_iter([0_i32, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22]);
        let l1 = ArrayData::builder(l1_type.clone())
            .len(11)
            .add_child_data(leaf.to_data())
            .add_buffer(offsets)
            .build()
            .unwrap();

        // The outer rows hold 2, 0, 2, 4 and 3 inner lists respectively
        let l1_field = Field::new("l1", l1_type, true);
        let l2_type = DataType::List(Arc::new(l1_field));
        let l2 = ArrayData::builder(l2_type)
            .len(5)
            .add_child_data(l1)
            .add_buffer(Buffer::from_iter([0, 2, 2, 4, 8, 11]))
            .build()
            .unwrap();

        let l2 = make_array(l2);
        let l2_field = Field::new("l2", l2.data_type().clone(), true);

        let levels = calculate_array_levels(&l2, &l2_field).unwrap();
        assert_eq!(levels.len(), 1);

        // The empty outer row contributes the single entry with definition level 1
        let expected_levels = ArrayLevels {
            def_levels: Some(vec![
                5, 5, 5, 5, 1, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
            ]),
            rep_levels: Some(vec![
                0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2,
            ]),
            non_null_indices: (0..22).collect(),
            max_def_level: 5,
            max_rep_level: 2,
            array: Arc::new(leaf),
            logical_nulls: None,
        };

        assert_eq!(&levels[0], &expected_levels);
    }
1123
    #[test]
    fn test_calculate_array_levels_nested_list() {
        let leaf_field = Field::new("leaf", DataType::Int32, false);
        let list_type = DataType::List(Arc::new(leaf_field));

        // Case 1: four non-null lists of one value each
        // [[0], [0], [0], [0]]
        let leaf = Int32Array::from_iter([0; 4]);
        let list = ArrayData::builder(list_type.clone())
            .len(4)
            .add_buffer(Buffer::from_iter(0_i32..5))
            .add_child_data(leaf.to_data())
            .build()
            .unwrap();
        let list = make_array(list);

        let list_field = Field::new("list", list_type.clone(), false);
        let levels = calculate_array_levels(&list, &list_field).unwrap();
        assert_eq!(levels.len(), 1);

        let expected_levels = ArrayLevels {
            def_levels: Some(vec![1; 4]),
            rep_levels: Some(vec![0; 4]),
            non_null_indices: (0..4).collect(),
            max_def_level: 1,
            max_rep_level: 1,
            array: Arc::new(leaf),
            logical_nulls: None,
        };
        assert_eq!(&levels[0], &expected_levels);

        // Case 2: a nullable struct (without nulls) holding a nullable list
        // [null, [0, 1, 2], [3, 4], [5, 6]]
        // The leaf holds eight values but the offsets only reference the first seven.
        let leaf = Int32Array::from_iter(0..8);
        let list = ArrayData::builder(list_type.clone())
            .len(4)
            .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
            // Bit 0 is unset: row 0 is null
            .null_bit_buffer(Some(Buffer::from([0b00001110])))
            .add_child_data(leaf.to_data())
            .build()
            .unwrap();
        let list = make_array(list);
        let list_field = Arc::new(Field::new("list", list_type, true));

        let struct_array = StructArray::from(vec![(list_field, list)]);
        let array = Arc::new(struct_array) as ArrayRef;

        let struct_field = Field::new("struct", array.data_type().clone(), true);
        let levels = calculate_array_levels(&array, &struct_field).unwrap();
        assert_eq!(levels.len(), 1);

        // The null list is defined up to the struct only (definition level 1)
        let expected_levels = ArrayLevels {
            def_levels: Some(vec![1, 3, 3, 3, 3, 3, 3, 3]),
            rep_levels: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]),
            non_null_indices: (0..7).collect(),
            max_def_level: 3,
            max_rep_level: 1,
            array: Arc::new(leaf),
            logical_nulls: None,
        };
        assert_eq!(&levels[0], &expected_levels);

        // Case 3: a nullable struct holding a nullable list of nullable lists.
        // The seven inner lists hold 1, 2, 0, 3, 4, 0 and 5 values respectively.
        let leaf = Int32Array::from_iter(201..216);
        let leaf_field = Field::new("leaf", DataType::Int32, false);
        let list_1_type = DataType::List(Arc::new(leaf_field));
        let list_1 = ArrayData::builder(list_1_type.clone())
            .len(7)
            .add_buffer(Buffer::from_iter([0_i32, 1, 3, 3, 6, 10, 10, 15]))
            .add_child_data(leaf.to_data())
            .build()
            .unwrap();

        // The outer rows are: null, inner lists 0..3, inner lists 3..5, inner lists 5..7
        let list_1_field = Field::new("l1", list_1_type, true);
        let list_2_type = DataType::List(Arc::new(list_1_field));
        let list_2 = ArrayData::builder(list_2_type.clone())
            .len(4)
            .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
            // Bit 0 is unset: row 0 is null
            .null_bit_buffer(Some(Buffer::from([0b00001110])))
            .add_child_data(list_1)
            .build()
            .unwrap();

        let list_2 = make_array(list_2);
        let list_2_field = Arc::new(Field::new("list_2", list_2_type, true));

        // All four struct rows are valid
        let struct_array =
            StructArray::from((vec![(list_2_field, list_2)], Buffer::from([0b00001111])));
        let struct_field = Field::new("struct", struct_array.data_type().clone(), true);

        let array = Arc::new(struct_array) as ArrayRef;
        let levels = calculate_array_levels(&array, &struct_field).unwrap();
        assert_eq!(levels.len(), 1);

        // The null outer list has definition level 1, the two empty inner lists level 4
        let expected_levels = ArrayLevels {
            def_levels: Some(vec![1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5, 5, 5, 5]),
            rep_levels: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2]),
            non_null_indices: (0..15).collect(),
            max_def_level: 5,
            max_rep_level: 2,
            array: Arc::new(leaf),
            logical_nulls: None,
        };
        assert_eq!(&levels[0], &expected_levels);
    }
1242
    #[test]
    fn test_calculate_nested_struct_levels() {
        // Three levels of nullability: struct `a` holds struct `b`, which holds the values `c`
        //
        // row | a     | b     | c
        // 0   | valid | valid | 1
        // 1   | valid | valid | null
        // 2   | valid | valid | 3
        // 3   | valid | null  | null
        // 4   | null  | valid | 5
        // 5   | valid | valid | 6
        let c = Int32Array::from_iter([Some(1), None, Some(3), None, Some(5), Some(6)]);
        let leaf = Arc::new(c) as ArrayRef;
        let c_field = Arc::new(Field::new("c", DataType::Int32, true));
        // Bit 3 is unset: row 3 of `b` is null
        let b = StructArray::from(((vec![(c_field, leaf.clone())]), Buffer::from([0b00110111])));

        let b_field = Arc::new(Field::new("b", b.data_type().clone(), true));
        // Bit 4 is unset: row 4 of `a` is null
        let a = StructArray::from((
            (vec![(b_field, Arc::new(b) as ArrayRef)]),
            Buffer::from([0b00101111]),
        ));

        let a_field = Field::new("a", a.data_type().clone(), true);
        let a_array = Arc::new(a) as ArrayRef;

        let levels = calculate_array_levels(&a_array, &a_field).unwrap();
        assert_eq!(levels.len(), 1);

        // The definition level of each row is the depth at which it stops being defined.
        // Row 4 holds a value in `c` but is masked by the null `a`, so it is not written.
        let logical_nulls = leaf.logical_nulls();
        let expected_levels = ArrayLevels {
            def_levels: Some(vec![3, 2, 3, 1, 0, 3]),
            rep_levels: None,
            non_null_indices: vec![0, 2, 5],
            max_def_level: 3,
            max_rep_level: 0,
            array: leaf,
            logical_nulls,
        };
        assert_eq!(&levels[0], &expected_levels);
    }
1283
1284 #[test]
1285 fn list_single_column() {
1286 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1289 let a_value_offsets = arrow::buffer::Buffer::from_iter([0_i32, 1, 3, 3, 6, 10]);
1290 let a_list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
1291 let a_list_data = ArrayData::builder(a_list_type.clone())
1292 .len(5)
1293 .add_buffer(a_value_offsets)
1294 .null_bit_buffer(Some(Buffer::from([0b00011011])))
1295 .add_child_data(a_values.to_data())
1296 .build()
1297 .unwrap();
1298
1299 assert_eq!(a_list_data.null_count(), 1);
1300
1301 let a = ListArray::from(a_list_data);
1302
1303 let item_field = Field::new_list_field(a_list_type, true);
1304 let mut builder = levels(&item_field, a);
1305 builder.write(2..4);
1306 let levels = builder.finish();
1307
1308 assert_eq!(levels.len(), 1);
1309
1310 let list_level = &levels[0];
1311
1312 let expected_level = ArrayLevels {
1313 def_levels: Some(vec![0, 3, 3, 3]),
1314 rep_levels: Some(vec![0, 0, 1, 1]),
1315 non_null_indices: vec![3, 4, 5],
1316 max_def_level: 3,
1317 max_rep_level: 1,
1318 array: Arc::new(a_values),
1319 logical_nulls: None,
1320 };
1321 assert_eq!(list_level, &expected_level);
1322 }
1323
1324 #[test]
1325 fn mixed_struct_list() {
1326 let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1330 let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1331 let struct_field_g = Arc::new(Field::new(
1332 "g",
1333 DataType::List(Arc::new(Field::new("items", DataType::Int16, false))),
1334 false,
1335 ));
1336 let struct_field_e = Arc::new(Field::new(
1337 "e",
1338 DataType::Struct(vec![struct_field_f.clone(), struct_field_g.clone()].into()),
1339 true,
1340 ));
1341 let schema = Schema::new(vec![
1342 Field::new("a", DataType::Int32, false),
1343 Field::new("b", DataType::Int32, true),
1344 Field::new(
1345 "c",
1346 DataType::Struct(vec![struct_field_d.clone(), struct_field_e.clone()].into()),
1347 true, ),
1349 ]);
1350
1351 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1353 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1354 let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1355 let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1356
1357 let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1358
1359 let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1362
1363 let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1365 .len(5)
1366 .add_buffer(g_value_offsets)
1367 .add_child_data(g_value.into_data())
1368 .build()
1369 .unwrap();
1370 let g = ListArray::from(g_list_data);
1371
1372 let e = StructArray::from(vec![
1373 (struct_field_f, Arc::new(f.clone()) as ArrayRef),
1374 (struct_field_g, Arc::new(g) as ArrayRef),
1375 ]);
1376
1377 let c = StructArray::from(vec![
1378 (struct_field_d, Arc::new(d.clone()) as ArrayRef),
1379 (struct_field_e, Arc::new(e) as ArrayRef),
1380 ]);
1381
1382 let batch = RecordBatch::try_new(
1384 Arc::new(schema),
1385 vec![Arc::new(a.clone()), Arc::new(b.clone()), Arc::new(c)],
1386 )
1387 .unwrap();
1388
1389 let mut levels = vec![];
1392 batch
1393 .columns()
1394 .iter()
1395 .zip(batch.schema().fields())
1396 .for_each(|(array, field)| {
1397 let mut array_levels = calculate_array_levels(array, field).unwrap();
1398 levels.append(&mut array_levels);
1399 });
1400 assert_eq!(levels.len(), 5);
1401
1402 let list_level = &levels[0];
1404
1405 let expected_level = ArrayLevels {
1406 def_levels: None,
1407 rep_levels: None,
1408 non_null_indices: vec![0, 1, 2, 3, 4],
1409 max_def_level: 0,
1410 max_rep_level: 0,
1411 array: Arc::new(a),
1412 logical_nulls: None,
1413 };
1414 assert_eq!(list_level, &expected_level);
1415
1416 let list_level = levels.get(1).unwrap();
1418
1419 let b_logical_nulls = b.logical_nulls();
1420 let expected_level = ArrayLevels {
1421 def_levels: Some(vec![1, 0, 0, 1, 1]),
1422 rep_levels: None,
1423 non_null_indices: vec![0, 3, 4],
1424 max_def_level: 1,
1425 max_rep_level: 0,
1426 array: Arc::new(b),
1427 logical_nulls: b_logical_nulls,
1428 };
1429 assert_eq!(list_level, &expected_level);
1430
1431 let list_level = levels.get(2).unwrap();
1433
1434 let d_logical_nulls = d.logical_nulls();
1435 let expected_level = ArrayLevels {
1436 def_levels: Some(vec![1, 1, 1, 2, 1]),
1437 rep_levels: None,
1438 non_null_indices: vec![3],
1439 max_def_level: 2,
1440 max_rep_level: 0,
1441 array: Arc::new(d),
1442 logical_nulls: d_logical_nulls,
1443 };
1444 assert_eq!(list_level, &expected_level);
1445
1446 let list_level = levels.get(3).unwrap();
1448
1449 let f_logical_nulls = f.logical_nulls();
1450 let expected_level = ArrayLevels {
1451 def_levels: Some(vec![3, 2, 3, 2, 3]),
1452 rep_levels: None,
1453 non_null_indices: vec![0, 2, 4],
1454 max_def_level: 3,
1455 max_rep_level: 0,
1456 array: Arc::new(f),
1457 logical_nulls: f_logical_nulls,
1458 };
1459 assert_eq!(list_level, &expected_level);
1460 }
1461
1462 #[test]
1463 fn test_null_vs_nonnull_struct() {
1464 let offset_field = Arc::new(Field::new("offset", DataType::Int32, true));
1466 let schema = Schema::new(vec![Field::new(
1467 "some_nested_object",
1468 DataType::Struct(vec![offset_field.clone()].into()),
1469 false,
1470 )]);
1471
1472 let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1474
1475 let some_nested_object =
1476 StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
1477
1478 let batch =
1480 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1481
1482 let struct_null_level =
1483 calculate_array_levels(batch.column(0), batch.schema().field(0)).unwrap();
1484
1485 let offset_field = Arc::new(Field::new("offset", DataType::Int32, true));
1488 let schema = Schema::new(vec![Field::new(
1489 "some_nested_object",
1490 DataType::Struct(vec![offset_field.clone()].into()),
1491 true,
1492 )]);
1493
1494 let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1496
1497 let some_nested_object =
1498 StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
1499
1500 let batch =
1502 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1503
1504 let struct_non_null_level =
1505 calculate_array_levels(batch.column(0), batch.schema().field(0)).unwrap();
1506
1507 if struct_non_null_level == struct_null_level {
1509 panic!("Levels should not be equal, to reflect the difference in struct nullness");
1510 }
1511 }
1512
1513 #[test]
1514 fn test_map_array() {
1515 let json_content = r#"
1517 {"stocks":{"long": "$AAA", "short": "$BBB"}}
1518 {"stocks":{"long": "$CCC", "short": null}}
1519 {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1520 "#;
1521 let entries_struct_type = DataType::Struct(Fields::from(vec![
1522 Field::new("key", DataType::Utf8, false),
1523 Field::new("value", DataType::Utf8, true),
1524 ]));
1525 let stocks_field = Field::new(
1526 "stocks",
1527 DataType::Map(
1528 Arc::new(Field::new("entries", entries_struct_type, false)),
1529 false,
1530 ),
1531 false,
1533 );
1534 let schema = Arc::new(Schema::new(vec![stocks_field]));
1535 let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1536 let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1537
1538 let batch = reader.next().unwrap().unwrap();
1539
1540 let mut levels = vec![];
1542 batch
1543 .columns()
1544 .iter()
1545 .zip(batch.schema().fields())
1546 .for_each(|(array, field)| {
1547 let mut array_levels = calculate_array_levels(array, field).unwrap();
1548 levels.append(&mut array_levels);
1549 });
1550 assert_eq!(levels.len(), 2);
1551
1552 let map = batch.column(0).as_map();
1553 let map_keys_logical_nulls = map.keys().logical_nulls();
1554
1555 let list_level = &levels[0];
1557
1558 let expected_level = ArrayLevels {
1559 def_levels: Some(vec![1; 7]),
1560 rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
1561 non_null_indices: vec![0, 1, 2, 3, 4, 5, 6],
1562 max_def_level: 1,
1563 max_rep_level: 1,
1564 array: map.keys().clone(),
1565 logical_nulls: map_keys_logical_nulls,
1566 };
1567 assert_eq!(list_level, &expected_level);
1568
1569 let list_level = levels.get(1).unwrap();
1571 let map_values_logical_nulls = map.values().logical_nulls();
1572
1573 let expected_level = ArrayLevels {
1574 def_levels: Some(vec![2, 2, 2, 1, 2, 1, 2]),
1575 rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
1576 non_null_indices: vec![0, 1, 2, 4, 6],
1577 max_def_level: 2,
1578 max_rep_level: 1,
1579 array: map.values().clone(),
1580 logical_nulls: map_values_logical_nulls,
1581 };
1582 assert_eq!(list_level, &expected_level);
1583 }
1584
1585 #[test]
1586 fn test_list_of_struct() {
1587 let int_field = Field::new("a", DataType::Int32, true);
1589 let fields = Fields::from([Arc::new(int_field)]);
1590 let item_field = Field::new_list_field(DataType::Struct(fields.clone()), true);
1591 let list_field = Field::new("list", DataType::List(Arc::new(item_field)), true);
1592
1593 let int_builder = Int32Builder::with_capacity(10);
1594 let struct_builder = StructBuilder::new(fields, vec![Box::new(int_builder)]);
1595 let mut list_builder = ListBuilder::new(struct_builder);
1596
1597 let values = list_builder.values();
1601 values
1602 .field_builder::<Int32Builder>(0)
1603 .unwrap()
1604 .append_value(1);
1605 values.append(true);
1606 list_builder.append(true);
1607
1608 list_builder.append(true);
1610
1611 list_builder.append(false);
1613
1614 let values = list_builder.values();
1616 values
1617 .field_builder::<Int32Builder>(0)
1618 .unwrap()
1619 .append_null();
1620 values.append(false);
1621 values
1622 .field_builder::<Int32Builder>(0)
1623 .unwrap()
1624 .append_null();
1625 values.append(false);
1626 list_builder.append(true);
1627
1628 let values = list_builder.values();
1630 values
1631 .field_builder::<Int32Builder>(0)
1632 .unwrap()
1633 .append_null();
1634 values.append(true);
1635 list_builder.append(true);
1636
1637 let values = list_builder.values();
1639 values
1640 .field_builder::<Int32Builder>(0)
1641 .unwrap()
1642 .append_value(2);
1643 values.append(true);
1644 list_builder.append(true);
1645
1646 let array = Arc::new(list_builder.finish());
1647
1648 let values = array.values().as_struct().column(0).clone();
1649 let values_len = values.len();
1650 assert_eq!(values_len, 5);
1651
1652 let schema = Arc::new(Schema::new(vec![list_field]));
1653
1654 let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
1655
1656 let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
1657 let list_level = &levels[0];
1658
1659 let logical_nulls = values.logical_nulls();
1660 let expected_level = ArrayLevels {
1661 def_levels: Some(vec![4, 1, 0, 2, 2, 3, 4]),
1662 rep_levels: Some(vec![0, 0, 0, 0, 1, 0, 0]),
1663 non_null_indices: vec![0, 4],
1664 max_def_level: 4,
1665 max_rep_level: 1,
1666 array: values,
1667 logical_nulls,
1668 };
1669
1670 assert_eq!(list_level, &expected_level);
1671 }
1672
1673 #[test]
1674 fn test_struct_mask_list() {
1675 let inner = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1677 Some(vec![Some(1), Some(2)]),
1678 Some(vec![None]),
1679 Some(vec![]),
1680 Some(vec![Some(3), None]), Some(vec![Some(4), Some(5)]),
1682 None, None,
1684 ]);
1685 let values = inner.values().clone();
1686
1687 assert_eq!(inner.values().len(), 7);
1689
1690 let field = Arc::new(Field::new("list", inner.data_type().clone(), true));
1691 let array = Arc::new(inner) as ArrayRef;
1692 let nulls = Buffer::from([0b01010111]);
1693 let struct_a = StructArray::from((vec![(field, array)], nulls));
1694
1695 let field = Field::new("struct", struct_a.data_type().clone(), true);
1696 let array = Arc::new(struct_a) as ArrayRef;
1697 let levels = calculate_array_levels(&array, &field).unwrap();
1698
1699 assert_eq!(levels.len(), 1);
1700
1701 let logical_nulls = values.logical_nulls();
1702 let expected_level = ArrayLevels {
1703 def_levels: Some(vec![4, 4, 3, 2, 0, 4, 4, 0, 1]),
1704 rep_levels: Some(vec![0, 1, 0, 0, 0, 0, 1, 0, 0]),
1705 non_null_indices: vec![0, 1, 5, 6],
1706 max_def_level: 4,
1707 max_rep_level: 1,
1708 array: values,
1709 logical_nulls,
1710 };
1711
1712 assert_eq!(&levels[0], &expected_level);
1713 }
1714
1715 #[test]
1716 fn test_list_mask_struct() {
1717 let a1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1721 Some(vec![None]), Some(vec![]), Some(vec![Some(3), None]),
1724 Some(vec![Some(4), Some(5), None, Some(6)]), None,
1726 None,
1727 ]);
1728 let a1_values = a1.values().clone();
1729 let a1 = Arc::new(a1) as ArrayRef;
1730
1731 let a2 = Arc::new(Int32Array::from_iter(vec![
1732 Some(1), Some(2), None,
1735 Some(4), Some(5),
1737 None,
1738 ])) as ArrayRef;
1739 let a2_values = a2.clone();
1740
1741 let field_a1 = Arc::new(Field::new("list", a1.data_type().clone(), true));
1742 let field_a2 = Arc::new(Field::new("integers", a2.data_type().clone(), true));
1743
1744 let nulls = Buffer::from([0b00110111]);
1745 let struct_a = Arc::new(StructArray::from((
1746 vec![(field_a1, a1), (field_a2, a2)],
1747 nulls,
1748 ))) as ArrayRef;
1749
1750 let offsets = Buffer::from_iter([0_i32, 0, 2, 2, 3, 5, 5]);
1751 let nulls = Buffer::from([0b00111100]);
1752
1753 let list_type = DataType::List(Arc::new(Field::new(
1754 "struct",
1755 struct_a.data_type().clone(),
1756 true,
1757 )));
1758
1759 let data = ArrayDataBuilder::new(list_type.clone())
1760 .len(6)
1761 .null_bit_buffer(Some(nulls))
1762 .add_buffer(offsets)
1763 .add_child_data(struct_a.into_data())
1764 .build()
1765 .unwrap();
1766
1767 let list = make_array(data);
1768 let list_field = Field::new("col", list_type, true);
1769
1770 let expected = vec![
1771 r#""#.to_string(),
1772 r#""#.to_string(),
1773 r#"[]"#.to_string(),
1774 r#"[{list: [3, ], integers: }]"#.to_string(),
1775 r#"[, {list: , integers: 5}]"#.to_string(),
1776 r#"[]"#.to_string(),
1777 ];
1778
1779 let actual: Vec<_> = (0..6)
1780 .map(|x| array_value_to_string(&list, x).unwrap())
1781 .collect();
1782 assert_eq!(actual, expected);
1783
1784 let levels = calculate_array_levels(&list, &list_field).unwrap();
1785
1786 assert_eq!(levels.len(), 2);
1787
1788 let a1_logical_nulls = a1_values.logical_nulls();
1789 let expected_level = ArrayLevels {
1790 def_levels: Some(vec![0, 0, 1, 6, 5, 2, 3, 1]),
1791 rep_levels: Some(vec![0, 0, 0, 0, 2, 0, 1, 0]),
1792 non_null_indices: vec![1],
1793 max_def_level: 6,
1794 max_rep_level: 2,
1795 array: a1_values,
1796 logical_nulls: a1_logical_nulls,
1797 };
1798
1799 assert_eq!(&levels[0], &expected_level);
1800
1801 let a2_logical_nulls = a2_values.logical_nulls();
1802 let expected_level = ArrayLevels {
1803 def_levels: Some(vec![0, 0, 1, 3, 2, 4, 1]),
1804 rep_levels: Some(vec![0, 0, 0, 0, 0, 1, 0]),
1805 non_null_indices: vec![4],
1806 max_def_level: 4,
1807 max_rep_level: 1,
1808 array: a2_values,
1809 logical_nulls: a2_logical_nulls,
1810 };
1811
1812 assert_eq!(&levels[1], &expected_level);
1813 }
1814
1815 #[test]
1816 fn test_fixed_size_list() {
1817 let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 2);
1819 builder.values().append_slice(&[1, 2]);
1820 builder.append(true);
1821 builder.values().append_slice(&[3, 4]);
1822 builder.append(false);
1823 builder.values().append_slice(&[5, 6]);
1824 builder.append(false);
1825 builder.values().append_slice(&[7, 8]);
1826 builder.append(true);
1827 builder.values().append_slice(&[9, 10]);
1828 builder.append(false);
1829 let a = builder.finish();
1830 let values = a.values().clone();
1831
1832 let item_field = Field::new_list_field(a.data_type().clone(), true);
1833 let mut builder = levels(&item_field, a);
1834 builder.write(1..4);
1835 let levels = builder.finish();
1836
1837 assert_eq!(levels.len(), 1);
1838
1839 let list_level = &levels[0];
1840
1841 let logical_nulls = values.logical_nulls();
1842 let expected_level = ArrayLevels {
1843 def_levels: Some(vec![0, 0, 3, 3]),
1844 rep_levels: Some(vec![0, 0, 0, 1]),
1845 non_null_indices: vec![6, 7],
1846 max_def_level: 3,
1847 max_rep_level: 1,
1848 array: values,
1849 logical_nulls,
1850 };
1851 assert_eq!(list_level, &expected_level);
1852 }
1853
1854 #[test]
1855 fn test_fixed_size_list_of_struct() {
1856 let field_a = Field::new("a", DataType::Int32, true);
1858 let field_b = Field::new("b", DataType::Int64, false);
1859 let fields = Fields::from([Arc::new(field_a), Arc::new(field_b)]);
1860 let item_field = Field::new_list_field(DataType::Struct(fields.clone()), true);
1861 let list_field = Field::new(
1862 "list",
1863 DataType::FixedSizeList(Arc::new(item_field), 2),
1864 true,
1865 );
1866
1867 let builder_a = Int32Builder::with_capacity(10);
1868 let builder_b = Int64Builder::with_capacity(10);
1869 let struct_builder =
1870 StructBuilder::new(fields, vec![Box::new(builder_a), Box::new(builder_b)]);
1871 let mut list_builder = FixedSizeListBuilder::new(struct_builder, 2);
1872
1873 let values = list_builder.values();
1882 values
1884 .field_builder::<Int32Builder>(0)
1885 .unwrap()
1886 .append_value(1);
1887 values
1888 .field_builder::<Int64Builder>(1)
1889 .unwrap()
1890 .append_value(2);
1891 values.append(true);
1892 values
1894 .field_builder::<Int32Builder>(0)
1895 .unwrap()
1896 .append_null();
1897 values
1898 .field_builder::<Int64Builder>(1)
1899 .unwrap()
1900 .append_value(0);
1901 values.append(false);
1902 list_builder.append(true);
1903
1904 let values = list_builder.values();
1906 values
1908 .field_builder::<Int32Builder>(0)
1909 .unwrap()
1910 .append_null();
1911 values
1912 .field_builder::<Int64Builder>(1)
1913 .unwrap()
1914 .append_value(0);
1915 values.append(false);
1916 values
1918 .field_builder::<Int32Builder>(0)
1919 .unwrap()
1920 .append_null();
1921 values
1922 .field_builder::<Int64Builder>(1)
1923 .unwrap()
1924 .append_value(0);
1925 values.append(false);
1926 list_builder.append(false);
1927
1928 let values = list_builder.values();
1930 values
1932 .field_builder::<Int32Builder>(0)
1933 .unwrap()
1934 .append_null();
1935 values
1936 .field_builder::<Int64Builder>(1)
1937 .unwrap()
1938 .append_value(0);
1939 values.append(false);
1940 values
1942 .field_builder::<Int32Builder>(0)
1943 .unwrap()
1944 .append_null();
1945 values
1946 .field_builder::<Int64Builder>(1)
1947 .unwrap()
1948 .append_value(0);
1949 values.append(false);
1950 list_builder.append(true);
1951
1952 let values = list_builder.values();
1954 values
1956 .field_builder::<Int32Builder>(0)
1957 .unwrap()
1958 .append_null();
1959 values
1960 .field_builder::<Int64Builder>(1)
1961 .unwrap()
1962 .append_value(3);
1963 values.append(true);
1964 values
1966 .field_builder::<Int32Builder>(0)
1967 .unwrap()
1968 .append_value(2);
1969 values
1970 .field_builder::<Int64Builder>(1)
1971 .unwrap()
1972 .append_value(4);
1973 values.append(true);
1974 list_builder.append(true);
1975
1976 let array = Arc::new(list_builder.finish());
1977
1978 assert_eq!(array.values().len(), 8);
1979 assert_eq!(array.len(), 4);
1980
1981 let struct_values = array.values().as_struct();
1982 let values_a = struct_values.column(0).clone();
1983 let values_b = struct_values.column(1).clone();
1984
1985 let schema = Arc::new(Schema::new(vec![list_field]));
1986 let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
1987
1988 let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
1989 let a_levels = &levels[0];
1990 let b_levels = &levels[1];
1991
1992 let values_a_logical_nulls = values_a.logical_nulls();
1994 let expected_a = ArrayLevels {
1995 def_levels: Some(vec![4, 2, 0, 2, 2, 3, 4]),
1996 rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
1997 non_null_indices: vec![0, 7],
1998 max_def_level: 4,
1999 max_rep_level: 1,
2000 array: values_a,
2001 logical_nulls: values_a_logical_nulls,
2002 };
2003 let values_b_logical_nulls = values_b.logical_nulls();
2005 let expected_b = ArrayLevels {
2006 def_levels: Some(vec![3, 2, 0, 2, 2, 3, 3]),
2007 rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
2008 non_null_indices: vec![0, 6, 7],
2009 max_def_level: 3,
2010 max_rep_level: 1,
2011 array: values_b,
2012 logical_nulls: values_b_logical_nulls,
2013 };
2014
2015 assert_eq!(a_levels, &expected_a);
2016 assert_eq!(b_levels, &expected_b);
2017 }
2018
2019 #[test]
2020 fn test_fixed_size_list_empty() {
2021 let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 0);
2022 builder.append(true);
2023 builder.append(false);
2024 builder.append(true);
2025 let array = builder.finish();
2026 let values = array.values().clone();
2027
2028 let item_field = Field::new_list_field(array.data_type().clone(), true);
2029 let mut builder = levels(&item_field, array);
2030 builder.write(0..3);
2031 let levels = builder.finish();
2032
2033 assert_eq!(levels.len(), 1);
2034
2035 let list_level = &levels[0];
2036
2037 let logical_nulls = values.logical_nulls();
2038 let expected_level = ArrayLevels {
2039 def_levels: Some(vec![1, 0, 1]),
2040 rep_levels: Some(vec![0, 0, 0]),
2041 non_null_indices: vec![],
2042 max_def_level: 3,
2043 max_rep_level: 1,
2044 array: values,
2045 logical_nulls,
2046 };
2047 assert_eq!(list_level, &expected_level);
2048 }
2049
2050 #[test]
2051 fn test_fixed_size_list_of_var_lists() {
2052 let mut builder = FixedSizeListBuilder::new(ListBuilder::new(Int32Builder::new()), 2);
2054 builder.values().append_value([Some(1), None, Some(3)]);
2055 builder.values().append_null();
2056 builder.append(true);
2057 builder.values().append_value([Some(4)]);
2058 builder.values().append_value([]);
2059 builder.append(true);
2060 builder.values().append_value([Some(5), Some(6)]);
2061 builder.values().append_value([None, None]);
2062 builder.append(true);
2063 builder.values().append_null();
2064 builder.values().append_null();
2065 builder.append(false);
2066 let a = builder.finish();
2067 let values = a.values().as_list::<i32>().values().clone();
2068
2069 let item_field = Field::new_list_field(a.data_type().clone(), true);
2070 let mut builder = levels(&item_field, a);
2071 builder.write(0..4);
2072 let levels = builder.finish();
2073
2074 let logical_nulls = values.logical_nulls();
2075 let expected_level = ArrayLevels {
2076 def_levels: Some(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]),
2077 rep_levels: Some(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]),
2078 non_null_indices: vec![0, 2, 3, 4, 5],
2079 max_def_level: 5,
2080 max_rep_level: 2,
2081 array: values,
2082 logical_nulls,
2083 };
2084
2085 assert_eq!(levels[0], expected_level);
2086 }
2087
2088 #[test]
2089 fn test_null_dictionary_values() {
2090 let values = Int32Array::new(
2091 vec![1, 2, 3, 4].into(),
2092 Some(NullBuffer::from(vec![true, false, true, true])),
2093 );
2094 let keys = Int32Array::new(
2095 vec![1, 54, 2, 0].into(),
2096 Some(NullBuffer::from(vec![true, false, true, true])),
2097 );
2098 let dict = DictionaryArray::new(keys, Arc::new(values));
2100
2101 let item_field = Field::new_list_field(dict.data_type().clone(), true);
2102
2103 let mut builder = levels(&item_field, dict.clone());
2104 builder.write(0..4);
2105 let levels = builder.finish();
2106
2107 let logical_nulls = dict.logical_nulls();
2108 let expected_level = ArrayLevels {
2109 def_levels: Some(vec![0, 0, 1, 1]),
2110 rep_levels: None,
2111 non_null_indices: vec![2, 3],
2112 max_def_level: 1,
2113 max_rep_level: 0,
2114 array: Arc::new(dict),
2115 logical_nulls,
2116 };
2117 assert_eq!(levels[0], expected_level);
2118 }
2119
2120 #[test]
2121 fn mismatched_types() {
2122 let array = Arc::new(Int32Array::from_iter(0..10)) as ArrayRef;
2123 let field = Field::new_list_field(DataType::Float64, false);
2124
2125 let err = LevelInfoBuilder::try_new(&field, Default::default(), &array)
2126 .unwrap_err()
2127 .to_string();
2128
2129 assert_eq!(
2130 err,
2131 "Arrow: Incompatible type. Field 'item' has type Float64, array has type Int32",
2132 );
2133 }
2134
2135 fn levels<T: Array + 'static>(field: &Field, array: T) -> LevelInfoBuilder {
2136 let v = Arc::new(array) as ArrayRef;
2137 LevelInfoBuilder::try_new(field, Default::default(), &v).unwrap()
2138 }
2139
2140 #[test]
2141 fn test_slice_for_chunk_flat() {
2142 let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]));
2147 let logical_nulls = array.logical_nulls();
2148 let levels = ArrayLevels {
2149 def_levels: None,
2150 rep_levels: None,
2151 non_null_indices: vec![0, 1, 2, 3, 4, 5],
2152 max_def_level: 0,
2153 max_rep_level: 0,
2154 array,
2155 logical_nulls,
2156 };
2157 let sliced = levels.slice_for_chunk(&CdcChunk {
2158 level_offset: 0,
2159 num_levels: 0,
2160 value_offset: 2,
2161 num_values: 3,
2162 });
2163 assert!(sliced.def_levels.is_none());
2164 assert!(sliced.rep_levels.is_none());
2165 assert_eq!(sliced.non_null_indices, vec![0, 1, 2]);
2166 assert_eq!(sliced.array.len(), 3);
2167
2168 let array: ArrayRef = Arc::new(Int32Array::from(vec![
2174 Some(1),
2175 None,
2176 Some(3),
2177 None,
2178 Some(5),
2179 Some(6),
2180 ]));
2181 let logical_nulls = array.logical_nulls();
2182 let levels = ArrayLevels {
2183 def_levels: Some(vec![1, 0, 1, 0, 1, 1]),
2184 rep_levels: None,
2185 non_null_indices: vec![0, 2, 4, 5],
2186 max_def_level: 1,
2187 max_rep_level: 0,
2188 array,
2189 logical_nulls,
2190 };
2191 let sliced = levels.slice_for_chunk(&CdcChunk {
2192 level_offset: 1,
2193 num_levels: 3,
2194 value_offset: 1,
2195 num_values: 1,
2196 });
2197 assert_eq!(sliced.def_levels, Some(vec![0, 1, 0]));
2198 assert!(sliced.rep_levels.is_none());
2199 assert_eq!(sliced.non_null_indices, vec![0]); assert_eq!(sliced.array.len(), 1);
2201 }
2202
2203 #[test]
2204 fn test_slice_for_chunk_nested_with_nulls() {
2205 let array: ArrayRef = Arc::new(Int32Array::from(vec![
2224 Some(1), None, None, Some(2), None, None, None, None, Some(4), Some(5), ]));
2235 let logical_nulls = array.logical_nulls();
2236 let levels = ArrayLevels {
2237 def_levels: Some(vec![3, 0, 3, 2, 0, 3, 3]),
2238 rep_levels: Some(vec![0, 0, 0, 1, 0, 0, 1]),
2239 non_null_indices: vec![0, 3, 8, 9],
2240 max_def_level: 3,
2241 max_rep_level: 1,
2242 array,
2243 logical_nulls,
2244 };
2245
2246 let chunk0 = levels.slice_for_chunk(&CdcChunk {
2248 level_offset: 0,
2249 num_levels: 2,
2250 value_offset: 0,
2251 num_values: 1,
2252 });
2253 assert_eq!(chunk0.non_null_indices, vec![0]);
2254 assert_eq!(chunk0.array.len(), 1);
2255
2256 let chunk1 = levels.slice_for_chunk(&CdcChunk {
2258 level_offset: 2,
2259 num_levels: 3,
2260 value_offset: 1,
2261 num_values: 1,
2262 });
2263 assert_eq!(chunk1.non_null_indices, vec![0]);
2264 assert_eq!(chunk1.array.len(), 1);
2265
2266 let chunk2 = levels.slice_for_chunk(&CdcChunk {
2268 level_offset: 5,
2269 num_levels: 2,
2270 value_offset: 2,
2271 num_values: 2,
2272 });
2273 assert_eq!(chunk2.non_null_indices, vec![0, 1]);
2274 assert_eq!(chunk2.array.len(), 2);
2275 }
2276
2277 #[test]
2278 fn test_slice_for_chunk_all_null() {
2279 let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, None, Some(4)]));
2281 let logical_nulls = array.logical_nulls();
2282 let levels = ArrayLevels {
2283 def_levels: Some(vec![1, 0, 0, 1]),
2284 rep_levels: None,
2285 non_null_indices: vec![0, 3],
2286 max_def_level: 1,
2287 max_rep_level: 0,
2288 array,
2289 logical_nulls,
2290 };
2291 let sliced = levels.slice_for_chunk(&CdcChunk {
2293 level_offset: 1,
2294 num_levels: 2,
2295 value_offset: 1,
2296 num_values: 0,
2297 });
2298 assert_eq!(sliced.def_levels, Some(vec![0, 0]));
2299 assert_eq!(sliced.non_null_indices, Vec::<usize>::new());
2300 assert_eq!(sliced.array.len(), 0);
2301 }
2302}