use crate::errors::{ParquetError, Result};
use crate::file::properties::CdcOptions;
use crate::schema::types::ColumnDescriptor;

use super::CdcChunk;
use super::cdc_generated::{GEARHASH_TABLE, NUM_GEARHASH_TABLES};

/// Content-defined chunker based on a gear rolling hash, used to place data
/// page boundaries at positions determined by the data itself.
#[derive(Debug)]
pub(crate) struct ContentDefinedChunker {
    /// Maximum definition level of the column.
    max_def_level: i16,
    /// Maximum repetition level of the column.
    max_rep_level: i16,
    /// Definition level of the column's closest repeated ancestor; levels at
    /// or above it occupy a slot in the leaf array.
    repeated_ancestor_def_level: i16,

    /// Minimum chunk size in bytes; the rolling hash is not evaluated below it.
    min_chunk_size: i64,
    /// Maximum chunk size in bytes; a boundary is forced once it is reached.
    max_chunk_size: i64,
    /// Bit mask applied to the rolling hash to detect boundary candidates.
    rolling_hash_mask: u64,

    /// Current value of the gear rolling hash.
    rolling_hash: u64,
    /// Whether the masked rolling hash has matched since the last check.
    has_matched: bool,
    /// Index of the gearhash table currently in use (see `need_new_chunk`).
    nth_run: usize,
    /// Number of bytes rolled into the current chunk so far.
    chunk_size: i64,
}

impl ContentDefinedChunker {
    pub fn new(desc: &ColumnDescriptor, options: &CdcOptions) -> Result<Self> {
        let rolling_hash_mask = Self::calculate_mask(
            options.min_chunk_size as i64,
            options.max_chunk_size as i64,
            options.norm_level,
        )?;
        Ok(Self {
            max_def_level: desc.max_def_level(),
            max_rep_level: desc.max_rep_level(),
            repeated_ancestor_def_level: desc.repeated_ancestor_def_level(),
            min_chunk_size: options.min_chunk_size as i64,
            max_chunk_size: options.max_chunk_size as i64,
            rolling_hash_mask,
            rolling_hash: 0,
            has_matched: false,
            nth_run: 0,
            chunk_size: 0,
        })
    }

    /// Compute the rolling-hash mask from the chunk size bounds and the
    /// normalization level. Each increment of `norm_level` removes one bit
    /// from the mask, doubling the per-byte match probability.
    fn calculate_mask(min_chunk_size: i64, max_chunk_size: i64, norm_level: i32) -> Result<u64> {
        if min_chunk_size < 0 {
            return Err(ParquetError::General(
                "min_chunk_size must be non-negative".to_string(),
            ));
        }
        if max_chunk_size <= min_chunk_size {
            return Err(ParquetError::General(
                "max_chunk_size must be greater than min_chunk_size".to_string(),
            ));
        }

        let avg_chunk_size = (min_chunk_size + max_chunk_size) / 2;
        // Spread the boundary probability across the gearhash tables: a full
        // boundary requires one sub-match per table (see `need_new_chunk`).
        let target_size = (avg_chunk_size - min_chunk_size) / NUM_GEARHASH_TABLES as i64;

        // floor(log2(target_size)), or 0 if the target is degenerate
        let mask_bits = if target_size > 0 {
            63 - target_size.leading_zeros() as i32
        } else {
            0
        };

        let effective_bits = mask_bits - norm_level;

        if !(1..=63).contains(&effective_bits) {
            return Err(ParquetError::General(format!(
                "The number of bits in the CDC mask must be between 1 and 63, got {effective_bits}"
            )));
        }

        Ok(u64::MAX << (64 - effective_bits))
    }

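    // Worked example (a sketch; NUM_GEARHASH_TABLES == 8 is assumed here, as
    // implied by the default-mask test below). With min_chunk_size = 256 KiB
    // and max_chunk_size = 1 MiB:
    //
    //     avg_chunk_size = (262144 + 1048576) / 2 = 655360
    //     target_size    = (655360 - 262144) / 8  = 49152
    //     mask_bits      = floor(log2(49152))     = 15
    //     mask           = u64::MAX << (64 - 15)
    //
    // so once a chunk is past min_chunk_size, each rolled byte becomes a
    // boundary candidate with probability 2^-15.
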
    /// Roll `bytes` into the gear hash. Bytes always count toward the chunk
    /// size, but hashing and boundary detection only run once the chunk has
    /// reached `min_chunk_size`.
    #[inline]
    fn roll(&mut self, bytes: &[u8]) {
        self.chunk_size += bytes.len() as i64;
        if self.chunk_size < self.min_chunk_size {
            return;
        }
        for &b in bytes {
            self.rolling_hash = self
                .rolling_hash
                .wrapping_shl(1)
                .wrapping_add(GEARHASH_TABLE[self.nth_run][b as usize]);
            self.has_matched =
                self.has_matched || ((self.rolling_hash & self.rolling_hash_mask) == 0);
        }
    }

    /// Variant of [`Self::roll`] for fixed-width values; the constant length
    /// lets the compiler unroll the loop.
    #[inline(always)]
    fn roll_fixed<const N: usize>(&mut self, bytes: &[u8; N]) {
        self.chunk_size += N as i64;
        if self.chunk_size < self.min_chunk_size {
            return;
        }
        for j in 0..N {
            self.rolling_hash = self
                .rolling_hash
                .wrapping_shl(1)
                .wrapping_add(GEARHASH_TABLE[self.nth_run][bytes[j] as usize]);
            self.has_matched =
                self.has_matched || ((self.rolling_hash & self.rolling_hash_mask) == 0);
        }
    }

    /// Roll a definition or repetition level into the hash as two
    /// little-endian bytes.
    #[inline]
    fn roll_level(&mut self, level: i16) {
        self.roll_fixed(&level.to_le_bytes());
    }

    /// Return true if a chunk boundary should be placed at the current
    /// position. A "natural" boundary requires the masked hash to match
    /// NUM_GEARHASH_TABLES times, each run evaluated against a different gear
    /// table; this normalization tightens the chunk size distribution. A
    /// boundary is forced regardless once the chunk reaches `max_chunk_size`.
    #[inline]
    fn need_new_chunk(&mut self) -> bool {
        if self.has_matched {
            self.has_matched = false;
            self.nth_run += 1;
            if self.nth_run >= NUM_GEARHASH_TABLES {
                self.nth_run = 0;
                self.chunk_size = 0;
                return true;
            }
        }
        if self.chunk_size >= self.max_chunk_size {
            self.chunk_size = 0;
            return true;
        }
        false
    }

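    // A minimal sketch of how the rolling pieces combine (hypothetical caller
    // code; the writer integration uses `calculate` / `get_arrow_chunks`
    // below instead of driving these methods directly):
    //
    //     let mut boundaries = Vec::new();
    //     for (i, v) in values.iter().enumerate() {
    //         chunker.roll_fixed::<4>(&v.to_le_bytes());
    //         if chunker.need_new_chunk() {
    //             boundaries.push(i + 1); // next chunk starts after element i
    //         }
    //     }
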
    /// Split `num_levels` level entries into chunks at content-defined
    /// boundaries. Levels are rolled here; `roll_value` is invoked with the
    /// value index of each present leaf so the caller can roll the value
    /// bytes. For repeated (nested) columns, boundaries are only placed at
    /// record boundaries (`rep_level == 0`).
    fn calculate<F>(
        &mut self,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        num_levels: usize,
        mut roll_value: F,
    ) -> Vec<CdcChunk>
    where
        F: FnMut(&mut Self, usize),
    {
        let has_def_levels = self.max_def_level > 0;
        let has_rep_levels = self.max_rep_level > 0;

        let mut chunks = Vec::new();
        let mut prev_offset: usize = 0;
        let mut prev_value_offset: usize = 0;
        let mut value_offset: usize = 0;

        if !has_rep_levels && !has_def_levels {
            // Flat, required column: every level entry is a value.
            for offset in 0..num_levels {
                roll_value(self, offset);
                if self.need_new_chunk() {
                    chunks.push(CdcChunk {
                        level_offset: prev_offset,
                        num_levels: offset - prev_offset,
                        value_offset: prev_offset,
                        num_values: offset - prev_offset,
                    });
                    prev_offset = offset;
                }
            }
            prev_value_offset = prev_offset;
            value_offset = num_levels;
        } else if !has_rep_levels {
            // Flat, nullable column: roll the def level for every entry, and
            // the value only where one is present.
            let def_levels = def_levels.expect("def_levels required when max_def_level > 0");
            #[allow(clippy::needless_range_loop)]
            for offset in 0..num_levels {
                let def_level = def_levels[offset];
                self.roll_level(def_level);
                if def_level == self.max_def_level {
                    roll_value(self, offset);
                }
                if self.need_new_chunk() {
                    chunks.push(CdcChunk {
                        level_offset: prev_offset,
                        num_levels: offset - prev_offset,
                        value_offset: prev_value_offset,
                        num_values: value_offset - prev_value_offset,
                    });
                    prev_offset = offset;
                    prev_value_offset = value_offset;
                }
                if def_level == self.max_def_level {
                    value_offset += 1;
                }
            }
        } else {
            // Nested column: roll both levels, track the leaf array slot
            // separately from the value count, and only cut chunks at record
            // boundaries (rep_level == 0) so records are never split.
            let def_levels = def_levels.expect("def_levels required for nested data");
            let rep_levels = rep_levels.expect("rep_levels required for nested data");
            let mut leaf_offset: usize = 0;

            for offset in 0..num_levels {
                let def_level = def_levels[offset];
                let rep_level = rep_levels[offset];

                self.roll_level(def_level);
                self.roll_level(rep_level);
                if def_level == self.max_def_level {
                    roll_value(self, leaf_offset);
                }

                if rep_level == 0 && self.need_new_chunk() {
                    let levels_to_write = offset - prev_offset;
                    if levels_to_write > 0 {
                        chunks.push(CdcChunk {
                            level_offset: prev_offset,
                            num_levels: levels_to_write,
                            value_offset: prev_value_offset,
                            num_values: value_offset - prev_value_offset,
                        });
                        prev_offset = offset;
                        prev_value_offset = value_offset;
                    }
                }
                if def_level == self.max_def_level {
                    value_offset += 1;
                }
                if def_level >= self.repeated_ancestor_def_level {
                    leaf_offset += 1;
                }
            }
        }

        // Emit the trailing chunk covering the remaining levels and values.
        if prev_offset < num_levels {
            chunks.push(CdcChunk {
                level_offset: prev_offset,
                num_levels: num_levels - prev_offset,
                value_offset: prev_value_offset,
                num_values: value_offset - prev_value_offset,
            });
        }

        #[cfg(debug_assertions)]
        self.validate_chunks(&chunks, num_levels, value_offset);

        chunks
    }

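    // Level/value bookkeeping example (a sketch): for a flat nullable column
    // with def_levels = [1, 0, 1, 1] and max_def_level = 1, a single chunk
    // covering everything has num_levels = 4 but num_values = 3, since the
    // null at level index 1 occupies no value slot.
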
    /// Compute content-defined chunks for an Arrow array and its (optional)
    /// definition and repetition levels. Returns an error for unsupported
    /// data types.
    #[cfg(feature = "arrow")]
    pub(crate) fn get_arrow_chunks(
        &mut self,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        array: &dyn arrow_array::Array,
    ) -> Result<Vec<CdcChunk>> {
        use arrow_array::cast::AsArray;
        use arrow_schema::DataType;

        let num_levels = match def_levels {
            Some(def_levels) => def_levels.len(),
            None => array.len(),
        };

        macro_rules! fixed_width {
            ($N:literal) => {{
                let data = array.to_data();
                let buffer = data.buffers()[0].as_slice();
                // Account for the array's slice offset into the values buffer.
                let values = &buffer[data.offset() * $N..];
                self.calculate(def_levels, rep_levels, num_levels, |c, i| {
                    let offset = i * $N;
                    let slice = &values[offset..offset + $N];
                    c.roll_fixed::<$N>(slice.try_into().unwrap());
                })
            }};
        }

        macro_rules! binary_like {
            ($a:expr) => {{
                let a = $a;
                self.calculate(def_levels, rep_levels, num_levels, |c, i| {
                    c.roll(a.value(i).as_ref());
                })
            }};
        }

        let dtype = array.data_type();
        let chunks = match dtype {
            DataType::Null => self.calculate(def_levels, rep_levels, num_levels, |_, _| {}),
            DataType::Boolean => {
                let a = array.as_boolean();
                self.calculate(def_levels, rep_levels, num_levels, |c, i| {
                    c.roll_fixed(&[a.value(i) as u8]);
                })
            }
            DataType::Int8 | DataType::UInt8 => fixed_width!(1),
            DataType::Int16 | DataType::UInt16 | DataType::Float16 => fixed_width!(2),
            DataType::Int32
            | DataType::UInt32
            | DataType::Float32
            | DataType::Date32
            | DataType::Time32(_)
            | DataType::Interval(arrow_schema::IntervalUnit::YearMonth)
            | DataType::Decimal32(_, _) => fixed_width!(4),
            DataType::Int64
            | DataType::UInt64
            | DataType::Float64
            | DataType::Date64
            | DataType::Time64(_)
            | DataType::Timestamp(_, _)
            | DataType::Duration(_)
            | DataType::Interval(arrow_schema::IntervalUnit::DayTime)
            | DataType::Decimal64(_, _) => fixed_width!(8),
            DataType::Interval(arrow_schema::IntervalUnit::MonthDayNano)
            | DataType::Decimal128(_, _) => fixed_width!(16),
            DataType::Decimal256(_, _) => fixed_width!(32),
            DataType::FixedSizeBinary(_) => binary_like!(array.as_fixed_size_binary()),
            DataType::Binary => binary_like!(array.as_binary::<i32>()),
            DataType::LargeBinary => binary_like!(array.as_binary::<i64>()),
            DataType::Utf8 => binary_like!(array.as_string::<i32>()),
            DataType::LargeUtf8 => binary_like!(array.as_string::<i64>()),
            DataType::BinaryView => binary_like!(array.as_binary_view()),
            DataType::Utf8View => binary_like!(array.as_string_view()),
            DataType::Dictionary(_, _) => {
                // Chunk on the dictionary keys: equal keys imply equal values.
                let dict = array.as_any_dictionary();
                self.get_arrow_chunks(def_levels, rep_levels, dict.keys())?
            }
            _ => {
                return Err(ParquetError::General(format!(
                    "content-defined chunking is not supported for data type {dtype:?}",
                )));
            }
        };
        Ok(chunks)
    }

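    // Sketch of how a writer might consume the chunks (hypothetical caller
    // code; the field names match `CdcChunk`):
    //
    //     let chunks = chunker.get_arrow_chunks(def_levels, rep_levels, &array)?;
    //     for chunk in chunks {
    //         // flush levels[chunk.level_offset..][..chunk.num_levels] and
    //         // values[chunk.value_offset..][..chunk.num_values] as one page
    //     }
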
    /// Debug-only sanity check: chunks must be contiguous and together cover
    /// all levels and values exactly once.
    #[cfg(debug_assertions)]
    fn validate_chunks(&self, chunks: &[CdcChunk], num_levels: usize, total_values: usize) {
        assert!(!chunks.is_empty(), "chunks must be non-empty");

        let first = &chunks[0];
        assert_eq!(first.level_offset, 0, "first chunk must start at level 0");
        assert_eq!(first.value_offset, 0, "first chunk must start at value 0");

        let mut sum_levels = first.num_levels;
        let mut sum_values = first.num_values;
        for i in 1..chunks.len() {
            let chunk = &chunks[i];
            let prev = &chunks[i - 1];
            assert!(chunk.num_levels > 0, "chunk must have levels");
            assert_eq!(
                chunk.level_offset,
                prev.level_offset + prev.num_levels,
                "level offsets must be contiguous"
            );
            assert_eq!(
                chunk.value_offset,
                prev.value_offset + prev.num_values,
                "value offsets must be contiguous"
            );
            sum_levels += chunk.num_levels;
            sum_values += chunk.num_values;
        }
        assert_eq!(sum_levels, num_levels, "chunks must cover all levels");
        assert_eq!(sum_values, total_values, "chunks must cover all values");

        let last = chunks.last().unwrap();
        assert_eq!(
            last.level_offset + last.num_levels,
            num_levels,
            "last chunk must end at num_levels"
        );
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::basic::Type as PhysicalType;
    use crate::schema::types::{ColumnPath, Type};
    use std::sync::Arc;

    fn make_desc(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
        let tp = Type::primitive_type_builder("col", PhysicalType::INT32)
            .build()
            .unwrap();
        ColumnDescriptor::new(
            Arc::new(tp),
            max_def_level,
            max_rep_level,
            ColumnPath::new(vec![]),
        )
    }

    #[test]
    fn test_calculate_mask_defaults() {
        let mask = ContentDefinedChunker::calculate_mask(256 * 1024, 1024 * 1024, 0).unwrap();
        // avg = 640 KiB, target = (640 KiB - 256 KiB) / 8 = 48 KiB -> 15 bits
        let expected = u64::MAX << (64 - 15);
        assert_eq!(mask, expected);
    }

    #[test]
    fn test_calculate_mask_with_norm_level() {
        let mask = ContentDefinedChunker::calculate_mask(256 * 1024, 1024 * 1024, 1).unwrap();
        let expected = u64::MAX << (64 - 14);
        assert_eq!(mask, expected);
    }

    #[test]
    fn test_calculate_mask_invalid() {
        assert!(ContentDefinedChunker::calculate_mask(-1, 100, 0).is_err());
        assert!(ContentDefinedChunker::calculate_mask(100, 50, 0).is_err());
        assert!(ContentDefinedChunker::calculate_mask(100, 100, 0).is_err());
    }

    #[test]
    fn test_non_nested_non_null_single_chunk() {
        let options = CdcOptions {
            min_chunk_size: 8,
            max_chunk_size: 1024,
            norm_level: 0,
        };
        let mut chunker = ContentDefinedChunker::new(&make_desc(0, 0), &options).unwrap();

        let num_values = 4;
        let chunks = chunker.calculate(None, None, num_values, |c, i| {
            c.roll_fixed::<4>(&(i as i32).to_le_bytes());
        });
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].level_offset, 0);
        assert_eq!(chunks[0].value_offset, 0);
        assert_eq!(chunks[0].num_levels, 4);
    }

    #[test]
    fn test_max_chunk_size_forces_boundary() {
        let options = CdcOptions {
            min_chunk_size: 256,
            max_chunk_size: 1024,
            norm_level: 0,
        };
        let mut chunker = ContentDefinedChunker::new(&make_desc(0, 0), &options).unwrap();

        let num_values = 2000;
        let chunks = chunker.calculate(None, None, num_values, |c, i| {
            c.roll_fixed::<4>(&(i as i32).to_le_bytes());
        });

        assert!(chunks.len() > 1);

        let mut total_levels = 0;
        for (i, chunk) in chunks.iter().enumerate() {
            assert_eq!(chunk.level_offset, total_levels);
            if i < chunks.len() - 1 {
                assert!(chunk.num_levels > 0);
            }
            total_levels += chunk.num_levels;
        }
        assert_eq!(total_levels, num_values);
    }

    #[test]
    fn test_deterministic_chunks() {
        let options = CdcOptions {
            min_chunk_size: 4,
            max_chunk_size: 64,
            norm_level: 0,
        };

        let roll = |c: &mut ContentDefinedChunker, i: usize| {
            c.roll_fixed::<8>(&(i as i64).to_le_bytes());
        };

        let mut chunker1 = ContentDefinedChunker::new(&make_desc(0, 0), &options).unwrap();
        let chunks1 = chunker1.calculate(None, None, 200, roll);

        let mut chunker2 = ContentDefinedChunker::new(&make_desc(0, 0), &options).unwrap();
        let chunks2 = chunker2.calculate(None, None, 200, roll);

        assert_eq!(chunks1.len(), chunks2.len());
        for (a, b) in chunks1.iter().zip(chunks2.iter()) {
            assert_eq!(a.level_offset, b.level_offset);
            assert_eq!(a.num_levels, b.num_levels);
            assert_eq!(a.value_offset, b.value_offset);
            assert_eq!(a.num_values, b.num_values);
        }
    }

    #[test]
    fn test_nullable_non_nested() {
        let options = CdcOptions {
            min_chunk_size: 4,
            max_chunk_size: 64,
            norm_level: 0,
        };
        let mut chunker = ContentDefinedChunker::new(&make_desc(1, 0), &options).unwrap();

        let num_levels = 20;
        let def_levels: Vec<i16> = (0..num_levels)
            .map(|i| if i % 3 == 0 { 0 } else { 1 })
            .collect();
        let expected_non_null: usize = def_levels.iter().filter(|&&d| d == 1).count();

        let chunks = chunker.calculate(Some(&def_levels), None, num_levels, |c, i| {
            c.roll_fixed::<4>(&(i as i32).to_le_bytes());
        });

        assert!(!chunks.is_empty());
        let total_levels: usize = chunks.iter().map(|c| c.num_levels).sum();
        let total_values: usize = chunks.iter().map(|c| c.num_values).sum();
        assert_eq!(total_levels, num_levels);
        assert_eq!(total_values, expected_non_null);
        assert!(total_values < total_levels);
    }
}
#[cfg(all(test, feature = "arrow"))]
mod arrow_tests {
    use std::borrow::Borrow;
    use std::sync::Arc;

    use arrow::util::data_gen::create_random_batch;
    use arrow_array::cast::AsArray;
    use arrow_array::{Array, ArrayRef, BooleanArray, Int32Array, RecordBatch};
    use arrow_buffer::Buffer;
    use arrow_data::ArrayData;
    use arrow_schema::{DataType, Field, Fields, Schema};

    use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use crate::arrow::arrow_writer::ArrowWriter;
    use crate::file::properties::{CdcOptions, WriterProperties};
    use crate::file::reader::{FileReader, SerializedFileReader};

    const CDC_MIN_CHUNK_SIZE: usize = 4 * 1024;
    const CDC_MAX_CHUNK_SIZE: usize = 16 * 1024;
    const CDC_PART_SIZE: usize = 128 * 1024;
    const CDC_EDIT_SIZE: usize = 128;
    const CDC_ROW_GROUP_LENGTH: usize = 1024 * 1024;

    /// Deterministic 64-bit mixer (Murmur3-finalizer style constants) used to
    /// derive reproducible pseudo-random test data from a seed and an index.
    fn test_hash(seed: u64, index: u64) -> u64 {
        let mut h = (index.wrapping_add(seed)).wrapping_mul(0xc4ceb9fe1a85ec53u64);
        h ^= h >> 33;
        h = h.wrapping_mul(0xff51afd7ed558ccdu64);
        h ^= h >> 33;
        h = h.wrapping_mul(0xc4ceb9fe1a85ec53u64);
        h ^= h >> 33;
        h
    }

    fn generate_array(dtype: &DataType, nullable: bool, length: usize, seed: u64) -> ArrayRef {
        macro_rules! gen_primitive {
            ($array_type:ty, $cast:expr) => {{
                if nullable {
                    let arr: $array_type = (0..length)
                        .map(|i| {
                            let val = test_hash(seed, i as u64);
                            if val % 10 == 0 {
                                None
                            } else {
                                Some($cast(val))
                            }
                        })
                        .collect();
                    Arc::new(arr) as ArrayRef
                } else {
                    let arr: $array_type = (0..length)
                        .map(|i| Some($cast(test_hash(seed, i as u64))))
                        .collect();
                    Arc::new(arr) as ArrayRef
                }
            }};
        }

        match dtype {
            DataType::Boolean => {
                if nullable {
                    let arr: BooleanArray = (0..length)
                        .map(|i| {
                            let val = test_hash(seed, i as u64);
                            if val % 10 == 0 {
                                None
                            } else {
                                Some(val % 2 == 0)
                            }
                        })
                        .collect();
                    Arc::new(arr)
                } else {
                    let arr: BooleanArray = (0..length)
                        .map(|i| Some(test_hash(seed, i as u64) % 2 == 0))
                        .collect();
                    Arc::new(arr)
                }
            }
            DataType::Int32 => gen_primitive!(Int32Array, |v: u64| v as i32),
            DataType::Int64 => {
                gen_primitive!(arrow_array::Int64Array, |v: u64| v as i64)
            }
            DataType::Float64 => {
                gen_primitive!(arrow_array::Float64Array, |v: u64| (v % 100000) as f64
                    / 1000.0)
            }
            DataType::Utf8 => {
                let arr: arrow_array::StringArray = if nullable {
                    (0..length)
                        .map(|i| {
                            let val = test_hash(seed, i as u64);
                            if val % 10 == 0 {
                                None
                            } else {
                                Some(format!("str_{val}"))
                            }
                        })
                        .collect()
                } else {
                    (0..length)
                        .map(|i| Some(format!("str_{}", test_hash(seed, i as u64))))
                        .collect()
                };
                Arc::new(arr)
            }
            DataType::Binary => {
                let arr: arrow_array::BinaryArray = if nullable {
                    (0..length)
                        .map(|i| {
                            let val = test_hash(seed, i as u64);
                            if val % 10 == 0 {
                                None
                            } else {
                                Some(format!("bin_{val}").into_bytes())
                            }
                        })
                        .collect()
                } else {
                    (0..length)
                        .map(|i| Some(format!("bin_{}", test_hash(seed, i as u64)).into_bytes()))
                        .collect()
                };
                Arc::new(arr)
            }
            DataType::FixedSizeBinary(size) => {
                let size = *size;
                let mut builder = arrow_array::builder::FixedSizeBinaryBuilder::new(size);
                for i in 0..length {
                    let val = test_hash(seed, i as u64);
                    if nullable && val % 10 == 0 {
                        builder.append_null();
                    } else {
                        let s = format!("bin_{val}");
                        let bytes = s.as_bytes();
                        let mut buf = vec![0u8; size as usize];
                        let copy_len = bytes.len().min(size as usize);
                        buf[..copy_len].copy_from_slice(&bytes[..copy_len]);
                        builder.append_value(&buf).unwrap();
                    }
                }
                Arc::new(builder.finish())
            }
            DataType::Date32 => {
                gen_primitive!(arrow_array::Date32Array, |v: u64| v as i32)
            }
            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
                gen_primitive!(arrow_array::TimestampNanosecondArray, |v: u64| v as i64)
            }
            _ => panic!("Unsupported test data type: {dtype:?}"),
        }
    }

    fn generate_table(schema: &Arc<Schema>, length: usize, seed: u64) -> RecordBatch {
        let arrays: Vec<ArrayRef> = schema
            .fields()
            .iter()
            .enumerate()
            .map(|(i, field)| {
                generate_array(
                    field.data_type(),
                    field.is_nullable(),
                    length,
                    seed + i as u64 * 10,
                )
            })
            .collect();
        RecordBatch::try_new(schema.clone(), arrays).unwrap()
    }

    /// Fixed byte width rolled into the CDC hash per value, or 0 for
    /// variable-width types.
    fn cdc_byte_width(dtype: &DataType) -> usize {
        match dtype {
            DataType::Boolean => 1,
            DataType::Int8 | DataType::UInt8 => 1,
            DataType::Int16 | DataType::UInt16 | DataType::Float16 => 2,
            DataType::Int32
            | DataType::UInt32
            | DataType::Float32
            | DataType::Date32
            | DataType::Time32(_) => 4,
            DataType::Int64
            | DataType::UInt64
            | DataType::Float64
            | DataType::Date64
            | DataType::Time64(_)
            | DataType::Timestamp(_, _)
            | DataType::Duration(_) => 8,
            DataType::Decimal128(_, _) => 16,
            DataType::Decimal256(_, _) => 32,
            DataType::FixedSizeBinary(n) => *n as usize,
            _ => 0, // variable-width types
        }
    }

    /// Approximate bytes rolled per record, used to size the generated test
    /// data; nullable columns add 2 bytes for the rolled definition level.
    fn bytes_per_record(dtype: &DataType, nullable: bool) -> usize {
        let bw = cdc_byte_width(dtype);
        if bw > 0 {
            if nullable { bw + 2 } else { bw }
        } else {
            16 // rough estimate for variable-width test values
        }
    }

    /// Number of bytes the chunker rolls for the given array slice, mirroring
    /// the accounting in `ContentDefinedChunker`.
    fn calculate_cdc_size(array: &dyn Array, nullable: bool) -> i64 {
        let dtype = array.data_type();
        let bw = cdc_byte_width(dtype);
        let result = if bw > 0 {
            let valid_count = array.len() - array.null_count();
            (valid_count * bw) as i64
        } else {
            match dtype {
                DataType::Utf8 => {
                    let a = array.as_string::<i32>();
                    (0..a.len())
                        .filter(|&i| a.is_valid(i))
                        .map(|i| a.value(i).len() as i64)
                        .sum()
                }
                DataType::Binary => {
                    let a = array.as_binary::<i32>();
                    (0..a.len())
                        .filter(|&i| a.is_valid(i))
                        .map(|i| a.value(i).len() as i64)
                        .sum()
                }
                DataType::LargeBinary => {
                    let a = array.as_binary::<i64>();
                    (0..a.len())
                        .filter(|&i| a.is_valid(i))
                        .map(|i| a.value(i).len() as i64)
                        .sum()
                }
                _ => panic!("CDC size calculation not implemented for {dtype:?}"),
            }
        };

        if nullable {
            // Every level contributes its 2-byte definition level to the hash.
            result + array.len() as i64 * 2
        } else {
            result
        }
    }

    /// Per-row-group page layout of a column, extracted from an encoded file.
    struct ColumnInfo {
        page_lengths: Vec<i64>,
        has_dictionary_page: bool,
    }

    fn get_column_info(data: &[u8], column_index: usize) -> Vec<ColumnInfo> {
        let reader = SerializedFileReader::new(bytes::Bytes::from(data.to_vec())).unwrap();
        let metadata = reader.metadata();
        let mut result = Vec::new();
        for rg in 0..metadata.num_row_groups() {
            let rg_reader = reader.get_row_group(rg).unwrap();
            let col_reader = rg_reader.get_column_page_reader(column_index).unwrap();
            let mut info = ColumnInfo {
                page_lengths: Vec::new(),
                has_dictionary_page: false,
            };
            for page in col_reader {
                let page = page.unwrap();
                match page.page_type() {
                    crate::basic::PageType::DATA_PAGE | crate::basic::PageType::DATA_PAGE_V2 => {
                        info.page_lengths.push(page.num_values() as i64);
                    }
                    crate::basic::PageType::DICTIONARY_PAGE => {
                        info.has_dictionary_page = true;
                    }
                    _ => {}
                }
            }
            result.push(info);
        }
        result
    }

    /// Assert that every page of a column respects the configured CDC chunk
    /// size bounds (only the last page may fall below the minimum).
    fn assert_cdc_chunk_sizes(
        array: &ArrayRef,
        info: &ColumnInfo,
        nullable: bool,
        min_chunk_size: usize,
        max_chunk_size: usize,
        expect_dictionary_page: bool,
    ) {
        // Boolean and fixed-size binary columns get no dictionary page here.
        let expect_dict = match array.data_type() {
            DataType::Boolean | DataType::FixedSizeBinary(_) => false,
            _ => expect_dictionary_page,
        };
        assert_eq!(
            info.has_dictionary_page,
            expect_dict,
            "dictionary page mismatch for {:?}",
            array.data_type()
        );

        let page_lengths = &info.page_lengths;
        assert!(
            page_lengths.len() > 1,
            "CDC should produce multiple pages, got {page_lengths:?}"
        );

        let bw = cdc_byte_width(array.data_type());
        if bw > 0
            || matches!(
                array.data_type(),
                DataType::Utf8 | DataType::Binary | DataType::LargeBinary
            )
        {
            let mut offset = 0i64;
            for (i, &page_len) in page_lengths.iter().enumerate() {
                let slice = array.slice(offset as usize, page_len as usize);
                let cdc_size = calculate_cdc_size(slice.as_ref(), nullable);
                if i < page_lengths.len() - 1 {
                    assert!(
                        cdc_size >= min_chunk_size as i64,
                        "Page {i}: CDC size {cdc_size} < min {min_chunk_size}, pages={page_lengths:?}"
                    );
                }
                assert!(
                    cdc_size <= max_chunk_size as i64,
                    "Page {i}: CDC size {cdc_size} > max {max_chunk_size}, pages={page_lengths:?}"
                );
                offset += page_len;
            }
            assert_eq!(
                offset,
                array.len() as i64,
                "page lengths must sum to array length"
            );
        }
    }

    /// Write `batches` with content-defined chunking enabled, validate that
    /// the file reads back identically, and return the encoded bytes.
    fn write_with_cdc_options(
        batches: &[&RecordBatch],
        min_chunk_size: usize,
        max_chunk_size: usize,
        max_row_group_rows: Option<usize>,
        enable_dictionary: bool,
    ) -> Vec<u8> {
        assert!(!batches.is_empty());
        let schema = batches[0].schema();
        let mut builder = WriterProperties::builder()
            .set_dictionary_enabled(enable_dictionary)
            .set_content_defined_chunking(Some(CdcOptions {
                min_chunk_size,
                max_chunk_size,
                norm_level: 0,
            }));
        if let Some(max_rows) = max_row_group_rows {
            builder = builder.set_max_row_group_row_count(Some(max_rows));
        }
        let props = builder.build();
        let mut buf = Vec::new();
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), Some(props)).unwrap();
        for batch in batches {
            writer.write(batch).unwrap();
        }
        writer.close().unwrap();

        // Roundtrip validation before handing the bytes to the caller.
        let readback = read_batches(&buf);
        let original_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        let readback_rows: usize = readback.iter().map(|b| b.num_rows()).sum();
        assert_eq!(original_rows, readback_rows, "Roundtrip row count mismatch");
        if original_rows > 0 {
            let original = concat_batches(batches.iter().copied());
            let roundtrip = concat_batches(&readback);
            assert_eq!(original, roundtrip, "Roundtrip validation failed");
        }

        buf
    }

    fn read_batches(data: &[u8]) -> Vec<RecordBatch> {
        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes::Bytes::from(data.to_vec()))
            .unwrap()
            .build()
            .unwrap();
        reader.collect::<std::result::Result<Vec<_>, _>>().unwrap()
    }

    fn concat_batches(batches: impl IntoIterator<Item = impl Borrow<RecordBatch>>) -> RecordBatch {
        let batches: Vec<_> = batches.into_iter().collect();
        let schema = batches[0].borrow().schema();
        let batches = batches.iter().map(|b| b.borrow());
        arrow_select::concat::concat_batches(&schema, batches).unwrap()
    }

    /// Diff two page-length sequences: align them on their longest common
    /// subsequence (LCS) and return the runs that differ, as
    /// `(from_first, from_second)` pairs.
    fn find_differences(first: &[i64], second: &[i64]) -> Vec<(Vec<i64>, Vec<i64>)> {
        // Standard O(n*m) LCS table.
        let n = first.len();
        let m = second.len();
        let mut dp = vec![vec![0usize; m + 1]; n + 1];
        for i in 0..n {
            for j in 0..m {
                if first[i] == second[j] {
                    dp[i + 1][j + 1] = dp[i][j] + 1;
                } else {
                    dp[i + 1][j + 1] = dp[i + 1][j].max(dp[i][j + 1]);
                }
            }
        }
        // Backtrack to recover the matching index pairs.
        let mut common = Vec::new();
        let (mut i, mut j) = (n, m);
        while i > 0 && j > 0 {
            if first[i - 1] == second[j - 1] {
                common.push((i - 1, j - 1));
                i -= 1;
                j -= 1;
            } else if dp[i - 1][j] >= dp[i][j - 1] {
                i -= 1;
            } else {
                j -= 1;
            }
        }
        common.reverse();

        // Collect the gaps between consecutive common elements.
        let mut result = Vec::new();
        let (mut last_i, mut last_j) = (0usize, 0usize);
        for (ci, cj) in &common {
            if *ci > last_i || *cj > last_j {
                result.push((first[last_i..*ci].to_vec(), second[last_j..*cj].to_vec()));
            }
            last_i = ci + 1;
            last_j = cj + 1;
        }
        if last_i < n || last_j < m {
            result.push((first[last_i..].to_vec(), second[last_j..].to_vec()));
        }

        // Merge adjacent one-sided diffs into a single two-sided diff.
        let mut merged: Vec<(Vec<i64>, Vec<i64>)> = Vec::new();
        for diff in result {
            if let Some(prev) = merged.last_mut() {
                if prev.0.is_empty() && diff.1.is_empty() {
                    prev.0 = diff.0;
                    continue;
                } else if prev.1.is_empty() && diff.0.is_empty() {
                    prev.1 = diff.1;
                    continue;
                }
            }
            merged.push(diff);
        }
        merged
    }

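    // For example (a sketch): find_differences(&[1, 2, 3, 4], &[1, 9, 4])
    // yields [(vec![2, 3], vec![9])] -- the LCS here is [1, 4], and the
    // mismatched runs between common elements are paired up.
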
    /// Assert that the page-length diffs between two files match the expected
    /// counts of equal-sum, larger, and smaller diffs; any size change must
    /// equal `edit_length` rows exactly.
    fn assert_page_length_differences(
        original: &ColumnInfo,
        modified: &ColumnInfo,
        exact_equal_diffs: usize,
        exact_larger_diffs: usize,
        exact_smaller_diffs: usize,
        edit_length: i64,
    ) {
        let diffs = find_differences(&original.page_lengths, &modified.page_lengths);
        let expected = exact_equal_diffs + exact_larger_diffs + exact_smaller_diffs;

        if diffs.len() != expected {
            eprintln!("Original: {:?}", original.page_lengths);
            eprintln!("Modified: {:?}", modified.page_lengths);
            for d in &diffs {
                eprintln!("  Diff: {:?} vs {:?}", d.0, d.1);
            }
        }
        assert_eq!(
            diffs.len(),
            expected,
            "Expected {expected} diffs, got {}",
            diffs.len()
        );

        let (mut eq, mut larger, mut smaller) = (0usize, 0usize, 0usize);
        for (left, right) in &diffs {
            let left_sum: i64 = left.iter().sum();
            let right_sum: i64 = right.iter().sum();
            if left_sum == right_sum {
                eq += 1;
            } else if left_sum < right_sum {
                larger += 1;
                assert_eq!(
                    left_sum + edit_length,
                    right_sum,
                    "Larger diff mismatch: {left_sum} + {edit_length} != {right_sum}"
                );
            } else {
                smaller += 1;
                assert_eq!(
                    left_sum,
                    right_sum + edit_length,
                    "Smaller diff mismatch: {left_sum} != {right_sum} + {edit_length}"
                );
            }
        }

        assert_eq!(eq, exact_equal_diffs, "equal diffs count");
        assert_eq!(larger, exact_larger_diffs, "larger diffs count");
        assert_eq!(smaller, exact_smaller_diffs, "smaller diffs count");
    }

    /// Assert that an in-place update produces at most `max_equal_diffs`
    /// diffs, none of which changes the total row count.
    fn assert_page_length_differences_update(
        original: &ColumnInfo,
        modified: &ColumnInfo,
        max_equal_diffs: usize,
    ) {
        let diffs = find_differences(&original.page_lengths, &modified.page_lengths);
        assert!(
            diffs.len() <= max_equal_diffs,
            "Expected at most {max_equal_diffs} diffs, got {}",
            diffs.len()
        );
        for (left, right) in &diffs {
            let left_sum: i64 = left.iter().sum();
            let right_sum: i64 = right.iter().sum();
            assert_eq!(
                left_sum, right_sum,
                "Update diff should not change total row count"
            );
        }
    }

    #[test]
    fn test_find_differences_basic() {
        let diffs = find_differences(&[1, 2, 3, 4, 5], &[1, 7, 8, 4, 5]);
        assert_eq!(diffs.len(), 1);
        assert_eq!(diffs[0].0, vec![2, 3]);
        assert_eq!(diffs[0].1, vec![7, 8]);
    }

    #[test]
    fn test_find_differences_multiple() {
        let diffs = find_differences(&[1, 2, 3, 4, 5, 6, 7], &[1, 8, 9, 4, 10, 6, 11]);
        assert_eq!(diffs.len(), 3);
        assert_eq!(diffs[0].0, vec![2, 3]);
        assert_eq!(diffs[0].1, vec![8, 9]);
        assert_eq!(diffs[1].0, vec![5]);
        assert_eq!(diffs[1].1, vec![10]);
        assert_eq!(diffs[2].0, vec![7]);
        assert_eq!(diffs[2].1, vec![11]);
    }

    #[test]
    fn test_find_differences_different_lengths() {
        let diffs = find_differences(&[1, 2, 3], &[1, 2, 3, 4, 5]);
        assert_eq!(diffs.len(), 1);
        assert!(diffs[0].0.is_empty());
        assert_eq!(diffs[0].1, vec![4, 5]);
    }

    #[test]
    fn test_find_differences_empty() {
        let diffs = find_differences(&[], &[]);
        assert!(diffs.is_empty());
    }

    #[test]
    fn test_find_differences_changes_at_both_ends() {
        let diffs = find_differences(&[1, 2, 3, 4, 5, 6, 7, 8, 9], &[0, 0, 2, 3, 4, 5, 7, 7, 8]);
        assert_eq!(diffs.len(), 3);
        assert_eq!(diffs[0].0, vec![1]);
        assert_eq!(diffs[0].1, vec![0, 0]);
        assert_eq!(diffs[1].0, vec![6]);
        assert_eq!(diffs[1].1, vec![7]);
        assert_eq!(diffs[2].0, vec![9]);
        assert!(diffs[2].1.is_empty());
    }

    #[test]
    fn test_find_differences_additional() {
        let diffs = find_differences(
            &[445, 312, 393, 401, 410, 138, 558, 457],
            &[445, 312, 393, 393, 410, 138, 558, 457],
        );
        assert_eq!(diffs.len(), 1);
        assert_eq!(diffs[0].0, vec![401]);
        assert_eq!(diffs[0].1, vec![393]);
    }

    /// Generate a suite of CDC robustness tests for a single column of the
    /// given type: edits (delete/insert/update/prepend/append) should only
    /// perturb the pages around the edit, leaving the remaining page
    /// boundaries identical.
    macro_rules! cdc_single_rg_tests {
        ($mod_name:ident, $dtype:expr, $nullable:expr) => {
            mod $mod_name {
                use super::*;

                fn config() -> (DataType, bool, usize, usize) {
                    let dtype: DataType = $dtype;
                    let nullable: bool = $nullable;
                    let bpr = bytes_per_record(&dtype, nullable);
                    let part_length = CDC_PART_SIZE / bpr;
                    let edit_length = CDC_EDIT_SIZE / bpr;
                    (dtype, nullable, part_length, edit_length)
                }

                fn make_schema(dtype: &DataType, nullable: bool) -> Arc<Schema> {
                    Arc::new(Schema::new(vec![Field::new("f0", dtype.clone(), nullable)]))
                }

                #[test]
                fn delete_once() {
                    let (dtype, nullable, part_length, edit_length) = config();
                    let schema = make_schema(&dtype, nullable);

                    let part1 = generate_table(&schema, part_length, 0);
                    let part2 = generate_table(&schema, edit_length, 1);
                    let part3 = generate_table(&schema, part_length, part_length as u64);

                    let base = concat_batches([&part1, &part2, &part3]);
                    let modified = concat_batches([&part1, &part3]);

                    for enable_dictionary in [false, true] {
                        let base_data = write_with_cdc_options(
                            &[&base],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );
                        let mod_data = write_with_cdc_options(
                            &[&modified],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );

                        let base_info = get_column_info(&base_data, 0);
                        let mod_info = get_column_info(&mod_data, 0);
                        assert_eq!(base_info.len(), 1);
                        assert_eq!(mod_info.len(), 1);

                        assert_cdc_chunk_sizes(
                            &base.column(0).clone(),
                            &base_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );
                        assert_cdc_chunk_sizes(
                            &modified.column(0).clone(),
                            &mod_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );

                        assert_page_length_differences(
                            &base_info[0],
                            &mod_info[0],
                            0,
                            0,
                            1,
                            edit_length as i64,
                        );
                    }
                }

                #[test]
                fn delete_twice() {
                    let (dtype, nullable, part_length, edit_length) = config();
                    let schema = make_schema(&dtype, nullable);

                    let part1 = generate_table(&schema, part_length, 0);
                    let part2 = generate_table(&schema, edit_length, 1);
                    let part3 = generate_table(&schema, part_length, part_length as u64);
                    let part4 = generate_table(&schema, edit_length, 2);
                    let part5 = generate_table(&schema, part_length, 2 * part_length as u64);

                    let base = concat_batches([&part1, &part2, &part3, &part4, &part5]);
                    let modified = concat_batches([&part1, &part3, &part5]);

                    for enable_dictionary in [false, true] {
                        let base_data = write_with_cdc_options(
                            &[&base],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );
                        let mod_data = write_with_cdc_options(
                            &[&modified],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );

                        let base_info = get_column_info(&base_data, 0);
                        let mod_info = get_column_info(&mod_data, 0);
                        assert_eq!(base_info.len(), 1);
                        assert_eq!(mod_info.len(), 1);

                        assert_cdc_chunk_sizes(
                            &base.column(0).clone(),
                            &base_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );
                        assert_cdc_chunk_sizes(
                            &modified.column(0).clone(),
                            &mod_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );

                        assert_page_length_differences(
                            &base_info[0],
                            &mod_info[0],
                            0,
                            0,
                            2,
                            edit_length as i64,
                        );
                    }
                }

                #[test]
                fn insert_once() {
                    let (dtype, nullable, part_length, edit_length) = config();
                    let schema = make_schema(&dtype, nullable);

                    let part1 = generate_table(&schema, part_length, 0);
                    let part2 = generate_table(&schema, edit_length, 1);
                    let part3 = generate_table(&schema, part_length, part_length as u64);

                    let base = concat_batches([&part1, &part3]);
                    let modified = concat_batches([&part1, &part2, &part3]);

                    for enable_dictionary in [false, true] {
                        let base_data = write_with_cdc_options(
                            &[&base],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );
                        let mod_data = write_with_cdc_options(
                            &[&modified],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );

                        let base_info = get_column_info(&base_data, 0);
                        let mod_info = get_column_info(&mod_data, 0);
                        assert_eq!(base_info.len(), 1);
                        assert_eq!(mod_info.len(), 1);

                        assert_cdc_chunk_sizes(
                            &base.column(0).clone(),
                            &base_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );
                        assert_cdc_chunk_sizes(
                            &modified.column(0).clone(),
                            &mod_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );

                        assert_page_length_differences(
                            &base_info[0],
                            &mod_info[0],
                            0,
                            1,
                            0,
                            edit_length as i64,
                        );
                    }
                }

                #[test]
                fn insert_twice() {
                    let (dtype, nullable, part_length, edit_length) = config();
                    let schema = make_schema(&dtype, nullable);

                    let part1 = generate_table(&schema, part_length, 0);
                    let part2 = generate_table(&schema, edit_length, 1);
                    let part3 = generate_table(&schema, part_length, part_length as u64);
                    let part4 = generate_table(&schema, edit_length, 2);
                    let part5 = generate_table(&schema, part_length, 2 * part_length as u64);

                    let base = concat_batches([&part1, &part3, &part5]);
                    let modified = concat_batches([&part1, &part2, &part3, &part4, &part5]);

                    for enable_dictionary in [false, true] {
                        let base_data = write_with_cdc_options(
                            &[&base],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );
                        let mod_data = write_with_cdc_options(
                            &[&modified],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );

                        let base_info = get_column_info(&base_data, 0);
                        let mod_info = get_column_info(&mod_data, 0);
                        assert_eq!(base_info.len(), 1);
                        assert_eq!(mod_info.len(), 1);

                        assert_cdc_chunk_sizes(
                            &base.column(0).clone(),
                            &base_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );
                        assert_cdc_chunk_sizes(
                            &modified.column(0).clone(),
                            &mod_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );

                        assert_page_length_differences(
                            &base_info[0],
                            &mod_info[0],
                            0,
                            2,
                            0,
                            edit_length as i64,
                        );
                    }
                }

                #[test]
                fn update_once() {
                    let (dtype, nullable, part_length, edit_length) = config();
                    let schema = make_schema(&dtype, nullable);

                    let part1 = generate_table(&schema, part_length, 0);
                    let part2 = generate_table(&schema, edit_length, 1);
                    let part3 = generate_table(&schema, part_length, part_length as u64);
                    let part4 = generate_table(&schema, edit_length, 2);

                    let base = concat_batches([&part1, &part2, &part3]);
                    let modified = concat_batches([&part1, &part4, &part3]);

                    for enable_dictionary in [false, true] {
                        let base_data = write_with_cdc_options(
                            &[&base],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );
                        let mod_data = write_with_cdc_options(
                            &[&modified],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );

                        let base_info = get_column_info(&base_data, 0);
                        let mod_info = get_column_info(&mod_data, 0);
                        assert_eq!(base_info.len(), 1);
                        assert_eq!(mod_info.len(), 1);

                        assert_cdc_chunk_sizes(
                            &base.column(0).clone(),
                            &base_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );
                        assert_cdc_chunk_sizes(
                            &modified.column(0).clone(),
                            &mod_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );

                        assert_page_length_differences_update(&base_info[0], &mod_info[0], 1);
                    }
                }

                #[test]
                fn update_twice() {
                    let (dtype, nullable, part_length, edit_length) = config();
                    let schema = make_schema(&dtype, nullable);

                    let part1 = generate_table(&schema, part_length, 0);
                    let part2 = generate_table(&schema, edit_length, 1);
                    let part3 = generate_table(&schema, part_length, part_length as u64);
                    let part4 = generate_table(&schema, edit_length, 2);
                    let part5 = generate_table(&schema, part_length, 2 * part_length as u64);
                    let part6 = generate_table(&schema, edit_length, 3);
                    let part7 = generate_table(&schema, edit_length, 4);

                    let base = concat_batches([&part1, &part2, &part3, &part4, &part5]);
                    let modified = concat_batches([&part1, &part6, &part3, &part7, &part5]);

                    for enable_dictionary in [false, true] {
                        let base_data = write_with_cdc_options(
                            &[&base],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );
                        let mod_data = write_with_cdc_options(
                            &[&modified],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );

                        let base_info = get_column_info(&base_data, 0);
                        let mod_info = get_column_info(&mod_data, 0);
                        assert_eq!(base_info.len(), 1);
                        assert_eq!(mod_info.len(), 1);

                        assert_cdc_chunk_sizes(
                            &base.column(0).clone(),
                            &base_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );
                        assert_cdc_chunk_sizes(
                            &modified.column(0).clone(),
                            &mod_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );

                        assert_page_length_differences_update(&base_info[0], &mod_info[0], 2);
                    }
                }

                #[test]
                fn prepend() {
                    let (dtype, nullable, part_length, edit_length) = config();
                    let schema = make_schema(&dtype, nullable);

                    let part1 = generate_table(&schema, part_length, 0);
                    let part2 = generate_table(&schema, edit_length, 1);
                    let part3 = generate_table(&schema, part_length, part_length as u64);
                    let part4 = generate_table(&schema, edit_length, 2);

                    let base = concat_batches([&part1, &part2, &part3]);
                    let modified = concat_batches([&part4, &part1, &part2, &part3]);

                    for enable_dictionary in [false, true] {
                        let base_data = write_with_cdc_options(
                            &[&base],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );
                        let mod_data = write_with_cdc_options(
                            &[&modified],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );

                        let base_info = get_column_info(&base_data, 0);
                        let mod_info = get_column_info(&mod_data, 0);
                        assert_eq!(base_info.len(), 1);
                        assert_eq!(mod_info.len(), 1);

                        assert_cdc_chunk_sizes(
                            &base.column(0).clone(),
                            &base_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );
                        assert_cdc_chunk_sizes(
                            &modified.column(0).clone(),
                            &mod_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );

                        assert!(
                            mod_info[0].page_lengths.len() >= base_info[0].page_lengths.len(),
                            "Modified should have same or more pages"
                        );

                        assert_page_length_differences(
                            &base_info[0],
                            &mod_info[0],
                            0,
                            1,
                            0,
                            edit_length as i64,
                        );
                    }
                }

                #[test]
                fn append() {
                    let (dtype, nullable, part_length, edit_length) = config();
                    let schema = make_schema(&dtype, nullable);

                    let part1 = generate_table(&schema, part_length, 0);
                    let part2 = generate_table(&schema, edit_length, 1);
                    let part3 = generate_table(&schema, part_length, part_length as u64);
                    let part4 = generate_table(&schema, edit_length, 2);

                    let base = concat_batches([&part1, &part2, &part3]);
                    let modified = concat_batches([&part1, &part2, &part3, &part4]);

                    for enable_dictionary in [false, true] {
                        let base_data = write_with_cdc_options(
                            &[&base],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );
                        let mod_data = write_with_cdc_options(
                            &[&modified],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );

                        let base_info = get_column_info(&base_data, 0);
                        let mod_info = get_column_info(&mod_data, 0);
                        assert_eq!(base_info.len(), 1);
                        assert_eq!(mod_info.len(), 1);

                        assert_cdc_chunk_sizes(
                            &base.column(0).clone(),
                            &base_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );
                        assert_cdc_chunk_sizes(
                            &modified.column(0).clone(),
                            &mod_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );

                        // All pages except the last must be untouched by the append.
                        let bp = &base_info[0].page_lengths;
                        let mp = &mod_info[0].page_lengths;
                        assert!(mp.len() >= bp.len());
                        for i in 0..bp.len() - 1 {
                            assert_eq!(bp[i], mp[i], "Page {i} should be identical");
                        }
                        assert!(mp[bp.len() - 1] >= bp[bp.len() - 1]);
                    }
                }

                #[test]
                fn empty_table() {
                    let (dtype, nullable, _, _) = config();
                    let schema = make_schema(&dtype, nullable);

                    let empty = RecordBatch::new_empty(schema);
                    for enable_dictionary in [false, true] {
                        let data = write_with_cdc_options(
                            &[&empty],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );
                        let info = get_column_info(&data, 0);
                        // The writer may emit no row group at all for an empty table.
                        if !info.is_empty() {
                            assert!(info[0].page_lengths.is_empty());
                        }
                    }
                }

                #[test]
                fn array_offsets() {
                    let (dtype, nullable, part_length, edit_length) = config();
                    let schema = make_schema(&dtype, nullable);

                    let table = concat_batches([
                        &generate_table(&schema, part_length, 0),
                        &generate_table(&schema, edit_length, 1),
                        &generate_table(&schema, part_length, part_length as u64),
                    ]);

                    for offset in [0usize, 512, 1024] {
                        if offset >= table.num_rows() {
                            continue;
                        }
                        let sliced = table.slice(offset, table.num_rows() - offset);
                        let data = write_with_cdc_options(
                            &[&sliced],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            true,
                        );
                        let info = get_column_info(&data, 0);
                        assert_eq!(info.len(), 1);

                        assert_cdc_chunk_sizes(
                            &sliced.column(0).clone(),
                            &info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            true,
                        );
                    }
                }
            }
        };
    }

    cdc_single_rg_tests!(cdc_bool_non_null, DataType::Boolean, false);
    cdc_single_rg_tests!(cdc_i32_non_null, DataType::Int32, false);
    cdc_single_rg_tests!(cdc_i64_nullable, DataType::Int64, true);
    cdc_single_rg_tests!(cdc_f64_nullable, DataType::Float64, true);
    cdc_single_rg_tests!(cdc_utf8_non_null, DataType::Utf8, false);
    cdc_single_rg_tests!(cdc_binary_nullable, DataType::Binary, true);
    cdc_single_rg_tests!(cdc_fsb16_nullable, DataType::FixedSizeBinary(16), true);
    cdc_single_rg_tests!(cdc_date32_non_null, DataType::Date32, false);
    cdc_single_rg_tests!(
        cdc_timestamp_nullable,
        DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None),
        true
    );

    mod cdc_multiple_row_groups {
        use super::*;

        const PART_LENGTH: usize = 128 * 1024;
        const EDIT_LENGTH: usize = 128;
        const ROW_GROUP_LENGTH: usize = 64 * 1024;

        fn schema() -> Arc<Schema> {
            Arc::new(Schema::new(vec![
                Field::new("int32", DataType::Int32, true),
                Field::new("float64", DataType::Float64, true),
                Field::new("bool", DataType::Boolean, false),
            ]))
        }

        #[test]
        fn insert_once() {
            let s = schema();
            let part1 = generate_table(&s, PART_LENGTH, 0);
            let part2 = generate_table(&s, PART_LENGTH, 2);
            let part3 = generate_table(&s, PART_LENGTH, 4);
            let edit1 = generate_table(&s, EDIT_LENGTH, 1);
            let edit2 = generate_table(&s, EDIT_LENGTH, 3);

            let base = concat_batches([&part1, &edit1, &part2, &part3]);
            let modified = concat_batches([&part1, &edit1, &edit2, &part2, &part3]);
            assert_eq!(modified.num_rows(), base.num_rows() + EDIT_LENGTH);

            let base_data = write_with_cdc_options(
                &[&base],
                CDC_MIN_CHUNK_SIZE,
                CDC_MAX_CHUNK_SIZE,
                Some(ROW_GROUP_LENGTH),
                false,
            );
            let mod_data = write_with_cdc_options(
                &[&modified],
                CDC_MIN_CHUNK_SIZE,
                CDC_MAX_CHUNK_SIZE,
                Some(ROW_GROUP_LENGTH),
                false,
            );

            for col in 0..s.fields().len() {
                let base_info = get_column_info(&base_data, col);
                let mod_info = get_column_info(&mod_data, col);

                assert_eq!(base_info.len(), 7, "expected 7 row groups for col {col}");
                assert_eq!(mod_info.len(), 7);

                // Row groups before the insertion point are unaffected.
                assert_eq!(base_info[0].page_lengths, mod_info[0].page_lengths);
                assert_eq!(base_info[1].page_lengths, mod_info[1].page_lengths);

                // The inserted rows shift data across the fixed row group
                // boundaries: each following group gains rows at one end and
                // loses them at the other.
                for i in 2..mod_info.len() - 1 {
                    assert_page_length_differences(
                        &base_info[i],
                        &mod_info[i],
                        0,
                        1,
                        1,
                        EDIT_LENGTH as i64,
                    );
                }
                // The last row group absorbs the surplus rows.
                assert_page_length_differences(
                    base_info.last().unwrap(),
                    mod_info.last().unwrap(),
                    0,
                    1,
                    0,
                    EDIT_LENGTH as i64,
                );
            }
        }

        #[test]
        fn delete_once() {
            let s = schema();
            let part1 = generate_table(&s, PART_LENGTH, 0);
            let part2 = generate_table(&s, PART_LENGTH, 2);
            let part3 = generate_table(&s, PART_LENGTH, 4);
            let edit1 = generate_table(&s, EDIT_LENGTH, 1);
            let edit2 = generate_table(&s, EDIT_LENGTH, 3);

            let base = concat_batches([&part1, &edit1, &part2, &part3, &edit2]);
            let modified = concat_batches([&part1, &part2, &part3, &edit2]);

            let base_data = write_with_cdc_options(
                &[&base],
                CDC_MIN_CHUNK_SIZE,
                CDC_MAX_CHUNK_SIZE,
                Some(ROW_GROUP_LENGTH),
                false,
            );
            let mod_data = write_with_cdc_options(
                &[&modified],
                CDC_MIN_CHUNK_SIZE,
                CDC_MAX_CHUNK_SIZE,
                Some(ROW_GROUP_LENGTH),
                false,
            );

            for col in 0..s.fields().len() {
                let base_info = get_column_info(&base_data, col);
                let mod_info = get_column_info(&mod_data, col);

                assert_eq!(base_info.len(), 7);
                assert_eq!(mod_info.len(), 7);

                // Row groups before the deleted rows are unaffected.
                assert_eq!(base_info[0].page_lengths, mod_info[0].page_lengths);
                assert_eq!(base_info[1].page_lengths, mod_info[1].page_lengths);

                // Deleted rows shift data backward through the fixed row
                // group boundaries.
                for i in 2..mod_info.len() - 1 {
                    assert_page_length_differences(
                        &base_info[i],
                        &mod_info[i],
                        0,
                        1,
                        1,
                        EDIT_LENGTH as i64,
                    );
                }
                // The last row group gives up the deleted rows.
                assert_page_length_differences(
                    base_info.last().unwrap(),
                    mod_info.last().unwrap(),
                    0,
                    0,
                    1,
                    EDIT_LENGTH as i64,
                );
            }
        }

        #[test]
        fn update_once() {
            let s = schema();
            let part1 = generate_table(&s, PART_LENGTH, 0);
            let part2 = generate_table(&s, PART_LENGTH, 2);
            let part3 = generate_table(&s, PART_LENGTH, 4);
            let edit1 = generate_table(&s, EDIT_LENGTH, 1);
            let edit2 = generate_table(&s, EDIT_LENGTH, 3);
            let edit3 = generate_table(&s, EDIT_LENGTH, 5);

            let base = concat_batches([&part1, &edit1, &part2, &part3, &edit2]);
            let modified = concat_batches([&part1, &edit3, &part2, &part3, &edit2]);

            let base_data = write_with_cdc_options(
                &[&base],
                CDC_MIN_CHUNK_SIZE,
                CDC_MAX_CHUNK_SIZE,
                Some(ROW_GROUP_LENGTH),
                false,
            );
            let mod_data = write_with_cdc_options(
                &[&modified],
                CDC_MIN_CHUNK_SIZE,
                CDC_MAX_CHUNK_SIZE,
                Some(ROW_GROUP_LENGTH),
                false,
            );

            for col in 0..s.fields().len() {
                let nullable = s.field(col).is_nullable();
                let base_info = get_column_info(&base_data, col);
                let mod_info = get_column_info(&mod_data, col);

                assert_eq!(base_info.len(), 7);
                assert_eq!(mod_info.len(), 7);

                assert_cdc_chunk_sizes(
                    &base.column(col).slice(0, ROW_GROUP_LENGTH),
                    &base_info[0],
                    nullable,
                    CDC_MIN_CHUNK_SIZE,
                    CDC_MAX_CHUNK_SIZE,
                    false,
                );

                assert_eq!(base_info[0].page_lengths, mod_info[0].page_lengths);
                assert_eq!(base_info[1].page_lengths, mod_info[1].page_lengths);

                // An in-place update keeps row counts unchanged, so only the
                // row group containing the edit can have its pages move.
                assert_page_length_differences_update(&base_info[2], &mod_info[2], 1);

                for i in 3..mod_info.len() {
                    assert_eq!(base_info[i].page_lengths, mod_info[i].page_lengths);
                }
            }
        }

        #[test]
        fn append() {
            let s = schema();
            let part1 = generate_table(&s, PART_LENGTH, 0);
            let part2 = generate_table(&s, PART_LENGTH, 2);
            let part3 = generate_table(&s, PART_LENGTH, 4);
            let edit1 = generate_table(&s, EDIT_LENGTH, 1);
            let edit2 = generate_table(&s, EDIT_LENGTH, 3);

            let base = concat_batches([&part1, &edit1, &part2, &part3]);
            let modified = concat_batches([&part1, &edit1, &part2, &part3, &edit2]);

            let base_data = write_with_cdc_options(
                &[&base],
                CDC_MIN_CHUNK_SIZE,
                CDC_MAX_CHUNK_SIZE,
                Some(ROW_GROUP_LENGTH),
                false,
            );
            let mod_data = write_with_cdc_options(
                &[&modified],
                CDC_MIN_CHUNK_SIZE,
                CDC_MAX_CHUNK_SIZE,
                Some(ROW_GROUP_LENGTH),
                false,
            );

            for col in 0..s.fields().len() {
                let nullable = s.field(col).is_nullable();
                let base_info = get_column_info(&base_data, col);
                let mod_info = get_column_info(&mod_data, col);

                assert_eq!(base_info.len(), 7);
                assert_eq!(mod_info.len(), 7);

                assert_cdc_chunk_sizes(
                    &base.column(col).slice(0, ROW_GROUP_LENGTH),
                    &base_info[0],
                    nullable,
                    CDC_MIN_CHUNK_SIZE,
                    CDC_MAX_CHUNK_SIZE,
                    false,
                );

                // All row groups before the append are bit-identical.
                for i in 0..base_info.len() - 1 {
                    assert_eq!(base_info[i].page_lengths, mod_info[i].page_lengths);
                }

                // In the last row group, only the final page may differ.
                let bp = &base_info.last().unwrap().page_lengths;
                let mp = &mod_info.last().unwrap().page_lengths;
                assert!(mp.len() >= bp.len());
                for i in 0..bp.len() - 1 {
                    assert_eq!(bp[i], mp[i]);
                }
            }
        }
    }

    /// Chunk boundaries must be insensitive to the array's internal offset:
    /// slicing off the first `offset` values only shortens the first chunk.
    #[test]
    fn test_cdc_array_offsets_direct() {
        use crate::basic::Type as PhysicalType;
        use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};

        let options = CdcOptions {
            min_chunk_size: CDC_MIN_CHUNK_SIZE,
            max_chunk_size: CDC_MAX_CHUNK_SIZE,
            norm_level: 0,
        };
        let desc = {
            let tp = Type::primitive_type_builder("col", PhysicalType::INT32)
                .build()
                .unwrap();
            ColumnDescriptor::new(Arc::new(tp), 0, 0, ColumnPath::new(vec![]))
        };

        let bpr = bytes_per_record(&DataType::Int32, false);
        let n = CDC_PART_SIZE / bpr;
        let offset = 10usize;

        let array: Int32Array = (0..n).map(|i| test_hash(0, i as u64) as i32).collect();
        let mut chunker = super::ContentDefinedChunker::new(&desc, &options).unwrap();
        let chunks = chunker.get_arrow_chunks(None, None, &array).unwrap();

        let sliced = array.slice(offset, n - offset);
        let mut chunker2 = super::ContentDefinedChunker::new(&desc, &options).unwrap();
        let chunks2 = chunker2.get_arrow_chunks(None, None, &sliced).unwrap();

        let values: Vec<usize> = chunks.iter().map(|c| c.num_values).collect();
        let values2: Vec<usize> = chunks2.iter().map(|c| c.num_values).collect();

        assert!(values.len() > 1, "expected multiple chunks, got {values:?}");
        assert_eq!(values.len(), values2.len(), "chunk count must match");

        assert_eq!(
            values[0] - values2[0],
            offset,
            "sliced first chunk should be {offset} values shorter"
        );
        assert_eq!(
            &values[1..],
            &values2[1..],
            "all chunks after the first must be identical"
        );
    }

    /// Roundtrip smoke test: CDC writing of list columns must not corrupt data.
    #[test]
    fn test_cdc_list_roundtrip() {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "_1",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                true,
            ),
            Field::new(
                "_2",
                DataType::List(Arc::new(Field::new_list_field(DataType::Boolean, true))),
                true,
            ),
            Field::new(
                "_3",
                DataType::LargeList(Arc::new(Field::new_list_field(DataType::Utf8, true))),
                true,
            ),
        ]));
        let batch = create_random_batch(schema, 10_000, 0.25, 0.75).unwrap();
        write_with_cdc_options(
            &[&batch],
            CDC_MIN_CHUNK_SIZE,
            CDC_MAX_CHUNK_SIZE,
            None,
            true,
        );
    }

    /// Roundtrip smoke test for deeply nested list/struct combinations.
    #[test]
    fn test_cdc_deeply_nested_roundtrip() {
        let inner_field = Field::new_list_field(DataType::Int32, true);
        let inner_type = DataType::List(Arc::new(inner_field));
        let outer_field = Field::new_list_field(inner_type.clone(), true);
        let list_list_type = DataType::List(Arc::new(outer_field));

        let struct_inner_field = Field::new_list_field(DataType::Int32, true);
        let struct_inner_type = DataType::List(Arc::new(struct_inner_field));
        let struct_fields = Fields::from(vec![Field::new("a", struct_inner_type, true)]);
        let struct_type = DataType::Struct(struct_fields);
        let struct_list_field = Field::new_list_field(struct_type, true);
        let list_struct_type = DataType::List(Arc::new(struct_list_field));

        let schema = Arc::new(Schema::new(vec![
            Field::new("list_list", list_list_type, true),
            Field::new("list_struct_list", list_struct_type, true),
        ]));
        let batch = create_random_batch(schema, 10_000, 0.25, 0.75).unwrap();
        write_with_cdc_options(
            &[&batch],
            CDC_MIN_CHUNK_SIZE,
            CDC_MAX_CHUNK_SIZE,
            None,
            true,
        );
    }

    /// Null list slots whose offset ranges are non-empty must roundtrip
    /// correctly: the values they span are masked out, not written.
    #[test]
    fn test_cdc_list_non_empty_null_segments() {
        let values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
        let offsets = Buffer::from_iter([0_i32, 2, 5, 7, 9, 10]);
        // 0b00010101: slots 0, 2 and 4 are valid; slots 1 and 3 are null even
        // though their offset ranges (2..5 and 7..9) are non-empty.
        let null_bitmap = Buffer::from([0b00010101]);
        let list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false)));
        let list_data = unsafe {
            ArrayData::new_unchecked(
                list_type.clone(),
                5,
                None,
                Some(null_bitmap),
                0,
                vec![offsets],
                vec![values.to_data()],
            )
        };
        let list_array = arrow_array::make_array(list_data);

        let schema = Arc::new(Schema::new(vec![Field::new("col", list_type, true)]));
        let batch = RecordBatch::try_new(schema, vec![list_array]).unwrap();

        let buf = write_with_cdc_options(
            &[&batch],
            CDC_MIN_CHUNK_SIZE,
            CDC_MAX_CHUNK_SIZE,
            None,
            true,
        );
        let read = concat_batches(read_batches(&buf));
        let read_list = read.column(0).as_list::<i32>();
        assert_eq!(read_list.len(), 5);
        assert!(read_list.is_valid(0));
        assert!(read_list.is_null(1));
        assert!(read_list.is_valid(2));
        assert!(read_list.is_null(3));
        assert!(read_list.is_valid(4));

        let get_vals = |i: usize| -> Vec<i32> {
            read_list
                .value(i)
                .as_primitive::<arrow_array::types::Int32Type>()
                .values()
                .iter()
                .copied()
                .collect()
        };
        assert_eq!(get_vals(0), vec![1, 2]);
        assert_eq!(get_vals(2), vec![6, 7]);
        assert_eq!(get_vals(4), vec![10]);
    }
}