1#[cfg(feature = "arrow")]
19use crate::column::writer::LevelDataRef;
20use crate::errors::{ParquetError, Result};
21use crate::file::properties::CdcOptions;
22use crate::schema::types::ColumnDescriptor;
23
24use super::CdcChunk;
25use super::cdc_generated::{GEARHASH_TABLE, NUM_GEARHASH_TABLES};
26
/// Per-column state for content-defined chunking.
///
/// The level fields are read from the column's `ColumnDescriptor` and the size limits from
/// `CdcOptions` in `new`; the remaining fields are running state that the `roll*` methods and
/// `need_new_chunk` update. None of the running state is reset by `calculate`, so it carries
/// over from one call to the next.
#[derive(Debug)]
pub(crate) struct ContentDefinedChunker {
    /// Maximum definition level of the column; a value is rolled and counted only for levels
    /// whose definition level equals it.
    max_def_level: i16,
    /// Maximum repetition level of the column; a value above zero selects the nested path.
    max_rep_level: i16,
    /// Threshold used on the nested path: the leaf index advances for every level whose
    /// definition level is at least this value.
    repeated_ancestor_def_level: i16,

    /// Hashing is skipped while the accumulated `chunk_size` is below this bound.
    min_chunk_size: i64,
    /// A boundary is forced once the accumulated `chunk_size` reaches this bound.
    max_chunk_size: i64,
    /// Bit mask tested against `rolling_hash`; a zero result counts as a match.
    rolling_hash_mask: u64,

    /// Current gear-hash state; shifted left by one and combined with a table entry per byte.
    rolling_hash: u64,
    /// Set when the masked hash was zero since the last `need_new_chunk` call.
    has_matched: bool,
    /// Index of the gear-hash table currently in use (row of `GEARHASH_TABLE`).
    nth_run: usize,
    /// Number of bytes rolled since the last boundary.
    chunk_size: i64,
}
121
122impl ContentDefinedChunker {
123 pub fn new(desc: &ColumnDescriptor, options: &CdcOptions) -> Result<Self> {
124 let rolling_hash_mask = Self::calculate_mask(
125 options.min_chunk_size as i64,
126 options.max_chunk_size as i64,
127 options.norm_level,
128 )?;
129 Ok(Self {
130 max_def_level: desc.max_def_level(),
131 max_rep_level: desc.max_rep_level(),
132 repeated_ancestor_def_level: desc.repeated_ancestor_def_level(),
133 min_chunk_size: options.min_chunk_size as i64,
134 max_chunk_size: options.max_chunk_size as i64,
135 rolling_hash_mask,
136 rolling_hash: 0,
137 has_matched: false,
138 nth_run: 0,
139 chunk_size: 0,
140 })
141 }
142
143 fn calculate_mask(min_chunk_size: i64, max_chunk_size: i64, norm_level: i32) -> Result<u64> {
148 if min_chunk_size < 0 {
149 return Err(ParquetError::General(
150 "min_chunk_size must be non-negative".to_string(),
151 ));
152 }
153 if max_chunk_size <= min_chunk_size {
154 return Err(ParquetError::General(
155 "max_chunk_size must be greater than min_chunk_size".to_string(),
156 ));
157 }
158
159 let avg_chunk_size = (min_chunk_size + max_chunk_size) / 2;
160 let target_size = (avg_chunk_size - min_chunk_size) / NUM_GEARHASH_TABLES as i64;
163
164 let mask_bits = if target_size > 0 {
166 63 - target_size.leading_zeros() as i32
167 } else {
168 0
169 };
170
171 let effective_bits = mask_bits - norm_level;
172
173 if !(1..=63).contains(&effective_bits) {
174 return Err(ParquetError::General(format!(
175 "The number of bits in the CDC mask must be between 1 and 63, got {effective_bits}"
176 )));
177 }
178
179 Ok(u64::MAX << (64 - effective_bits))
181 }
182
183 #[inline]
190 fn roll(&mut self, bytes: &[u8]) {
191 self.chunk_size += bytes.len() as i64;
192 if self.chunk_size < self.min_chunk_size {
193 return;
194 }
195 for &b in bytes {
196 self.rolling_hash = self
197 .rolling_hash
198 .wrapping_shl(1)
199 .wrapping_add(GEARHASH_TABLE[self.nth_run][b as usize]);
200 self.has_matched =
201 self.has_matched || ((self.rolling_hash & self.rolling_hash_mask) == 0);
202 }
203 }
204
205 #[inline(always)]
210 fn roll_fixed<const N: usize>(&mut self, bytes: &[u8; N]) {
211 self.chunk_size += N as i64;
212 if self.chunk_size < self.min_chunk_size {
213 return;
214 }
215 for j in 0..N {
216 self.rolling_hash = self
217 .rolling_hash
218 .wrapping_shl(1)
219 .wrapping_add(GEARHASH_TABLE[self.nth_run][bytes[j] as usize]);
220 self.has_matched =
221 self.has_matched || ((self.rolling_hash & self.rolling_hash_mask) == 0);
222 }
223 }
224
225 #[inline]
227 fn roll_level(&mut self, level: i16) {
228 self.roll_fixed(&level.to_le_bytes());
229 }
230
231 #[inline]
249 fn need_new_chunk(&mut self) -> bool {
250 if self.has_matched {
251 self.has_matched = false;
252 self.nth_run += 1;
253 if self.nth_run >= NUM_GEARHASH_TABLES {
254 self.nth_run = 0;
255 self.chunk_size = 0;
256 return true;
257 }
258 }
259 if self.chunk_size >= self.max_chunk_size {
260 self.chunk_size = 0;
261 return true;
262 }
263 false
264 }
265
/// Walks `num_levels` levels and returns the chunks they are split into.
///
/// For every level the definition/repetition levels (when the column has them) and, for
/// present values, the value bytes are rolled into the hash via `roll_value`; after each
/// level `need_new_chunk` is consulted. The chunker state (`rolling_hash`, `chunk_size`,
/// `nth_run`, `has_matched`) is not reset on entry, so it continues from the previous call.
///
/// `roll_value` receives the chunker and the index of the value to hash: the level index on
/// the two non-nested paths, and the running leaf index on the nested path.
///
/// A boundary detected at level `offset` closes the chunk `[prev_offset, offset)`; the
/// level that triggered the boundary becomes the first level of the next chunk. Whatever
/// remains after the loop is emitted as a final chunk. With `num_levels == 0` no chunk is
/// produced at all.
///
/// # Panics
///
/// Panics if a level required by `max_def_level` / `max_rep_level` is missing from
/// `def_levels` / `rep_levels` (see the `expect` calls below).
fn calculate<F>(
    &mut self,
    def_levels: LevelDataRef<'_>,
    rep_levels: LevelDataRef<'_>,
    num_levels: usize,
    mut roll_value: F,
) -> Vec<CdcChunk>
where
    F: FnMut(&mut Self, usize),
{
    let has_def_levels = self.max_def_level > 0;
    let has_rep_levels = self.max_rep_level > 0;

    let mut chunks = Vec::new();
    // Start (in levels / in values) of the chunk currently being accumulated.
    let mut prev_offset: usize = 0;
    let mut prev_value_offset: usize = 0;
    // Number of present values seen so far.
    let mut value_offset: usize = 0;

    if !has_rep_levels && !has_def_levels {
        // No levels at all: every level is exactly one value, so the level offsets double
        // as value offsets.
        for offset in 0..num_levels {
            roll_value(self, offset);
            if self.need_new_chunk() {
                chunks.push(CdcChunk {
                    level_offset: prev_offset,
                    num_levels: offset - prev_offset,
                    value_offset: prev_offset,
                    num_values: offset - prev_offset,
                });
                prev_offset = offset;
            }
        }
        // Bring the value bookkeeping in line for the trailing chunk below.
        prev_value_offset = prev_offset;
        value_offset = num_levels;
    } else if !has_rep_levels {
        // Definition levels only: values exist just for levels at `max_def_level`.
        #[allow(clippy::needless_range_loop)]
        for offset in 0..num_levels {
            let def_level = def_levels
                .value_at(offset)
                .expect("def_levels required when max_def_level > 0");
            self.roll_level(def_level);
            if def_level == self.max_def_level {
                roll_value(self, offset);
            }
            // Checked before `value_offset` is advanced, so the pushed chunk counts only
            // the values that precede this level.
            if self.need_new_chunk() {
                chunks.push(CdcChunk {
                    level_offset: prev_offset,
                    num_levels: offset - prev_offset,
                    value_offset: prev_value_offset,
                    num_values: value_offset - prev_value_offset,
                });
                prev_offset = offset;
                prev_value_offset = value_offset;
            }
            if def_level == self.max_def_level {
                value_offset += 1;
            }
        }
    } else {
        // Repetition levels present. `leaf_offset` is the index handed to `roll_value`; it
        // advances for every level at or above `repeated_ancestor_def_level`.
        let mut leaf_offset: usize = 0;

        for offset in 0..num_levels {
            let def_level = def_levels
                .value_at(offset)
                .expect("def_levels required for nested data");
            let rep_level = rep_levels
                .value_at(offset)
                .expect("rep_levels required for nested data");

            self.roll_level(def_level);
            self.roll_level(rep_level);
            if def_level == self.max_def_level {
                roll_value(self, leaf_offset);
            }

            // Boundaries are only considered where `rep_level == 0`. Because of the
            // short-circuit, `need_new_chunk` is not called for other levels, so a pending
            // match stays latched until the next such level.
            if rep_level == 0 && self.need_new_chunk() {
                let levels_to_write = offset - prev_offset;
                // Never emit an empty chunk on this path.
                if levels_to_write > 0 {
                    chunks.push(CdcChunk {
                        level_offset: prev_offset,
                        num_levels: levels_to_write,
                        value_offset: prev_value_offset,
                        num_values: value_offset - prev_value_offset,
                    });
                    prev_offset = offset;
                    prev_value_offset = value_offset;
                }
            }
            if def_level == self.max_def_level {
                value_offset += 1;
            }
            if def_level >= self.repeated_ancestor_def_level {
                leaf_offset += 1;
            }
        }
    }

    // Emit whatever is left after the last boundary.
    if prev_offset < num_levels {
        chunks.push(CdcChunk {
            level_offset: prev_offset,
            num_levels: num_levels - prev_offset,
            value_offset: prev_value_offset,
            num_values: value_offset - prev_value_offset,
        });
    }

    #[cfg(debug_assertions)]
    self.validate_chunks(&chunks, num_levels, value_offset);

    chunks
}
441
/// Computes chunk boundaries for an Arrow array and its levels.
///
/// Dispatches on the array's data type to pick how each value is hashed, then delegates to
/// `calculate`. Dictionary arrays are chunked on their keys by recursing with `dict.keys()`.
///
/// # Errors
///
/// Returns `ParquetError::General` for data types not listed in the `match` below.
#[cfg(feature = "arrow")]
pub(crate) fn get_arrow_chunks(
    &mut self,
    def_levels: LevelDataRef<'_>,
    rep_levels: LevelDataRef<'_>,
    array: &dyn arrow_array::Array,
) -> Result<Vec<CdcChunk>> {
    use arrow_array::cast::AsArray;
    use arrow_schema::DataType;

    // With both level inputs reporting length zero, the array length is the level count;
    // otherwise the longer of the two level inputs decides.
    let num_levels = match (def_levels.len(), rep_levels.len()) {
        (0, 0) => array.len(),
        (d, r) => d.max(r),
    };

    // Hashes `$N`-byte values straight out of the array's first buffer, starting at the
    // array's own offset.
    macro_rules! fixed_width {
        ($N:literal) => {{
            let data = array.to_data();
            let buffer = data.buffers()[0].as_slice();
            let values = &buffer[data.offset() * $N..];
            self.calculate(def_levels, rep_levels, num_levels, |c, i| {
                let offset = i * $N;
                let slice = &values[offset..offset + $N];
                c.roll_fixed::<$N>(slice.try_into().unwrap());
            })
        }};
    }

    // Hashes the bytes of `value(i)` for byte- and string-like arrays.
    macro_rules! binary_like {
        ($a:expr) => {{
            let a = $a;
            self.calculate(def_levels, rep_levels, num_levels, |c, i| {
                c.roll(a.value(i).as_ref());
            })
        }};
    }

    let dtype = array.data_type();
    let chunks = match dtype {
        // Null arrays contribute no value bytes; only levels are rolled.
        DataType::Null => self.calculate(def_levels, rep_levels, num_levels, |_, _| {}),
        // Booleans are hashed as one byte per value rather than read from the buffer.
        DataType::Boolean => {
            let a = array.as_boolean();
            self.calculate(def_levels, rep_levels, num_levels, |c, i| {
                c.roll_fixed(&[a.value(i) as u8]);
            })
        }
        DataType::Int8 | DataType::UInt8 => fixed_width!(1),
        DataType::Int16 | DataType::UInt16 | DataType::Float16 => fixed_width!(2),
        DataType::Int32
        | DataType::UInt32
        | DataType::Float32
        | DataType::Date32
        | DataType::Time32(_)
        | DataType::Interval(arrow_schema::IntervalUnit::YearMonth)
        | DataType::Decimal32(_, _) => fixed_width!(4),
        DataType::Int64
        | DataType::UInt64
        | DataType::Float64
        | DataType::Date64
        | DataType::Time64(_)
        | DataType::Timestamp(_, _)
        | DataType::Duration(_)
        | DataType::Interval(arrow_schema::IntervalUnit::DayTime)
        | DataType::Decimal64(_, _) => fixed_width!(8),
        DataType::Interval(arrow_schema::IntervalUnit::MonthDayNano)
        | DataType::Decimal128(_, _) => fixed_width!(16),
        DataType::Decimal256(_, _) => fixed_width!(32),
        DataType::FixedSizeBinary(_) => binary_like!(array.as_fixed_size_binary()),
        DataType::Binary => binary_like!(array.as_binary::<i32>()),
        DataType::LargeBinary => binary_like!(array.as_binary::<i64>()),
        DataType::Utf8 => binary_like!(array.as_string::<i32>()),
        DataType::LargeUtf8 => binary_like!(array.as_string::<i64>()),
        DataType::BinaryView => binary_like!(array.as_binary_view()),
        DataType::Utf8View => binary_like!(array.as_string_view()),
        // Chunk on the dictionary keys, not on the dictionary values.
        DataType::Dictionary(_, _) => {
            let dict = array.as_any_dictionary();
            self.get_arrow_chunks(def_levels, rep_levels, dict.keys())?
        }
        _ => {
            return Err(ParquetError::General(format!(
                "content-defined chunking is not supported for data type {dtype:?}",
            )));
        }
    };
    Ok(chunks)
}
534
535 #[cfg(debug_assertions)]
536 fn validate_chunks(&self, chunks: &[CdcChunk], num_levels: usize, total_values: usize) {
537 assert!(!chunks.is_empty(), "chunks must be non-empty");
538
539 let first = &chunks[0];
540 assert_eq!(first.level_offset, 0, "first chunk must start at level 0");
541 assert_eq!(first.value_offset, 0, "first chunk must start at value 0");
542
543 let mut sum_levels = first.num_levels;
544 let mut sum_values = first.num_values;
545 for i in 1..chunks.len() {
546 let chunk = &chunks[i];
547 let prev = &chunks[i - 1];
548 assert!(chunk.num_levels > 0, "chunk must have levels");
549 assert_eq!(
550 chunk.level_offset,
551 prev.level_offset + prev.num_levels,
552 "level offsets must be contiguous"
553 );
554 assert_eq!(
555 chunk.value_offset,
556 prev.value_offset + prev.num_values,
557 "value offsets must be contiguous"
558 );
559 sum_levels += chunk.num_levels;
560 sum_values += chunk.num_values;
561 }
562 assert_eq!(sum_levels, num_levels, "chunks must cover all levels");
563 assert_eq!(sum_values, total_values, "chunks must cover all values");
564
565 let last = chunks.last().unwrap();
566 assert_eq!(
567 last.level_offset + last.num_levels,
568 num_levels,
569 "last chunk must end at num_levels"
570 );
571 }
572}
573
574#[cfg(test)]
575mod tests {
576 use super::*;
577 use crate::basic::Type as PhysicalType;
578 #[cfg(feature = "arrow")]
579 use crate::column::writer::LevelDataRef;
580 use crate::schema::types::{ColumnPath, Type};
581 use std::sync::Arc;
582
583 fn make_desc(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
584 let tp = Type::primitive_type_builder("col", PhysicalType::INT32)
585 .build()
586 .unwrap();
587 ColumnDescriptor::new(
588 Arc::new(tp),
589 max_def_level,
590 max_rep_level,
591 ColumnPath::new(vec![]),
592 )
593 }
594
/// 256 KiB / 1 MiB limits with no normalization must yield a 15-bit mask.
#[test]
fn test_calculate_mask_defaults() {
    const KIB: i64 = 1024;
    let top_15_bits = u64::MAX << (64 - 15);
    let mask = ContentDefinedChunker::calculate_mask(256 * KIB, 1024 * KIB, 0);
    assert_eq!(mask.unwrap(), top_15_bits);
}
603
/// A `norm_level` of 1 removes one bit from the default 15-bit mask.
#[test]
fn test_calculate_mask_with_norm_level() {
    const KIB: i64 = 1024;
    let top_14_bits = u64::MAX << (64 - 14);
    let mask = ContentDefinedChunker::calculate_mask(256 * KIB, 1024 * KIB, 1);
    assert_eq!(mask.unwrap(), top_14_bits);
}
610
/// Negative minimum, max below min, and max equal to min are all rejected.
#[test]
fn test_calculate_mask_invalid() {
    let invalid_limits = [(-1, 100), (100, 50), (100, 100)];
    for (min_chunk_size, max_chunk_size) in invalid_limits {
        let outcome = ContentDefinedChunker::calculate_mask(min_chunk_size, max_chunk_size, 0);
        assert!(outcome.is_err());
    }
}
617
/// Four required INT32 values must come back as one chunk covering all of them.
#[test]
fn test_non_nested_non_null_single_chunk() {
    let options = CdcOptions {
        min_chunk_size: 8,
        max_chunk_size: 1024,
        norm_level: 0,
    };
    let desc = make_desc(0, 0);
    let mut chunker = ContentDefinedChunker::new(&desc, &options).unwrap();

    // Hash each index as a 4-byte little-endian integer.
    let roll_i32 = |c: &mut ContentDefinedChunker, i: usize| {
        c.roll_fixed::<4>(&(i as i32).to_le_bytes());
    };
    let chunks = chunker.calculate(LevelDataRef::Absent, LevelDataRef::Absent, 4, roll_i32);

    assert_eq!(chunks.len(), 1);
    let only = &chunks[0];
    assert_eq!(only.level_offset, 0);
    assert_eq!(only.value_offset, 0);
    assert_eq!(only.num_levels, 4);
}
642
/// 2000 four-byte values exceed the 1024-byte maximum, so several contiguous chunks that
/// together cover every value are expected.
#[test]
fn test_max_chunk_size_forces_boundary() {
    let options = CdcOptions {
        min_chunk_size: 256,
        max_chunk_size: 1024,
        norm_level: 0,
    };
    let desc = make_desc(0, 0);
    let mut chunker = ContentDefinedChunker::new(&desc, &options).unwrap();

    let num_values = 2000;
    let chunks = chunker.calculate(
        LevelDataRef::Absent,
        LevelDataRef::Absent,
        num_values,
        |c, i| c.roll_fixed::<4>(&(i as i32).to_le_bytes()),
    );

    assert!(chunks.len() > 1);

    let last_index = chunks.len() - 1;
    let mut expected_offset = 0;
    for (index, chunk) in chunks.iter().enumerate() {
        // Chunks must be laid out back to back.
        assert_eq!(chunk.level_offset, expected_offset);
        // Only the final chunk is exempt from the non-empty requirement.
        if index != last_index {
            assert!(chunk.num_levels > 0);
        }
        expected_offset += chunk.num_levels;
    }
    assert_eq!(expected_offset, num_values);
}
678
/// Two fresh chunkers fed the same 200 values must produce identical chunks.
#[test]
fn test_deterministic_chunks() {
    let options = CdcOptions {
        min_chunk_size: 4,
        max_chunk_size: 64,
        norm_level: 0,
    };

    // Runs a brand-new chunker over the same synthetic input each time it is called.
    let chunk_once = || {
        let mut chunker = ContentDefinedChunker::new(&make_desc(0, 0), &options).unwrap();
        chunker.calculate(
            LevelDataRef::Absent,
            LevelDataRef::Absent,
            200,
            |c: &mut ContentDefinedChunker, i: usize| {
                c.roll_fixed::<8>(&(i as i64).to_le_bytes());
            },
        )
    };

    let first_run = chunk_once();
    let second_run = chunk_once();

    assert_eq!(first_run.len(), second_run.len());
    for (a, b) in first_run.iter().zip(&second_run) {
        assert_eq!(a.level_offset, b.level_offset);
        assert_eq!(a.num_levels, b.num_levels);
        assert_eq!(a.value_offset, b.value_offset);
        assert_eq!(a.num_values, b.num_values);
    }
}
705
/// With every third level null, the chunks must cover all levels but only the non-null
/// values.
#[test]
fn test_nullable_non_nested() {
    let options = CdcOptions {
        min_chunk_size: 4,
        max_chunk_size: 64,
        norm_level: 0,
    };
    let desc = make_desc(1, 0);
    let mut chunker = ContentDefinedChunker::new(&desc, &options).unwrap();

    let num_levels = 20;
    // Definition level 0 (null) at indices divisible by three, 1 (present) elsewhere.
    let def_levels: Vec<i16> = (0..num_levels).map(|i| i16::from(i % 3 != 0)).collect();
    let expected_non_null: usize = def_levels.iter().filter(|&&d| d == 1).count();

    let chunks = chunker.calculate(
        LevelDataRef::Materialized(&def_levels),
        LevelDataRef::Absent,
        num_levels,
        |c, i| c.roll_fixed::<4>(&(i as i32).to_le_bytes()),
    );

    assert!(!chunks.is_empty());
    let (mut total_levels, mut total_values) = (0usize, 0usize);
    for chunk in &chunks {
        total_levels += chunk.num_levels;
        total_values += chunk.num_values;
    }
    assert_eq!(total_levels, num_levels);
    assert_eq!(total_values, expected_non_null);
    assert!(total_values < total_levels);
}
740}
741
742#[cfg(all(test, feature = "arrow"))]
745mod arrow_tests {
746 use std::borrow::Borrow;
747 use std::sync::Arc;
748
749 use arrow::util::data_gen::create_random_batch;
750 use arrow_array::cast::AsArray;
751 use arrow_array::{Array, ArrayRef, BooleanArray, Int32Array, RecordBatch};
752 use arrow_buffer::Buffer;
753 use arrow_data::ArrayData;
754 use arrow_schema::{DataType, Field, Fields, Schema};
755
756 use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
757 use crate::arrow::arrow_writer::ArrowWriter;
758 use crate::column::writer::LevelDataRef;
759 use crate::file::properties::{CdcOptions, WriterProperties};
760 use crate::file::reader::{FileReader, SerializedFileReader};
761
// Shared sizing constants for the CDC integration tests.

/// 4 KiB. NOTE(review): presumably the `min_chunk_size` handed to the writer by the
/// generated tests; that use is not visible in this part of the file.
const CDC_MIN_CHUNK_SIZE: usize = 4 * 1024;
/// 16 KiB. NOTE(review): presumably the matching `max_chunk_size`; confirm at the use site.
const CDC_MAX_CHUNK_SIZE: usize = 16 * 1024;
/// Byte budget of one generated table part; divided by `bytes_per_record` to get a row count.
const CDC_PART_SIZE: usize = 128 * 1024;
/// Byte budget of one edit (inserted/removed slice); divided by `bytes_per_record` likewise.
const CDC_EDIT_SIZE: usize = 128;
/// 1 Mi. NOTE(review): likely a row-group row limit for the writer; confirm at the use site.
const CDC_ROW_GROUP_LENGTH: usize = 1024 * 1024;
769
/// Deterministic 64-bit mixer used to generate test data from a `(seed, index)` pair.
///
/// The wrapping sum of the two inputs goes through three multiply / xor-shift-by-33 rounds.
fn test_hash(seed: u64, index: u64) -> u64 {
    const OUTER_MULTIPLIER: u64 = 0xc4ceb9fe1a85ec53;
    const INNER_MULTIPLIER: u64 = 0xff51afd7ed558ccd;

    // One round: multiply, then fold the high bits back into the low ones.
    let round = |state: u64, multiplier: u64| {
        let product = state.wrapping_mul(multiplier);
        product ^ (product >> 33)
    };

    let mixed = round(index.wrapping_add(seed), OUTER_MULTIPLIER);
    let mixed = round(mixed, INNER_MULTIPLIER);
    round(mixed, OUTER_MULTIPLIER)
}
782
783 fn generate_array(dtype: &DataType, nullable: bool, length: usize, seed: u64) -> ArrayRef {
785 macro_rules! gen_primitive {
786 ($array_type:ty, $cast:expr) => {{
787 if nullable {
788 let arr: $array_type = (0..length)
789 .map(|i| {
790 let val = test_hash(seed, i as u64);
791 if val % 10 == 0 {
792 None
793 } else {
794 Some($cast(val))
795 }
796 })
797 .collect();
798 Arc::new(arr) as ArrayRef
799 } else {
800 let arr: $array_type = (0..length)
801 .map(|i| Some($cast(test_hash(seed, i as u64))))
802 .collect();
803 Arc::new(arr) as ArrayRef
804 }
805 }};
806 }
807
808 match dtype {
809 DataType::Boolean => {
810 if nullable {
811 let arr: BooleanArray = (0..length)
812 .map(|i| {
813 let val = test_hash(seed, i as u64);
814 if val % 10 == 0 {
815 None
816 } else {
817 Some(val % 2 == 0)
818 }
819 })
820 .collect();
821 Arc::new(arr)
822 } else {
823 let arr: BooleanArray = (0..length)
824 .map(|i| Some(test_hash(seed, i as u64) % 2 == 0))
825 .collect();
826 Arc::new(arr)
827 }
828 }
829 DataType::Int32 => gen_primitive!(Int32Array, |v: u64| v as i32),
830 DataType::Int64 => {
831 gen_primitive!(arrow_array::Int64Array, |v: u64| v as i64)
832 }
833 DataType::Float64 => {
834 gen_primitive!(arrow_array::Float64Array, |v: u64| (v % 100000) as f64
835 / 1000.0)
836 }
837 DataType::Utf8 => {
838 let arr: arrow_array::StringArray = if nullable {
839 (0..length)
840 .map(|i| {
841 let val = test_hash(seed, i as u64);
842 if val % 10 == 0 {
843 None
844 } else {
845 Some(format!("str_{val}"))
846 }
847 })
848 .collect()
849 } else {
850 (0..length)
851 .map(|i| Some(format!("str_{}", test_hash(seed, i as u64))))
852 .collect()
853 };
854 Arc::new(arr)
855 }
856 DataType::Binary => {
857 let arr: arrow_array::BinaryArray = if nullable {
858 (0..length)
859 .map(|i| {
860 let val = test_hash(seed, i as u64);
861 if val % 10 == 0 {
862 None
863 } else {
864 Some(format!("bin_{val}").into_bytes())
865 }
866 })
867 .collect()
868 } else {
869 (0..length)
870 .map(|i| Some(format!("bin_{}", test_hash(seed, i as u64)).into_bytes()))
871 .collect()
872 };
873 Arc::new(arr)
874 }
875 DataType::FixedSizeBinary(size) => {
876 let size = *size;
877 let mut builder = arrow_array::builder::FixedSizeBinaryBuilder::new(size);
878 for i in 0..length {
879 let val = test_hash(seed, i as u64);
880 if nullable && val % 10 == 0 {
881 builder.append_null();
882 } else {
883 let s = format!("bin_{val}");
884 let bytes = s.as_bytes();
885 let mut buf = vec![0u8; size as usize];
886 let copy_len = bytes.len().min(size as usize);
887 buf[..copy_len].copy_from_slice(&bytes[..copy_len]);
888 builder.append_value(&buf).unwrap();
889 }
890 }
891 Arc::new(builder.finish())
892 }
893 DataType::Date32 => {
894 gen_primitive!(arrow_array::Date32Array, |v: u64| v as i32)
895 }
896 DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
897 gen_primitive!(arrow_array::TimestampNanosecondArray, |v: u64| v as i64)
898 }
899 _ => panic!("Unsupported test data type: {dtype:?}"),
900 }
901 }
902
903 fn generate_table(schema: &Arc<Schema>, length: usize, seed: u64) -> RecordBatch {
905 let arrays: Vec<ArrayRef> = schema
906 .fields()
907 .iter()
908 .enumerate()
909 .map(|(i, field)| {
910 generate_array(
911 field.data_type(),
912 field.is_nullable(),
913 length,
914 seed + i as u64 * 10,
915 )
916 })
917 .collect();
918 RecordBatch::try_new(schema.clone(), arrays).unwrap()
919 }
920
921 fn cdc_byte_width(dtype: &DataType) -> usize {
924 match dtype {
925 DataType::Boolean => 1,
926 DataType::Int8 | DataType::UInt8 => 1,
927 DataType::Int16 | DataType::UInt16 | DataType::Float16 => 2,
928 DataType::Int32
929 | DataType::UInt32
930 | DataType::Float32
931 | DataType::Date32
932 | DataType::Time32(_) => 4,
933 DataType::Int64
934 | DataType::UInt64
935 | DataType::Float64
936 | DataType::Date64
937 | DataType::Time64(_)
938 | DataType::Timestamp(_, _)
939 | DataType::Duration(_) => 8,
940 DataType::Decimal128(_, _) => 16,
941 DataType::Decimal256(_, _) => 32,
942 DataType::FixedSizeBinary(n) => *n as usize,
943 _ => 0, }
945 }
946
947 fn bytes_per_record(dtype: &DataType, nullable: bool) -> usize {
949 let bw = cdc_byte_width(dtype);
950 if bw > 0 {
951 if nullable { bw + 2 } else { bw }
952 } else {
953 16 }
955 }
956
957 fn calculate_cdc_size(array: &dyn Array, nullable: bool) -> i64 {
959 let dtype = array.data_type();
960 let bw = cdc_byte_width(dtype);
961 let result = if bw > 0 {
962 let valid_count = array.len() - array.null_count();
964 (valid_count * bw) as i64
965 } else {
966 match dtype {
968 DataType::Utf8 => {
969 let a = array.as_string::<i32>();
970 (0..a.len())
971 .filter(|&i| a.is_valid(i))
972 .map(|i| a.value(i).len() as i64)
973 .sum()
974 }
975 DataType::Binary => {
976 let a = array.as_binary::<i32>();
977 (0..a.len())
978 .filter(|&i| a.is_valid(i))
979 .map(|i| a.value(i).len() as i64)
980 .sum()
981 }
982 DataType::LargeBinary => {
983 let a = array.as_binary::<i64>();
984 (0..a.len())
985 .filter(|&i| a.is_valid(i))
986 .map(|i| a.value(i).len() as i64)
987 .sum()
988 }
989 _ => panic!("CDC size calculation not implemented for {dtype:?}"),
990 }
991 };
992
993 if nullable {
994 result + array.len() as i64 * 2
996 } else {
997 result
998 }
999 }
1000
/// Page-level summary of one column chunk, as collected by `get_column_info`.
struct ColumnInfo {
    /// `num_values()` of each data page (v1 or v2), in the order the pages were read.
    page_lengths: Vec<i64>,
    /// Whether a dictionary page was encountered in the column chunk.
    has_dictionary_page: bool,
}
1006
1007 fn get_column_info(data: &[u8], column_index: usize) -> Vec<ColumnInfo> {
1009 let reader = SerializedFileReader::new(bytes::Bytes::from(data.to_vec())).unwrap();
1010 let metadata = reader.metadata();
1011 let mut result = Vec::new();
1012 for rg in 0..metadata.num_row_groups() {
1013 let rg_reader = reader.get_row_group(rg).unwrap();
1014 let col_reader = rg_reader.get_column_page_reader(column_index).unwrap();
1015 let mut info = ColumnInfo {
1016 page_lengths: Vec::new(),
1017 has_dictionary_page: false,
1018 };
1019 for page in col_reader {
1020 let page = page.unwrap();
1021 match page.page_type() {
1022 crate::basic::PageType::DATA_PAGE | crate::basic::PageType::DATA_PAGE_V2 => {
1023 info.page_lengths.push(page.num_values() as i64);
1024 }
1025 crate::basic::PageType::DICTIONARY_PAGE => {
1026 info.has_dictionary_page = true;
1027 }
1028 _ => {}
1029 }
1030 }
1031 result.push(info);
1032 }
1033 result
1034 }
1035
1036 fn assert_cdc_chunk_sizes(
1039 array: &ArrayRef,
1040 info: &ColumnInfo,
1041 nullable: bool,
1042 min_chunk_size: usize,
1043 max_chunk_size: usize,
1044 expect_dictionary_page: bool,
1045 ) {
1046 let expect_dict = match array.data_type() {
1048 DataType::Boolean | DataType::FixedSizeBinary(_) => false,
1049 _ => expect_dictionary_page,
1050 };
1051 assert_eq!(
1052 info.has_dictionary_page,
1053 expect_dict,
1054 "dictionary page mismatch for {:?}",
1055 array.data_type()
1056 );
1057
1058 let page_lengths = &info.page_lengths;
1059 assert!(
1060 page_lengths.len() > 1,
1061 "CDC should produce multiple pages, got {page_lengths:?}"
1062 );
1063
1064 let bw = cdc_byte_width(array.data_type());
1065 if bw > 0
1067 || matches!(
1068 array.data_type(),
1069 DataType::Utf8 | DataType::Binary | DataType::LargeBinary
1070 )
1071 {
1072 let mut offset = 0i64;
1073 for (i, &page_len) in page_lengths.iter().enumerate() {
1074 let slice = array.slice(offset as usize, page_len as usize);
1075 let cdc_size = calculate_cdc_size(slice.as_ref(), nullable);
1076 if i < page_lengths.len() - 1 {
1077 assert!(
1078 cdc_size >= min_chunk_size as i64,
1079 "Page {i}: CDC size {cdc_size} < min {min_chunk_size}, pages={page_lengths:?}"
1080 );
1081 }
1082 assert!(
1083 cdc_size <= max_chunk_size as i64,
1084 "Page {i}: CDC size {cdc_size} > max {max_chunk_size}, pages={page_lengths:?}"
1085 );
1086 offset += page_len;
1087 }
1088 assert_eq!(
1089 offset,
1090 array.len() as i64,
1091 "page lengths must sum to array length"
1092 );
1093 }
1094 }
1095
1096 fn write_with_cdc_options(
1099 batches: &[&RecordBatch],
1100 min_chunk_size: usize,
1101 max_chunk_size: usize,
1102 max_row_group_rows: Option<usize>,
1103 enable_dictionary: bool,
1104 ) -> Vec<u8> {
1105 assert!(!batches.is_empty());
1106 let schema = batches[0].schema();
1107 let mut builder = WriterProperties::builder()
1108 .set_dictionary_enabled(enable_dictionary)
1109 .set_content_defined_chunking(Some(CdcOptions {
1110 min_chunk_size,
1111 max_chunk_size,
1112 norm_level: 0,
1113 }));
1114 if let Some(max_rows) = max_row_group_rows {
1115 builder = builder.set_max_row_group_row_count(Some(max_rows));
1116 }
1117 let props = builder.build();
1118 let mut buf = Vec::new();
1119 let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), Some(props)).unwrap();
1120 for batch in batches {
1121 writer.write(batch).unwrap();
1122 }
1123 writer.close().unwrap();
1124
1125 let readback = read_batches(&buf);
1127 let original_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1128 let readback_rows: usize = readback.iter().map(|b| b.num_rows()).sum();
1129 assert_eq!(original_rows, readback_rows, "Roundtrip row count mismatch");
1130 if original_rows > 0 {
1131 let original = concat_batches(batches.iter().copied());
1132 let roundtrip = concat_batches(&readback);
1133 assert_eq!(original, roundtrip, "Roundtrip validation failed");
1134 }
1135
1136 buf
1137 }
1138
/// An all-null nullable INT32 column must still be written as data pages that account for
/// every row.
#[test]
fn cdc_all_null_arrow_column_writes_data_pages() {
    let field = Field::new("f0", DataType::Int32, true);
    let schema = Arc::new(Schema::new(vec![field]));
    let array = Arc::new(Int32Array::from(vec![None::<i32>; 4096])) as ArrayRef;
    let batch = RecordBatch::try_new(schema, vec![array.clone()]).unwrap();

    let data = write_with_cdc_options(&[&batch], 64, 256, Some(4096), false);
    let info = get_column_info(&data, 0);

    assert_eq!(info.len(), 1);
    let page_lengths = &info[0].page_lengths;
    assert!(
        !page_lengths.is_empty(),
        "all-null CDC write should still emit data pages"
    );
    assert_eq!(
        page_lengths.iter().sum::<i64>(),
        array.len() as i64,
        "all-null CDC pages should account for every input row"
    );
}
1159
1160 fn read_batches(data: &[u8]) -> Vec<RecordBatch> {
1161 let reader = ParquetRecordBatchReaderBuilder::try_new(bytes::Bytes::from(data.to_vec()))
1162 .unwrap()
1163 .build()
1164 .unwrap();
1165 reader.collect::<std::result::Result<Vec<_>, _>>().unwrap()
1166 }
1167
1168 fn concat_batches(batches: impl IntoIterator<Item = impl Borrow<RecordBatch>>) -> RecordBatch {
1169 let batches: Vec<_> = batches.into_iter().collect();
1170 let schema = batches[0].borrow().schema();
1171 let batches = batches.iter().map(|b| b.borrow());
1172 arrow_select::concat::concat_batches(&schema, batches).unwrap()
1173 }
1174
/// Returns the differing runs between `first` and `second`.
///
/// A longest common subsequence is computed; every stretch that lies between two
/// consecutive common elements (or before the first / after the last) is reported as a
/// pair `(run_in_first, run_in_second)`. Two adjacent results where one side of each is
/// empty in a complementary way are merged into a single pair.
fn find_differences(first: &[i64], second: &[i64]) -> Vec<(Vec<i64>, Vec<i64>)> {
    let (rows, cols) = (first.len(), second.len());

    // lcs[i][j] is the LCS length of first[..i] and second[..j].
    let mut lcs = vec![vec![0usize; cols + 1]; rows + 1];
    for (i, left) in first.iter().enumerate() {
        for (j, right) in second.iter().enumerate() {
            lcs[i + 1][j + 1] = if left == right {
                lcs[i][j] + 1
            } else {
                lcs[i + 1][j].max(lcs[i][j + 1])
            };
        }
    }

    // Walk the table backwards to recover the matched index pairs.
    let mut matched = Vec::new();
    let (mut i, mut j) = (rows, cols);
    while i > 0 && j > 0 {
        if first[i - 1] == second[j - 1] {
            i -= 1;
            j -= 1;
            matched.push((i, j));
        } else if lcs[i - 1][j] >= lcs[i][j - 1] {
            i -= 1;
        } else {
            j -= 1;
        }
    }
    matched.reverse();

    // Everything between consecutive matches is a difference.
    let mut gaps = Vec::new();
    let (mut next_i, mut next_j) = (0usize, 0usize);
    for &(match_i, match_j) in &matched {
        if match_i > next_i || match_j > next_j {
            gaps.push((
                first[next_i..match_i].to_vec(),
                second[next_j..match_j].to_vec(),
            ));
        }
        next_i = match_i + 1;
        next_j = match_j + 1;
    }
    if next_i < rows || next_j < cols {
        gaps.push((first[next_i..].to_vec(), second[next_j..].to_vec()));
    }

    // Fold a one-sided gap into a preceding gap that is empty on that same side.
    let mut merged: Vec<(Vec<i64>, Vec<i64>)> = Vec::new();
    for (left, right) in gaps {
        match merged.last_mut() {
            Some(prev) if prev.0.is_empty() && right.is_empty() => prev.0 = left,
            Some(prev) if prev.1.is_empty() && left.is_empty() => prev.1 = right,
            _ => merged.push((left, right)),
        }
    }
    merged
}
1234
1235 fn assert_page_length_differences(
1238 original: &ColumnInfo,
1239 modified: &ColumnInfo,
1240 exact_equal_diffs: usize,
1241 exact_larger_diffs: usize,
1242 exact_smaller_diffs: usize,
1243 edit_length: i64,
1244 ) {
1245 let diffs = find_differences(&original.page_lengths, &modified.page_lengths);
1246 let expected = exact_equal_diffs + exact_larger_diffs + exact_smaller_diffs;
1247
1248 if diffs.len() != expected {
1249 eprintln!("Original: {:?}", original.page_lengths);
1250 eprintln!("Modified: {:?}", modified.page_lengths);
1251 for d in &diffs {
1252 eprintln!(" Diff: {:?} vs {:?}", d.0, d.1);
1253 }
1254 }
1255 assert_eq!(
1256 diffs.len(),
1257 expected,
1258 "Expected {expected} diffs, got {}",
1259 diffs.len()
1260 );
1261
1262 let (mut eq, mut larger, mut smaller) = (0usize, 0usize, 0usize);
1263 for (left, right) in &diffs {
1264 let left_sum: i64 = left.iter().sum();
1265 let right_sum: i64 = right.iter().sum();
1266 if left_sum == right_sum {
1267 eq += 1;
1268 } else if left_sum < right_sum {
1269 larger += 1;
1270 assert_eq!(
1271 left_sum + edit_length,
1272 right_sum,
1273 "Larger diff mismatch: {left_sum} + {edit_length} != {right_sum}"
1274 );
1275 } else {
1276 smaller += 1;
1277 assert_eq!(
1278 left_sum,
1279 right_sum + edit_length,
1280 "Smaller diff mismatch: {left_sum} != {right_sum} + {edit_length}"
1281 );
1282 }
1283 }
1284
1285 assert_eq!(eq, exact_equal_diffs, "equal diffs count");
1286 assert_eq!(larger, exact_larger_diffs, "larger diffs count");
1287 assert_eq!(smaller, exact_smaller_diffs, "smaller diffs count");
1288 }
1289
1290 fn assert_page_length_differences_update(
1293 original: &ColumnInfo,
1294 modified: &ColumnInfo,
1295 max_equal_diffs: usize,
1296 ) {
1297 let diffs = find_differences(&original.page_lengths, &modified.page_lengths);
1298 assert!(
1299 diffs.len() <= max_equal_diffs,
1300 "Expected at most {max_equal_diffs} diffs, got {}",
1301 diffs.len()
1302 );
1303 for (left, right) in &diffs {
1304 let left_sum: i64 = left.iter().sum();
1305 let right_sum: i64 = right.iter().sum();
1306 assert_eq!(
1307 left_sum, right_sum,
1308 "Update diff should not change total row count"
1309 );
1310 }
1311 }
1312
/// A single replaced run in the middle is reported as one difference.
#[test]
fn test_find_differences_basic() {
    let original = [1, 2, 3, 4, 5];
    let changed = [1, 7, 8, 4, 5];
    let diffs = find_differences(&original, &changed);
    assert_eq!(diffs, vec![(vec![2, 3], vec![7, 8])]);
}
1322
/// Three separate replaced runs are reported in order.
#[test]
fn test_find_differences_multiple() {
    let diffs = find_differences(&[1, 2, 3, 4, 5, 6, 7], &[1, 8, 9, 4, 10, 6, 11]);
    let expected = vec![
        (vec![2, 3], vec![8, 9]),
        (vec![5], vec![10]),
        (vec![7], vec![11]),
    ];
    assert_eq!(diffs, expected);
}
1334
/// Extra trailing elements show up as a difference with an empty left side.
#[test]
fn test_find_differences_different_lengths() {
    let diffs = find_differences(&[1, 2, 3], &[1, 2, 3, 4, 5]);
    assert_eq!(diffs.len(), 1);
    let (removed, added) = &diffs[0];
    assert!(removed.is_empty());
    assert_eq!(added, &vec![4, 5]);
}
1342
/// Two empty inputs have no differences.
#[test]
fn test_find_differences_empty() {
    assert!(find_differences(&[], &[]).is_empty());
}
1348
/// Changes touching the first and the last element are reported alongside a
/// change in the middle; the trailing diff has an empty right half.
#[test]
fn test_find_differences_changes_at_both_ends() {
    let original = [1, 2, 3, 4, 5, 6, 7, 8, 9];
    let modified = [0, 0, 2, 3, 4, 5, 7, 7, 8];
    // (left run, right run) for every expected diff, in order of appearance.
    let expected = [
        (vec![1], vec![0, 0]),
        (vec![6], vec![7]),
        (vec![9], vec![]),
    ];

    let diffs = find_differences(&original, &modified);
    assert_eq!(diffs.len(), 3);
    for ((left, right), (want_left, want_right)) in diffs.iter().zip(&expected) {
        assert_eq!(left, want_left);
        assert_eq!(right, want_right);
    }
}
1360
/// A changed element whose new value equals its left neighbour (393) must
/// still be reported as a single one-element diff.
#[test]
fn test_find_differences_additional() {
    let original = [445, 312, 393, 401, 410, 138, 558, 457];
    let modified = [445, 312, 393, 393, 410, 138, 558, 457];

    let diffs = find_differences(&original, &modified);
    assert_eq!(diffs.len(), 1);

    let (left, right) = &diffs[0];
    assert_eq!(*left, vec![401]);
    assert_eq!(*right, vec![393]);
}
1371
/// Generates a module of single-row-group CDC tests for one column type.
///
/// * `$mod_name` - name of the generated test module.
/// * `$dtype` - expression evaluating to the Arrow `DataType` of the column.
/// * `$nullable` - whether the generated field is declared nullable.
///
/// Each edit scenario (delete / insert / update / prepend / append) builds a
/// `base` and a `modified` table from deterministic parts, writes both with
/// CDC enabled (once without and once with dictionary encoding) and compares
/// the page lengths of the single resulting row group.
macro_rules! cdc_single_rg_tests {
    ($mod_name:ident, $dtype:expr, $nullable:expr) => {
        mod $mod_name {
            use super::*;

            /// Returns `(dtype, nullable, part_length, edit_length)`, where the
            /// two lengths are row counts derived from `CDC_PART_SIZE` and
            /// `CDC_EDIT_SIZE` divided by the per-record byte estimate.
            fn config() -> (DataType, bool, usize, usize) {
                let dtype: DataType = $dtype;
                let nullable: bool = $nullable;
                let bpr = bytes_per_record(&dtype, nullable);
                let part_length = CDC_PART_SIZE / bpr;
                let edit_length = CDC_EDIT_SIZE / bpr;
                (dtype, nullable, part_length, edit_length)
            }

            /// Builds a one-column schema with a field named `f0`.
            fn make_schema(dtype: &DataType, nullable: bool) -> Arc<Schema> {
                Arc::new(Schema::new(vec![Field::new("f0", dtype.clone(), nullable)]))
            }

            /// Writes `base` and `modified` with CDC enabled, once per
            /// dictionary setting, checks that each file has exactly one row
            /// group whose chunk sizes pass `assert_cdc_chunk_sizes`, and then
            /// hands the two column infos to `check` for the scenario-specific
            /// assertions.
            fn verify_pair(
                base: &RecordBatch,
                modified: &RecordBatch,
                nullable: bool,
                check: impl Fn(&ColumnInfo, &ColumnInfo),
            ) {
                for enable_dictionary in [false, true] {
                    let base_data = write_with_cdc_options(
                        &[base],
                        CDC_MIN_CHUNK_SIZE,
                        CDC_MAX_CHUNK_SIZE,
                        Some(CDC_ROW_GROUP_LENGTH),
                        enable_dictionary,
                    );
                    let mod_data = write_with_cdc_options(
                        &[modified],
                        CDC_MIN_CHUNK_SIZE,
                        CDC_MAX_CHUNK_SIZE,
                        Some(CDC_ROW_GROUP_LENGTH),
                        enable_dictionary,
                    );

                    let base_info = get_column_info(&base_data, 0);
                    let mod_info = get_column_info(&mod_data, 0);
                    assert_eq!(base_info.len(), 1);
                    assert_eq!(mod_info.len(), 1);

                    // Validate the base file first, then the modified one.
                    for (batch, info) in [(base, &base_info[0]), (modified, &mod_info[0])] {
                        assert_cdc_chunk_sizes(
                            batch.column(0),
                            info,
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );
                    }

                    check(&base_info[0], &mod_info[0]);
                }
            }

            /// Removing one edit-sized block yields exactly one "smaller" diff.
            #[test]
            fn delete_once() {
                let (dtype, nullable, part_length, edit_length) = config();
                let schema = make_schema(&dtype, nullable);

                let part1 = generate_table(&schema, part_length, 0);
                let part2 = generate_table(&schema, edit_length, 1);
                let part3 = generate_table(&schema, part_length, part_length as u64);

                let base = concat_batches([&part1, &part2, &part3]);
                let modified = concat_batches([&part1, &part3]);

                verify_pair(&base, &modified, nullable, |base_info, mod_info| {
                    assert_page_length_differences(
                        base_info,
                        mod_info,
                        0,
                        0,
                        1,
                        edit_length as i64,
                    );
                });
            }

            /// Removing two edit-sized blocks yields exactly two "smaller" diffs.
            #[test]
            fn delete_twice() {
                let (dtype, nullable, part_length, edit_length) = config();
                let schema = make_schema(&dtype, nullable);

                let part1 = generate_table(&schema, part_length, 0);
                let part2 = generate_table(&schema, edit_length, 1);
                let part3 = generate_table(&schema, part_length, part_length as u64);
                let part4 = generate_table(&schema, edit_length, 2);
                let part5 = generate_table(&schema, part_length, 2 * part_length as u64);

                let base = concat_batches([&part1, &part2, &part3, &part4, &part5]);
                let modified = concat_batches([&part1, &part3, &part5]);

                verify_pair(&base, &modified, nullable, |base_info, mod_info| {
                    assert_page_length_differences(
                        base_info,
                        mod_info,
                        0,
                        0,
                        2,
                        edit_length as i64,
                    );
                });
            }

            /// Inserting one edit-sized block yields exactly one "larger" diff.
            #[test]
            fn insert_once() {
                let (dtype, nullable, part_length, edit_length) = config();
                let schema = make_schema(&dtype, nullable);

                let part1 = generate_table(&schema, part_length, 0);
                let part2 = generate_table(&schema, edit_length, 1);
                let part3 = generate_table(&schema, part_length, part_length as u64);

                let base = concat_batches([&part1, &part3]);
                let modified = concat_batches([&part1, &part2, &part3]);

                verify_pair(&base, &modified, nullable, |base_info, mod_info| {
                    assert_page_length_differences(
                        base_info,
                        mod_info,
                        0,
                        1,
                        0,
                        edit_length as i64,
                    );
                });
            }

            /// Inserting two edit-sized blocks yields exactly two "larger" diffs.
            #[test]
            fn insert_twice() {
                let (dtype, nullable, part_length, edit_length) = config();
                let schema = make_schema(&dtype, nullable);

                let part1 = generate_table(&schema, part_length, 0);
                let part2 = generate_table(&schema, edit_length, 1);
                let part3 = generate_table(&schema, part_length, part_length as u64);
                let part4 = generate_table(&schema, edit_length, 2);
                let part5 = generate_table(&schema, part_length, 2 * part_length as u64);

                let base = concat_batches([&part1, &part3, &part5]);
                let modified = concat_batches([&part1, &part2, &part3, &part4, &part5]);

                verify_pair(&base, &modified, nullable, |base_info, mod_info| {
                    assert_page_length_differences(
                        base_info,
                        mod_info,
                        0,
                        2,
                        0,
                        edit_length as i64,
                    );
                });
            }

            /// Replacing one edit-sized block changes at most one run of pages.
            #[test]
            fn update_once() {
                let (dtype, nullable, part_length, edit_length) = config();
                let schema = make_schema(&dtype, nullable);

                let part1 = generate_table(&schema, part_length, 0);
                let part2 = generate_table(&schema, edit_length, 1);
                let part3 = generate_table(&schema, part_length, part_length as u64);
                let part4 = generate_table(&schema, edit_length, 2);

                let base = concat_batches([&part1, &part2, &part3]);
                let modified = concat_batches([&part1, &part4, &part3]);

                verify_pair(&base, &modified, nullable, |base_info, mod_info| {
                    assert_page_length_differences_update(base_info, mod_info, 1);
                });
            }

            /// Replacing two edit-sized blocks changes at most two runs of pages.
            #[test]
            fn update_twice() {
                let (dtype, nullable, part_length, edit_length) = config();
                let schema = make_schema(&dtype, nullable);

                let part1 = generate_table(&schema, part_length, 0);
                let part2 = generate_table(&schema, edit_length, 1);
                let part3 = generate_table(&schema, part_length, part_length as u64);
                let part4 = generate_table(&schema, edit_length, 2);
                let part5 = generate_table(&schema, part_length, 2 * part_length as u64);
                let part6 = generate_table(&schema, edit_length, 3);
                let part7 = generate_table(&schema, edit_length, 4);

                let base = concat_batches([&part1, &part2, &part3, &part4, &part5]);
                let modified = concat_batches([&part1, &part6, &part3, &part7, &part5]);

                verify_pair(&base, &modified, nullable, |base_info, mod_info| {
                    assert_page_length_differences_update(base_info, mod_info, 2);
                });
            }

            /// Prepending an edit-sized block never reduces the page count and
            /// yields exactly one "larger" diff.
            #[test]
            fn prepend() {
                let (dtype, nullable, part_length, edit_length) = config();
                let schema = make_schema(&dtype, nullable);

                let part1 = generate_table(&schema, part_length, 0);
                let part2 = generate_table(&schema, edit_length, 1);
                let part3 = generate_table(&schema, part_length, part_length as u64);
                let part4 = generate_table(&schema, edit_length, 2);

                let base = concat_batches([&part1, &part2, &part3]);
                let modified = concat_batches([&part4, &part1, &part2, &part3]);

                verify_pair(&base, &modified, nullable, |base_info, mod_info| {
                    assert!(
                        mod_info.page_lengths.len() >= base_info.page_lengths.len(),
                        "Modified should have same or more pages"
                    );

                    assert_page_length_differences(
                        base_info,
                        mod_info,
                        0,
                        1,
                        0,
                        edit_length as i64,
                    );
                });
            }

            /// Appending an edit-sized block leaves every page but the last one
            /// untouched; the last base page may only grow.
            #[test]
            fn append() {
                let (dtype, nullable, part_length, edit_length) = config();
                let schema = make_schema(&dtype, nullable);

                let part1 = generate_table(&schema, part_length, 0);
                let part2 = generate_table(&schema, edit_length, 1);
                let part3 = generate_table(&schema, part_length, part_length as u64);
                let part4 = generate_table(&schema, edit_length, 2);

                let base = concat_batches([&part1, &part2, &part3]);
                let modified = concat_batches([&part1, &part2, &part3, &part4]);

                verify_pair(&base, &modified, nullable, |base_info, mod_info| {
                    let bp = &base_info.page_lengths;
                    let mp = &mod_info.page_lengths;
                    // Fail with a clear message instead of an arithmetic
                    // overflow in `bp.len() - 1` below.
                    assert!(!bp.is_empty(), "base column should have at least one page");
                    assert!(mp.len() >= bp.len());

                    let last = bp.len() - 1;
                    for i in 0..last {
                        assert_eq!(bp[i], mp[i], "Page {i} should be identical");
                    }
                    assert!(mp[last] >= bp[last]);
                });
            }

            /// Writing an empty batch must not produce any pages.
            #[test]
            fn empty_table() {
                let (dtype, nullable, _, _) = config();
                let schema = make_schema(&dtype, nullable);

                let empty = RecordBatch::new_empty(schema);
                for enable_dictionary in [false, true] {
                    let data = write_with_cdc_options(
                        &[&empty],
                        CDC_MIN_CHUNK_SIZE,
                        CDC_MAX_CHUNK_SIZE,
                        Some(CDC_ROW_GROUP_LENGTH),
                        enable_dictionary,
                    );
                    let info = get_column_info(&data, 0);
                    // A file without any row group is acceptable as well.
                    if !info.is_empty() {
                        assert!(info[0].page_lengths.is_empty());
                    }
                }
            }

            /// Sliced (offsetted) input arrays must still satisfy the chunk
            /// size checks when written with dictionary encoding.
            #[test]
            fn array_offsets() {
                let (dtype, nullable, part_length, edit_length) = config();
                let schema = make_schema(&dtype, nullable);

                let table = concat_batches([
                    &generate_table(&schema, part_length, 0),
                    &generate_table(&schema, edit_length, 1),
                    &generate_table(&schema, part_length, part_length as u64),
                ]);

                for offset in [0usize, 512, 1024] {
                    // Skip offsets that would leave nothing to write.
                    if offset >= table.num_rows() {
                        continue;
                    }
                    let sliced = table.slice(offset, table.num_rows() - offset);
                    let data = write_with_cdc_options(
                        &[&sliced],
                        CDC_MIN_CHUNK_SIZE,
                        CDC_MAX_CHUNK_SIZE,
                        Some(CDC_ROW_GROUP_LENGTH),
                        true,
                    );
                    let info = get_column_info(&data, 0);
                    assert_eq!(info.len(), 1);

                    assert_cdc_chunk_sizes(
                        sliced.column(0),
                        &info[0],
                        nullable,
                        CDC_MIN_CHUNK_SIZE,
                        CDC_MAX_CHUNK_SIZE,
                        true,
                    );
                }
            }
        }
    };
}
1943
1944 cdc_single_rg_tests!(cdc_bool_non_null, DataType::Boolean, false);
1946 cdc_single_rg_tests!(cdc_i32_non_null, DataType::Int32, false);
1947 cdc_single_rg_tests!(cdc_i64_nullable, DataType::Int64, true);
1948 cdc_single_rg_tests!(cdc_f64_nullable, DataType::Float64, true);
1949 cdc_single_rg_tests!(cdc_utf8_non_null, DataType::Utf8, false);
1950 cdc_single_rg_tests!(cdc_binary_nullable, DataType::Binary, true);
1951 cdc_single_rg_tests!(cdc_fsb16_nullable, DataType::FixedSizeBinary(16), true);
1952 cdc_single_rg_tests!(cdc_date32_non_null, DataType::Date32, false);
1953 cdc_single_rg_tests!(
1954 cdc_timestamp_nullable,
1955 DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None),
1956 true
1957 );
1958
1959 mod cdc_multiple_row_groups {
1962 use super::*;
1963
1964 const PART_LENGTH: usize = 128 * 1024;
1965 const EDIT_LENGTH: usize = 128;
1966 const ROW_GROUP_LENGTH: usize = 64 * 1024;
1967
1968 fn schema() -> Arc<Schema> {
1969 Arc::new(Schema::new(vec![
1970 Field::new("int32", DataType::Int32, true),
1971 Field::new("float64", DataType::Float64, true),
1972 Field::new("bool", DataType::Boolean, false),
1973 ]))
1974 }
1975
1976 #[test]
1977 fn insert_once() {
1978 let s = schema();
1979 let part1 = generate_table(&s, PART_LENGTH, 0);
1980 let part2 = generate_table(&s, PART_LENGTH, 2);
1981 let part3 = generate_table(&s, PART_LENGTH, 4);
1982 let edit1 = generate_table(&s, EDIT_LENGTH, 1);
1983 let edit2 = generate_table(&s, EDIT_LENGTH, 3);
1984
1985 let base = concat_batches([&part1, &edit1, &part2, &part3]);
1986 let modified = concat_batches([&part1, &edit1, &edit2, &part2, &part3]);
1987 assert_eq!(modified.num_rows(), base.num_rows() + EDIT_LENGTH);
1988
1989 let base_data = write_with_cdc_options(
1990 &[&base],
1991 CDC_MIN_CHUNK_SIZE,
1992 CDC_MAX_CHUNK_SIZE,
1993 Some(ROW_GROUP_LENGTH),
1994 false,
1995 );
1996 let mod_data = write_with_cdc_options(
1997 &[&modified],
1998 CDC_MIN_CHUNK_SIZE,
1999 CDC_MAX_CHUNK_SIZE,
2000 Some(ROW_GROUP_LENGTH),
2001 false,
2002 );
2003
2004 for col in 0..s.fields().len() {
2005 let base_info = get_column_info(&base_data, col);
2006 let mod_info = get_column_info(&mod_data, col);
2007
2008 assert_eq!(base_info.len(), 7, "expected 7 row groups for col {col}");
2009 assert_eq!(mod_info.len(), 7);
2010
2011 assert_eq!(base_info[0].page_lengths, mod_info[0].page_lengths);
2013 assert_eq!(base_info[1].page_lengths, mod_info[1].page_lengths);
2014
2015 for i in 2..mod_info.len() - 1 {
2017 assert_page_length_differences(
2018 &base_info[i],
2019 &mod_info[i],
2020 0,
2021 1,
2022 1,
2023 EDIT_LENGTH as i64,
2024 );
2025 }
2026 assert_page_length_differences(
2028 base_info.last().unwrap(),
2029 mod_info.last().unwrap(),
2030 0,
2031 1,
2032 0,
2033 EDIT_LENGTH as i64,
2034 );
2035 }
2036 }
2037
2038 #[test]
2039 fn delete_once() {
2040 let s = schema();
2041 let part1 = generate_table(&s, PART_LENGTH, 0);
2042 let part2 = generate_table(&s, PART_LENGTH, 2);
2043 let part3 = generate_table(&s, PART_LENGTH, 4);
2044 let edit1 = generate_table(&s, EDIT_LENGTH, 1);
2045 let edit2 = generate_table(&s, EDIT_LENGTH, 3);
2046
2047 let base = concat_batches([&part1, &edit1, &part2, &part3, &edit2]);
2048 let modified = concat_batches([&part1, &part2, &part3, &edit2]);
2049
2050 let base_data = write_with_cdc_options(
2051 &[&base],
2052 CDC_MIN_CHUNK_SIZE,
2053 CDC_MAX_CHUNK_SIZE,
2054 Some(ROW_GROUP_LENGTH),
2055 false,
2056 );
2057 let mod_data = write_with_cdc_options(
2058 &[&modified],
2059 CDC_MIN_CHUNK_SIZE,
2060 CDC_MAX_CHUNK_SIZE,
2061 Some(ROW_GROUP_LENGTH),
2062 false,
2063 );
2064
2065 for col in 0..s.fields().len() {
2066 let base_info = get_column_info(&base_data, col);
2067 let mod_info = get_column_info(&mod_data, col);
2068
2069 assert_eq!(base_info.len(), 7);
2070 assert_eq!(mod_info.len(), 7);
2071
2072 assert_eq!(base_info[0].page_lengths, mod_info[0].page_lengths);
2073 assert_eq!(base_info[1].page_lengths, mod_info[1].page_lengths);
2074
2075 for i in 2..mod_info.len() - 1 {
2076 assert_page_length_differences(
2077 &base_info[i],
2078 &mod_info[i],
2079 0,
2080 1,
2081 1,
2082 EDIT_LENGTH as i64,
2083 );
2084 }
2085 assert_page_length_differences(
2086 base_info.last().unwrap(),
2087 mod_info.last().unwrap(),
2088 0,
2089 0,
2090 1,
2091 EDIT_LENGTH as i64,
2092 );
2093 }
2094 }
2095
2096 #[test]
2097 fn update_once() {
2098 let s = schema();
2099 let part1 = generate_table(&s, PART_LENGTH, 0);
2100 let part2 = generate_table(&s, PART_LENGTH, 2);
2101 let part3 = generate_table(&s, PART_LENGTH, 4);
2102 let edit1 = generate_table(&s, EDIT_LENGTH, 1);
2103 let edit2 = generate_table(&s, EDIT_LENGTH, 3);
2104 let edit3 = generate_table(&s, EDIT_LENGTH, 5);
2105
2106 let base = concat_batches([&part1, &edit1, &part2, &part3, &edit2]);
2107 let modified = concat_batches([&part1, &edit3, &part2, &part3, &edit2]);
2108
2109 let base_data = write_with_cdc_options(
2110 &[&base],
2111 CDC_MIN_CHUNK_SIZE,
2112 CDC_MAX_CHUNK_SIZE,
2113 Some(ROW_GROUP_LENGTH),
2114 false,
2115 );
2116 let mod_data = write_with_cdc_options(
2117 &[&modified],
2118 CDC_MIN_CHUNK_SIZE,
2119 CDC_MAX_CHUNK_SIZE,
2120 Some(ROW_GROUP_LENGTH),
2121 false,
2122 );
2123
2124 for col in 0..s.fields().len() {
2125 let nullable = s.field(col).is_nullable();
2126 let base_info = get_column_info(&base_data, col);
2127 let mod_info = get_column_info(&mod_data, col);
2128
2129 assert_eq!(base_info.len(), 7);
2130 assert_eq!(mod_info.len(), 7);
2131
2132 assert_cdc_chunk_sizes(
2134 &base.column(col).slice(0, ROW_GROUP_LENGTH),
2135 &base_info[0],
2136 nullable,
2137 CDC_MIN_CHUNK_SIZE,
2138 CDC_MAX_CHUNK_SIZE,
2139 false,
2140 );
2141
2142 assert_eq!(base_info[0].page_lengths, mod_info[0].page_lengths);
2143 assert_eq!(base_info[1].page_lengths, mod_info[1].page_lengths);
2144
2145 assert_page_length_differences_update(&base_info[2], &mod_info[2], 1);
2147
2148 for i in 3..mod_info.len() {
2150 assert_eq!(base_info[i].page_lengths, mod_info[i].page_lengths);
2151 }
2152 }
2153 }
2154
2155 #[test]
2156 fn append() {
2157 let s = schema();
2158 let part1 = generate_table(&s, PART_LENGTH, 0);
2159 let part2 = generate_table(&s, PART_LENGTH, 2);
2160 let part3 = generate_table(&s, PART_LENGTH, 4);
2161 let edit1 = generate_table(&s, EDIT_LENGTH, 1);
2162 let edit2 = generate_table(&s, EDIT_LENGTH, 3);
2163
2164 let base = concat_batches([&part1, &edit1, &part2, &part3]);
2165 let modified = concat_batches([&part1, &edit1, &part2, &part3, &edit2]);
2166
2167 let base_data = write_with_cdc_options(
2168 &[&base],
2169 CDC_MIN_CHUNK_SIZE,
2170 CDC_MAX_CHUNK_SIZE,
2171 Some(ROW_GROUP_LENGTH),
2172 false,
2173 );
2174 let mod_data = write_with_cdc_options(
2175 &[&modified],
2176 CDC_MIN_CHUNK_SIZE,
2177 CDC_MAX_CHUNK_SIZE,
2178 Some(ROW_GROUP_LENGTH),
2179 false,
2180 );
2181
2182 for col in 0..s.fields().len() {
2183 let nullable = s.field(col).is_nullable();
2184 let base_info = get_column_info(&base_data, col);
2185 let mod_info = get_column_info(&mod_data, col);
2186
2187 assert_eq!(base_info.len(), 7);
2188 assert_eq!(mod_info.len(), 7);
2189
2190 assert_cdc_chunk_sizes(
2192 &base.column(col).slice(0, ROW_GROUP_LENGTH),
2193 &base_info[0],
2194 nullable,
2195 CDC_MIN_CHUNK_SIZE,
2196 CDC_MAX_CHUNK_SIZE,
2197 false,
2198 );
2199
2200 for i in 0..base_info.len() - 1 {
2202 assert_eq!(base_info[i].page_lengths, mod_info[i].page_lengths);
2203 }
2204
2205 let bp = &base_info.last().unwrap().page_lengths;
2207 let mp = &mod_info.last().unwrap().page_lengths;
2208 assert!(mp.len() >= bp.len());
2209 for i in 0..bp.len() - 1 {
2210 assert_eq!(bp[i], mp[i]);
2211 }
2212 }
2213 }
2214 }
2215
/// Chunks the same Int32 data twice through `ContentDefinedChunker`: once as
/// a whole array and once as a slice starting `offset` values in.
///
/// The sliced run must produce the same number of chunks; its first chunk
/// must be exactly `offset` values shorter and every following chunk must have
/// the same size as in the unsliced run.
#[test]
fn test_cdc_array_offsets_direct() {
    use crate::basic::Type as PhysicalType;
    use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};

    let options = CdcOptions {
        min_chunk_size: CDC_MIN_CHUNK_SIZE,
        max_chunk_size: CDC_MAX_CHUNK_SIZE,
        norm_level: 0,
    };
    // Flat INT32 column: max definition and repetition levels are both 0.
    let desc = {
        let tp = Type::primitive_type_builder("col", PhysicalType::INT32)
            .build()
            .unwrap();
        ColumnDescriptor::new(Arc::new(tp), 0, 0, ColumnPath::new(vec![]))
    };

    let bpr = bytes_per_record(&DataType::Int32, false);
    let n = CDC_PART_SIZE / bpr;
    let offset = 10usize;

    let array: Int32Array = (0..n).map(|i| test_hash(0, i as u64) as i32).collect();
    let mut chunker = super::ContentDefinedChunker::new(&desc, &options).unwrap();
    let chunks = chunker
        .get_arrow_chunks(LevelDataRef::Absent, LevelDataRef::Absent, &array)
        .unwrap();

    // A fresh chunker is used so no rolling-hash state carries over.
    let sliced = array.slice(offset, n - offset);
    let mut chunker2 = super::ContentDefinedChunker::new(&desc, &options).unwrap();
    let chunks2 = chunker2
        .get_arrow_chunks(LevelDataRef::Absent, LevelDataRef::Absent, &sliced)
        .unwrap();

    let values: Vec<usize> = chunks.iter().map(|c| c.num_values).collect();
    let values2: Vec<usize> = chunks2.iter().map(|c| c.num_values).collect();

    assert!(values.len() > 1, "expected multiple chunks, got {values:?}");
    assert_eq!(values.len(), values2.len(), "chunk count must match");

    // Compare via addition: `values[0] - values2[0]` would abort with a
    // subtraction overflow (instead of this message) if the sliced chunk
    // were ever the longer one.
    assert_eq!(
        values[0],
        values2[0] + offset,
        "offsetted first chunk should be {offset} values shorter"
    );
    assert_eq!(
        &values[1..],
        &values2[1..],
        "all chunks after the first must be identical"
    );
}
2268
/// Writes a random batch of nullable list columns (List<Int32>,
/// List<Boolean>, LargeList<Utf8>) with CDC and dictionary encoding enabled.
///
/// The returned bytes are not inspected here; the test passes as long as
/// `write_with_cdc_options` completes without panicking.
#[test]
fn test_cdc_list_roundtrip() {
    // All list columns use a nullable item field.
    let nullable_items = |item: DataType| Arc::new(Field::new_list_field(item, true));

    let columns = vec![
        Field::new("_1", DataType::List(nullable_items(DataType::Int32)), true),
        Field::new("_2", DataType::List(nullable_items(DataType::Boolean)), true),
        Field::new(
            "_3",
            DataType::LargeList(nullable_items(DataType::Utf8)),
            true,
        ),
    ];
    let schema = Arc::new(Schema::new(columns));

    let batch = create_random_batch(schema, 10_000, 0.25, 0.75).unwrap();
    write_with_cdc_options(
        &[&batch],
        CDC_MIN_CHUNK_SIZE,
        CDC_MAX_CHUNK_SIZE,
        None,
        true,
    );
}
2301
/// Writes a random batch with two deeply nested columns, List<List<Int32>>
/// and List<Struct<a: List<Int32>>>, with CDC and dictionary encoding enabled.
///
/// The returned bytes are not inspected here; the test passes as long as
/// `write_with_cdc_options` completes without panicking.
#[test]
fn test_cdc_deeply_nested_roundtrip() {
    // Wraps `item` in a list type whose item field is nullable.
    let list_of = |item: DataType| DataType::List(Arc::new(Field::new_list_field(item, true)));

    let list_list_type = list_of(list_of(DataType::Int32));

    let struct_type = DataType::Struct(Fields::from(vec![Field::new(
        "a",
        list_of(DataType::Int32),
        true,
    )]));
    let list_struct_type = list_of(struct_type);

    let schema = Arc::new(Schema::new(vec![
        Field::new("list_list", list_list_type, true),
        Field::new("list_struct_list", list_struct_type, true),
    ]));

    let batch = create_random_batch(schema, 10_000, 0.25, 0.75).unwrap();
    write_with_cdc_options(
        &[&batch],
        CDC_MIN_CHUNK_SIZE,
        CDC_MAX_CHUNK_SIZE,
        None,
        true,
    );
}
2330
/// Round-trips a list array whose null rows still span child values.
///
/// Rows 1 and 3 are null, yet their offset ranges (2..5 and 7..9) are not
/// empty. After writing with CDC and reading back, the validity pattern and
/// the values of the valid rows must be preserved.
#[test]
fn test_cdc_list_non_empty_null_segments() {
    let values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    // Row i covers child values offsets[i]..offsets[i + 1].
    let offsets = Buffer::from_iter([0_i32, 2, 5, 7, 9, 10]);
    // Bits 0, 2 and 4 are set: rows 0, 2, 4 are valid, rows 1 and 3 are null.
    let null_bitmap = Buffer::from([0b00010101]);
    let list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false)));

    // SAFETY: the six offsets are non-decreasing, describe exactly five rows
    // and end at 10, the length of the child array; the one-byte bitmap
    // covers all five rows; the child is an Int32 array without nulls, which
    // matches the non-nullable Int32 item field of `list_type`.
    let list_data = unsafe {
        ArrayData::new_unchecked(
            list_type.clone(),
            5,
            None,
            Some(null_bitmap),
            0,
            vec![offsets],
            vec![values.to_data()],
        )
    };
    let list_array = arrow_array::make_array(list_data);

    let schema = Arc::new(Schema::new(vec![Field::new("col", list_type, true)]));
    let batch = RecordBatch::try_new(schema, vec![list_array]).unwrap();

    let buf = write_with_cdc_options(
        &[&batch],
        CDC_MIN_CHUNK_SIZE,
        CDC_MAX_CHUNK_SIZE,
        None,
        true,
    );
    let read = concat_batches(read_batches(&buf));
    let read_list = read.column(0).as_list::<i32>();
    assert_eq!(read_list.len(), 5);

    // The validity pattern must survive the round trip.
    for (row, expect_valid) in [true, false, true, false, true].into_iter().enumerate() {
        if expect_valid {
            assert!(read_list.is_valid(row));
        } else {
            assert!(read_list.is_null(row));
        }
    }

    // Valid rows must come back with exactly their original values.
    let row_values = |row: usize| -> Vec<i32> {
        let items = read_list.value(row);
        items
            .as_primitive::<arrow_array::types::Int32Type>()
            .values()
            .iter()
            .copied()
            .collect()
    };
    for (row, expected) in [(0, vec![1, 2]), (2, vec![6, 7]), (4, vec![10])] {
        assert_eq!(row_values(row), expected);
    }
}
2394}