use crate::errors::{ParquetError, Result};
use crate::file::properties::CdcOptions;
use crate::schema::types::ColumnDescriptor;

use super::CdcChunk;
use super::cdc_generated::{GEARHASH_TABLE, NUM_GEARHASH_TABLES};

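/// Splits column data into content-defined chunks using a gearhash-based
/// rolling hash, so that localized edits (inserts, deletes, updates) only
/// perturb nearby chunk boundaries instead of shifting every subsequent page
/// (see the `arrow_tests` module below).
///
/// A boundary candidate is found when the rolling hash, masked by
/// `rolling_hash_mask`, is zero; a boundary is declared once this has happened
/// for each of the `NUM_GEARHASH_TABLES` gearhash tables in turn, subject to
/// the configured minimum and maximum chunk sizes.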
#[derive(Debug)]
pub(crate) struct ContentDefinedChunker {
    /// Maximum definition level of the column.
    max_def_level: i16,
    /// Maximum repetition level of the column.
    max_rep_level: i16,
    /// Definition level at which a value belongs to a repeated ancestor; used
    /// to count how many leaf values each entry contributes.
    repeated_ancestor_def_level: i16,

    /// No chunk boundary is emitted before this many bytes have been rolled.
    min_chunk_size: i64,
    /// A chunk boundary is forced once this many bytes have been rolled.
    max_chunk_size: i64,
    /// Mask applied to the rolling hash; a match occurs when the masked hash is zero.
    rolling_hash_mask: u64,

    /// Current gearhash state.
    rolling_hash: u64,
    /// Whether the mask has matched for the current gearhash table.
    has_matched: bool,
    /// Index of the gearhash table currently in use.
    nth_run: usize,
    /// Number of bytes rolled into the current chunk so far.
    chunk_size: i64,
}

impl ContentDefinedChunker {
    pub fn new(desc: &ColumnDescriptor, options: &CdcOptions) -> Result<Self> {
        let rolling_hash_mask = Self::calculate_mask(
            options.min_chunk_size as i64,
            options.max_chunk_size as i64,
            options.norm_level,
        )?;
        Ok(Self {
            max_def_level: desc.max_def_level(),
            max_rep_level: desc.max_rep_level(),
            repeated_ancestor_def_level: desc.repeated_ancestor_def_level(),
            min_chunk_size: options.min_chunk_size as i64,
            max_chunk_size: options.max_chunk_size as i64,
            rolling_hash_mask,
            rolling_hash: 0,
            has_matched: false,
            nth_run: 0,
            chunk_size: 0,
        })
    }

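    /// Computes the rolling-hash mask from the chunking parameters.
    ///
    /// The mask keeps the top `mask_bits - norm_level` bits of the hash, where
    /// `mask_bits` is the floor of log2 of the per-table target size, so that
    /// a masked hash of zero occurs roughly once per target-sized span of
    /// input. A higher `norm_level` shortens the mask and makes matches more
    /// frequent.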
    fn calculate_mask(min_chunk_size: i64, max_chunk_size: i64, norm_level: i32) -> Result<u64> {
        if min_chunk_size < 0 {
            return Err(ParquetError::General(
                "min_chunk_size must be non-negative".to_string(),
            ));
        }
        if max_chunk_size <= min_chunk_size {
            return Err(ParquetError::General(
                "max_chunk_size must be greater than min_chunk_size".to_string(),
            ));
        }

        let avg_chunk_size = (min_chunk_size + max_chunk_size) / 2;
        // Every gearhash table must match before a boundary is declared, so the
        // per-table target is a fraction of the average chunk size.
        let target_size = (avg_chunk_size - min_chunk_size) / NUM_GEARHASH_TABLES as i64;

        // floor(log2(target_size)), or 0 when the target is not positive
        let mask_bits = if target_size > 0 {
            63 - target_size.leading_zeros() as i32
        } else {
            0
        };

        let effective_bits = mask_bits - norm_level;

        if !(1..=63).contains(&effective_bits) {
            return Err(ParquetError::General(format!(
                "The number of bits in the CDC mask must be between 1 and 63, got {effective_bits}"
            )));
        }

        // Keep the highest `effective_bits` bits of the hash.
        Ok(u64::MAX << (64 - effective_bits))
    }

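    /// Rolls a variable-length byte slice into the hash, skipping the hash
    /// update entirely while the current chunk is still below `min_chunk_size`.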
    #[inline]
    fn roll(&mut self, bytes: &[u8]) {
        self.chunk_size += bytes.len() as i64;
        if self.chunk_size < self.min_chunk_size {
            return;
        }
        for &b in bytes {
            self.rolling_hash = self
                .rolling_hash
                .wrapping_shl(1)
                .wrapping_add(GEARHASH_TABLE[self.nth_run][b as usize]);
            self.has_matched =
                self.has_matched || ((self.rolling_hash & self.rolling_hash_mask) == 0);
        }
    }

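    /// Monomorphized variant of [`Self::roll`] for fixed-width values, letting
    /// the compiler unroll the inner loop for the known width `N`.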
    #[inline(always)]
    fn roll_fixed<const N: usize>(&mut self, bytes: &[u8; N]) {
        self.chunk_size += N as i64;
        if self.chunk_size < self.min_chunk_size {
            return;
        }
        for j in 0..N {
            self.rolling_hash = self
                .rolling_hash
                .wrapping_shl(1)
                .wrapping_add(GEARHASH_TABLE[self.nth_run][bytes[j] as usize]);
            self.has_matched =
                self.has_matched || ((self.rolling_hash & self.rolling_hash_mask) == 0);
        }
    }

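    /// Rolls a definition or repetition level into the hash so that changes in
    /// nesting structure also influence chunk boundaries.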
    #[inline]
    fn roll_level(&mut self, level: i16) {
        self.roll_fixed(&level.to_le_bytes());
    }

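    /// Returns true if a new chunk should start at the current position.
    ///
    /// Each mask match advances to the next gearhash table; only after all
    /// `NUM_GEARHASH_TABLES` tables have matched is a boundary declared, which
    /// normalizes the chunk size distribution around the average. A boundary
    /// is also forced whenever the chunk reaches `max_chunk_size`.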
    #[inline]
    fn need_new_chunk(&mut self) -> bool {
        if self.has_matched {
            self.has_matched = false;
            self.nth_run += 1;
            if self.nth_run >= NUM_GEARHASH_TABLES {
                self.nth_run = 0;
                self.chunk_size = 0;
                return true;
            }
        }
        if self.chunk_size >= self.max_chunk_size {
            self.chunk_size = 0;
            return true;
        }
        false
    }

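    /// Splits `num_levels` levels (and their values, fed through `roll_value`)
    /// into content-defined chunks.
    ///
    /// Handles three shapes: flat required data (no levels), flat nullable
    /// data (definition levels only), and nested data (definition and
    /// repetition levels), where chunk boundaries may only be placed at record
    /// boundaries (`rep_level == 0`).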
    fn calculate<F>(
        &mut self,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        num_levels: usize,
        mut roll_value: F,
    ) -> Vec<CdcChunk>
    where
        F: FnMut(&mut Self, usize),
    {
        let has_def_levels = self.max_def_level > 0;
        let has_rep_levels = self.max_rep_level > 0;

        let mut chunks = Vec::new();
        let mut prev_offset: usize = 0;
        let mut prev_value_offset: usize = 0;
        let mut total_values: usize = num_levels;

        if !has_rep_levels && !has_def_levels {
            // Flat, required data: every level is a value.
            for offset in 0..num_levels {
                roll_value(self, offset);
                if self.need_new_chunk() {
                    chunks.push(CdcChunk {
                        level_offset: prev_offset,
                        value_offset: prev_offset,
                        num_levels: offset - prev_offset,
                        num_values: offset - prev_offset,
                    });
                    prev_offset = offset;
                }
            }
            prev_value_offset = prev_offset;
        } else if !has_rep_levels {
            // Flat, nullable data: roll every definition level, but only roll a
            // value when it is actually present.
            let def_levels = def_levels.expect("def_levels required when max_def_level > 0");
            #[allow(clippy::needless_range_loop)]
            for offset in 0..num_levels {
                let def_level = def_levels[offset];
                self.roll_level(def_level);
                if def_level == self.max_def_level {
                    roll_value(self, offset);
                }
                if self.need_new_chunk() {
                    chunks.push(CdcChunk {
                        level_offset: prev_offset,
                        value_offset: prev_offset,
                        num_levels: offset - prev_offset,
                        num_values: offset - prev_offset,
                    });
                    prev_offset = offset;
                }
            }
            prev_value_offset = prev_offset;
        } else {
            // Nested data: boundaries may only fall where the repetition level
            // is zero, i.e. at the start of a new record.
            let def_levels = def_levels.expect("def_levels required for nested data");
            let rep_levels = rep_levels.expect("rep_levels required for nested data");
            let mut value_offset: usize = 0;

            for offset in 0..num_levels {
                let def_level = def_levels[offset];
                let rep_level = rep_levels[offset];

                self.roll_level(def_level);
                self.roll_level(rep_level);
                if def_level == self.max_def_level {
                    roll_value(self, value_offset);
                }

                if rep_level == 0 && self.need_new_chunk() {
                    let levels_to_write = offset - prev_offset;
                    if levels_to_write > 0 {
                        chunks.push(CdcChunk {
                            level_offset: prev_offset,
                            value_offset: prev_value_offset,
                            num_levels: levels_to_write,
                            num_values: value_offset - prev_value_offset,
                        });
                        prev_offset = offset;
                        prev_value_offset = value_offset;
                    }
                }
                if def_level >= self.repeated_ancestor_def_level {
                    value_offset += 1;
                }
            }
            total_values = value_offset;
        }

        // Emit the final (possibly short) chunk covering the remaining levels.
        if prev_offset < num_levels {
            chunks.push(CdcChunk {
                level_offset: prev_offset,
                value_offset: prev_value_offset,
                num_levels: num_levels - prev_offset,
                num_values: total_values - prev_value_offset,
            });
        }

        #[cfg(debug_assertions)]
        self.validate_chunks(&chunks, num_levels, total_values);

        chunks
    }

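    /// Computes the content-defined chunks for an Arrow array, dispatching on
    /// the data type to feed each value's byte representation into the rolling
    /// hash. Dictionary arrays are chunked by their keys.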
    #[cfg(feature = "arrow")]
    pub(crate) fn get_arrow_chunks(
        &mut self,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        array: &dyn arrow_array::Array,
    ) -> Result<Vec<CdcChunk>> {
        use arrow_array::cast::AsArray;
        use arrow_schema::DataType;

        let num_levels = match def_levels {
            Some(def_levels) => def_levels.len(),
            None => array.len(),
        };

        // Roll fixed-width values of `$N` bytes, honoring the array's offset.
        macro_rules! fixed_width {
            ($N:literal) => {{
                let data = array.to_data();
                let buffer = data.buffers()[0].as_slice();
                let values = &buffer[data.offset() * $N..];
                self.calculate(def_levels, rep_levels, num_levels, |c, i| {
                    let offset = i * $N;
                    let slice = &values[offset..offset + $N];
                    c.roll_fixed::<$N>(slice.try_into().unwrap());
                })
            }};
        }

        // Roll variable-length values (binary- and string-like arrays).
        macro_rules! binary_like {
            ($a:expr) => {{
                let a = $a;
                self.calculate(def_levels, rep_levels, num_levels, |c, i| {
                    c.roll(a.value(i).as_ref());
                })
            }};
        }

        let dtype = array.data_type();
        let chunks = match dtype {
            DataType::Null => self.calculate(def_levels, rep_levels, num_levels, |_, _| {}),
            DataType::Boolean => {
                let a = array.as_boolean();
                self.calculate(def_levels, rep_levels, num_levels, |c, i| {
                    c.roll_fixed(&[a.value(i) as u8]);
                })
            }
            DataType::Int8 | DataType::UInt8 => fixed_width!(1),
            DataType::Int16 | DataType::UInt16 | DataType::Float16 => fixed_width!(2),
            DataType::Int32
            | DataType::UInt32
            | DataType::Float32
            | DataType::Date32
            | DataType::Time32(_)
            | DataType::Interval(arrow_schema::IntervalUnit::YearMonth)
            | DataType::Decimal32(_, _) => fixed_width!(4),
            DataType::Int64
            | DataType::UInt64
            | DataType::Float64
            | DataType::Date64
            | DataType::Time64(_)
            | DataType::Timestamp(_, _)
            | DataType::Duration(_)
            | DataType::Interval(arrow_schema::IntervalUnit::DayTime)
            | DataType::Decimal64(_, _) => fixed_width!(8),
            DataType::Interval(arrow_schema::IntervalUnit::MonthDayNano)
            | DataType::Decimal128(_, _) => fixed_width!(16),
            DataType::Decimal256(_, _) => fixed_width!(32),
            DataType::FixedSizeBinary(_) => binary_like!(array.as_fixed_size_binary()),
            DataType::Binary => binary_like!(array.as_binary::<i32>()),
            DataType::LargeBinary => binary_like!(array.as_binary::<i64>()),
            DataType::Utf8 => binary_like!(array.as_string::<i32>()),
            DataType::LargeUtf8 => binary_like!(array.as_string::<i64>()),
            DataType::BinaryView => binary_like!(array.as_binary_view()),
            DataType::Utf8View => binary_like!(array.as_string_view()),
            DataType::Dictionary(_, _) => {
                let dict = array.as_any_dictionary();
                self.get_arrow_chunks(def_levels, rep_levels, dict.keys())?
            }
            _ => {
                return Err(ParquetError::General(format!(
                    "content-defined chunking is not supported for data type {dtype:?}",
                )));
            }
        };
        Ok(chunks)
    }

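    /// Debug-only sanity check: chunks must start at level and value zero, be
    /// contiguous, and together cover all levels and values.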
    #[cfg(debug_assertions)]
    fn validate_chunks(&self, chunks: &[CdcChunk], num_levels: usize, total_values: usize) {
        assert!(!chunks.is_empty(), "chunks must be non-empty");

        let first = &chunks[0];
        assert_eq!(first.level_offset, 0, "first chunk must start at level 0");
        assert_eq!(first.value_offset, 0, "first chunk must start at value 0");

        let mut sum_levels = first.num_levels;
        let mut sum_values = first.num_values;
        for i in 1..chunks.len() {
            let chunk = &chunks[i];
            let prev = &chunks[i - 1];
            assert!(chunk.num_levels > 0, "chunk must have levels");
            assert_eq!(
                chunk.level_offset,
                prev.level_offset + prev.num_levels,
                "level offsets must be contiguous"
            );
            assert_eq!(
                chunk.value_offset,
                prev.value_offset + prev.num_values,
                "value offsets must be contiguous"
            );
            sum_levels += chunk.num_levels;
            sum_values += chunk.num_values;
        }
        assert_eq!(sum_levels, num_levels, "chunks must cover all levels");
        assert_eq!(sum_values, total_values, "chunks must cover all values");

        let last = chunks.last().unwrap();
        assert_eq!(
            last.level_offset + last.num_levels,
            num_levels,
            "last chunk must end at num_levels"
        );
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::basic::Type as PhysicalType;
    use crate::schema::types::{ColumnPath, Type};
    use std::sync::Arc;

    fn make_desc(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
        let tp = Type::primitive_type_builder("col", PhysicalType::INT32)
            .build()
            .unwrap();
        ColumnDescriptor::new(
            Arc::new(tp),
            max_def_level,
            max_rep_level,
            ColumnPath::new(vec![]),
        )
    }

    #[test]
    fn test_calculate_mask_defaults() {
        let mask = ContentDefinedChunker::calculate_mask(256 * 1024, 1024 * 1024, 0).unwrap();
        // The per-table target size falls in [2^15, 2^16), so the mask keeps
        // the top 15 bits of the hash.
        let expected = u64::MAX << (64 - 15);
        assert_eq!(mask, expected);
    }

    #[test]
    fn test_calculate_mask_with_norm_level() {
        let mask = ContentDefinedChunker::calculate_mask(256 * 1024, 1024 * 1024, 1).unwrap();
        // norm_level = 1 removes one bit from the mask.
        let expected = u64::MAX << (64 - 14);
        assert_eq!(mask, expected);
    }

    #[test]
    fn test_calculate_mask_invalid() {
        assert!(ContentDefinedChunker::calculate_mask(-1, 100, 0).is_err());
        assert!(ContentDefinedChunker::calculate_mask(100, 50, 0).is_err());
        assert!(ContentDefinedChunker::calculate_mask(100, 100, 0).is_err());
    }

    #[test]
    fn test_non_nested_non_null_single_chunk() {
        let options = CdcOptions {
            min_chunk_size: 8,
            max_chunk_size: 1024,
            norm_level: 0,
        };
        let mut chunker = ContentDefinedChunker::new(&make_desc(0, 0), &options).unwrap();

        let num_values = 4;
        let chunks = chunker.calculate(None, None, num_values, |c, i| {
            c.roll_fixed::<4>(&(i as i32).to_le_bytes());
        });
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].level_offset, 0);
        assert_eq!(chunks[0].value_offset, 0);
        assert_eq!(chunks[0].num_levels, 4);
    }

    #[test]
    fn test_max_chunk_size_forces_boundary() {
        let options = CdcOptions {
            min_chunk_size: 256,
            max_chunk_size: 1024,
            norm_level: 0,
        };
        let mut chunker = ContentDefinedChunker::new(&make_desc(0, 0), &options).unwrap();

        // 2000 * 4 bytes = 8000 bytes rolled, well past max_chunk_size.
        let num_values = 2000;
        let chunks = chunker.calculate(None, None, num_values, |c, i| {
            c.roll_fixed::<4>(&(i as i32).to_le_bytes());
        });

        assert!(chunks.len() > 1);

        let mut total_levels = 0;
        for (i, chunk) in chunks.iter().enumerate() {
            assert_eq!(chunk.level_offset, total_levels);
            if i < chunks.len() - 1 {
                assert!(chunk.num_levels > 0);
            }
            total_levels += chunk.num_levels;
        }
        assert_eq!(total_levels, num_values);
    }

    #[test]
    fn test_deterministic_chunks() {
        let options = CdcOptions {
            min_chunk_size: 4,
            max_chunk_size: 64,
            norm_level: 0,
        };

        let roll = |c: &mut ContentDefinedChunker, i: usize| {
            c.roll_fixed::<8>(&(i as i64).to_le_bytes());
        };

        let mut chunker1 = ContentDefinedChunker::new(&make_desc(0, 0), &options).unwrap();
        let chunks1 = chunker1.calculate(None, None, 200, roll);

        let mut chunker2 = ContentDefinedChunker::new(&make_desc(0, 0), &options).unwrap();
        let chunks2 = chunker2.calculate(None, None, 200, roll);

        assert_eq!(chunks1.len(), chunks2.len());
        for (a, b) in chunks1.iter().zip(chunks2.iter()) {
            assert_eq!(a.level_offset, b.level_offset);
            assert_eq!(a.value_offset, b.value_offset);
            assert_eq!(a.num_levels, b.num_levels);
        }
    }

    #[test]
    fn test_nullable_non_nested() {
        let options = CdcOptions {
            min_chunk_size: 4,
            max_chunk_size: 64,
            norm_level: 0,
        };
        let mut chunker = ContentDefinedChunker::new(&make_desc(1, 0), &options).unwrap();

        let num_levels = 20;
        // Every third level is null.
        let def_levels: Vec<i16> = (0..num_levels)
            .map(|i| if i % 3 == 0 { 0 } else { 1 })
            .collect();

        let chunks = chunker.calculate(Some(&def_levels), None, num_levels, |c, i| {
            c.roll_fixed::<4>(&(i as i32).to_le_bytes());
        });

        assert!(!chunks.is_empty());
        let total: usize = chunks.iter().map(|c| c.num_levels).sum();
        assert_eq!(total, num_levels);
    }
}

#[cfg(all(test, feature = "arrow"))]
mod arrow_tests {
    //! End-to-end tests that write Arrow data with content-defined chunking
    //! enabled and assert that page boundaries stay stable under edits.

    use std::borrow::Borrow;
    use std::sync::Arc;

    use arrow_array::cast::AsArray;
    use arrow_array::{Array, ArrayRef, BooleanArray, Int32Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};

    use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use crate::arrow::arrow_writer::ArrowWriter;
    use crate::file::properties::{CdcOptions, WriterProperties};
    use crate::file::reader::{FileReader, SerializedFileReader};

    const CDC_MIN_CHUNK_SIZE: usize = 4 * 1024;
    const CDC_MAX_CHUNK_SIZE: usize = 16 * 1024;
    // Part and edit sizes are in bytes and converted to row counts via
    // `bytes_per_record`; the row group length is in rows.
    const CDC_PART_SIZE: usize = 128 * 1024;
    const CDC_EDIT_SIZE: usize = 128;
    const CDC_ROW_GROUP_LENGTH: usize = 1024 * 1024;

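    /// Deterministic 64-bit mixer (MurmurHash3-style finalizer constants) used
    /// to derive pseudo-random but reproducible test data from a seed and an
    /// index.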
    fn test_hash(seed: u64, index: u64) -> u64 {
        let mut h = (index.wrapping_add(seed)).wrapping_mul(0xc4ceb9fe1a85ec53u64);
        h ^= h >> 33;
        h = h.wrapping_mul(0xff51afd7ed558ccdu64);
        h ^= h >> 33;
        h = h.wrapping_mul(0xc4ceb9fe1a85ec53u64);
        h ^= h >> 33;
        h
    }

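    /// Generates a reproducible array of `length` values of `dtype`; when
    /// `nullable` is set, roughly one in ten slots is null.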
    fn generate_array(dtype: &DataType, nullable: bool, length: usize, seed: u64) -> ArrayRef {
        // Build a primitive array, making ~10% of the slots null when requested.
        macro_rules! gen_primitive {
            ($array_type:ty, $cast:expr) => {{
                if nullable {
                    let arr: $array_type = (0..length)
                        .map(|i| {
                            let val = test_hash(seed, i as u64);
                            if val % 10 == 0 {
                                None
                            } else {
                                Some($cast(val))
                            }
                        })
                        .collect();
                    Arc::new(arr) as ArrayRef
                } else {
                    let arr: $array_type = (0..length)
                        .map(|i| Some($cast(test_hash(seed, i as u64))))
                        .collect();
                    Arc::new(arr) as ArrayRef
                }
            }};
        }

        match dtype {
            DataType::Boolean => {
                if nullable {
                    let arr: BooleanArray = (0..length)
                        .map(|i| {
                            let val = test_hash(seed, i as u64);
                            if val % 10 == 0 {
                                None
                            } else {
                                Some(val % 2 == 0)
                            }
                        })
                        .collect();
                    Arc::new(arr)
                } else {
                    let arr: BooleanArray = (0..length)
                        .map(|i| Some(test_hash(seed, i as u64) % 2 == 0))
                        .collect();
                    Arc::new(arr)
                }
            }
            DataType::Int32 => gen_primitive!(Int32Array, |v: u64| v as i32),
            DataType::Int64 => {
                gen_primitive!(arrow_array::Int64Array, |v: u64| v as i64)
            }
            DataType::Float64 => {
                gen_primitive!(arrow_array::Float64Array, |v: u64| (v % 100000) as f64 / 1000.0)
            }
            DataType::Utf8 => {
                let arr: arrow_array::StringArray = if nullable {
                    (0..length)
                        .map(|i| {
                            let val = test_hash(seed, i as u64);
                            if val % 10 == 0 {
                                None
                            } else {
                                Some(format!("str_{val}"))
                            }
                        })
                        .collect()
                } else {
                    (0..length)
                        .map(|i| Some(format!("str_{}", test_hash(seed, i as u64))))
                        .collect()
                };
                Arc::new(arr)
            }
            DataType::Binary => {
                let arr: arrow_array::BinaryArray = if nullable {
                    (0..length)
                        .map(|i| {
                            let val = test_hash(seed, i as u64);
                            if val % 10 == 0 {
                                None
                            } else {
                                Some(format!("bin_{val}").into_bytes())
                            }
                        })
                        .collect()
                } else {
                    (0..length)
                        .map(|i| Some(format!("bin_{}", test_hash(seed, i as u64)).into_bytes()))
                        .collect()
                };
                Arc::new(arr)
            }
            DataType::FixedSizeBinary(size) => {
                let size = *size;
                let mut builder = arrow_array::builder::FixedSizeBinaryBuilder::new(size);
                for i in 0..length {
                    let val = test_hash(seed, i as u64);
                    if nullable && val % 10 == 0 {
                        builder.append_null();
                    } else {
                        // Zero-pad (or truncate) the formatted value to `size` bytes.
                        let s = format!("bin_{val}");
                        let bytes = s.as_bytes();
                        let mut buf = vec![0u8; size as usize];
                        let copy_len = bytes.len().min(size as usize);
                        buf[..copy_len].copy_from_slice(&bytes[..copy_len]);
                        builder.append_value(&buf).unwrap();
                    }
                }
                Arc::new(builder.finish())
            }
            DataType::Date32 => {
                gen_primitive!(arrow_array::Date32Array, |v: u64| v as i32)
            }
            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
                gen_primitive!(arrow_array::TimestampNanosecondArray, |v: u64| v as i64)
            }
            _ => panic!("Unsupported test data type: {dtype:?}"),
        }
    }

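    /// Builds a RecordBatch for `schema` where each column is generated with a
    /// distinct per-field seed, so columns do not correlate.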
    fn generate_table(schema: &Arc<Schema>, length: usize, seed: u64) -> RecordBatch {
        let arrays: Vec<ArrayRef> = schema
            .fields()
            .iter()
            .enumerate()
            .map(|(i, field)| {
                generate_array(
                    field.data_type(),
                    field.is_nullable(),
                    length,
                    seed + i as u64 * 10,
                )
            })
            .collect();
        RecordBatch::try_new(schema.clone(), arrays).unwrap()
    }

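    /// Number of bytes each non-null value feeds into the rolling hash for
    /// fixed-width types; returns 0 for variable-length types.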
    fn cdc_byte_width(dtype: &DataType) -> usize {
        match dtype {
            DataType::Boolean => 1,
            DataType::Int8 | DataType::UInt8 => 1,
            DataType::Int16 | DataType::UInt16 | DataType::Float16 => 2,
            DataType::Int32
            | DataType::UInt32
            | DataType::Float32
            | DataType::Date32
            | DataType::Time32(_) => 4,
            DataType::Int64
            | DataType::UInt64
            | DataType::Float64
            | DataType::Date64
            | DataType::Time64(_)
            | DataType::Timestamp(_, _)
            | DataType::Duration(_) => 8,
            DataType::Decimal128(_, _) => 16,
            DataType::Decimal256(_, _) => 32,
            DataType::FixedSizeBinary(n) => *n as usize,
            // variable-length types
            _ => 0,
        }
    }

    /// Rough per-record byte estimate used to size the generated test data;
    /// nullable columns add two bytes for the rolled definition level.
    fn bytes_per_record(dtype: &DataType, nullable: bool) -> usize {
        let bw = cdc_byte_width(dtype);
        if bw > 0 {
            if nullable { bw + 2 } else { bw }
        } else {
            // estimate for variable-length types
            16
        }
    }

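    /// Computes the number of bytes a sliced array contributes to the rolling
    /// hash: value bytes for non-null entries, plus two bytes per level when
    /// the column is nullable. Used to check page sizes against the CDC bounds.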
    fn calculate_cdc_size(array: &dyn Array, nullable: bool) -> i64 {
        let dtype = array.data_type();
        let bw = cdc_byte_width(dtype);
        let result = if bw > 0 {
            let valid_count = array.len() - array.null_count();
            (valid_count * bw) as i64
        } else {
            match dtype {
                DataType::Utf8 => {
                    let a = array.as_string::<i32>();
                    (0..a.len())
                        .filter(|&i| a.is_valid(i))
                        .map(|i| a.value(i).len() as i64)
                        .sum()
                }
                DataType::Binary => {
                    let a = array.as_binary::<i32>();
                    (0..a.len())
                        .filter(|&i| a.is_valid(i))
                        .map(|i| a.value(i).len() as i64)
                        .sum()
                }
                DataType::LargeBinary => {
                    let a = array.as_binary::<i64>();
                    (0..a.len())
                        .filter(|&i| a.is_valid(i))
                        .map(|i| a.value(i).len() as i64)
                        .sum()
                }
                _ => panic!("CDC size calculation not implemented for {dtype:?}"),
            }
        };

        if nullable {
            // each definition level is rolled as two little-endian bytes
            result + array.len() as i64 * 2
        } else {
            result
        }
    }

    /// Per-row-group page layout of a single column.
    struct ColumnInfo {
        /// Number of values in each data page, in file order.
        page_lengths: Vec<i64>,
        has_dictionary_page: bool,
    }

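    /// Reads back a written Parquet buffer and collects, for every row group,
    /// the data-page lengths and dictionary-page presence of one column.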
    fn get_column_info(data: &[u8], column_index: usize) -> Vec<ColumnInfo> {
        let reader = SerializedFileReader::new(bytes::Bytes::from(data.to_vec())).unwrap();
        let metadata = reader.metadata();
        let mut result = Vec::new();
        for rg in 0..metadata.num_row_groups() {
            let rg_reader = reader.get_row_group(rg).unwrap();
            let col_reader = rg_reader.get_column_page_reader(column_index).unwrap();
            let mut info = ColumnInfo {
                page_lengths: Vec::new(),
                has_dictionary_page: false,
            };
            for page in col_reader {
                let page = page.unwrap();
                match page.page_type() {
                    crate::basic::PageType::DATA_PAGE | crate::basic::PageType::DATA_PAGE_V2 => {
                        info.page_lengths.push(page.num_values() as i64);
                    }
                    crate::basic::PageType::DICTIONARY_PAGE => {
                        info.has_dictionary_page = true;
                    }
                    _ => {}
                }
            }
            result.push(info);
        }
        result
    }

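    /// Asserts that every data page of `array` respects the configured CDC
    /// size bounds: all pages at most `max_chunk_size`, and all but the last
    /// at least `min_chunk_size` (measured in rolled bytes, not encoded bytes).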
    fn assert_cdc_chunk_sizes(
        array: &ArrayRef,
        info: &ColumnInfo,
        nullable: bool,
        min_chunk_size: usize,
        max_chunk_size: usize,
        expect_dictionary_page: bool,
    ) {
        // Boolean and FixedSizeBinary columns are not expected to produce
        // dictionary pages.
        let expect_dict = match array.data_type() {
            DataType::Boolean | DataType::FixedSizeBinary(_) => false,
            _ => expect_dictionary_page,
        };
        assert_eq!(
            info.has_dictionary_page,
            expect_dict,
            "dictionary page mismatch for {:?}",
            array.data_type()
        );

        let page_lengths = &info.page_lengths;
        assert!(
            page_lengths.len() > 1,
            "CDC should produce multiple pages, got {page_lengths:?}"
        );

        // Only check bounds for types whose rolled byte size we can compute.
        let bw = cdc_byte_width(array.data_type());
        if bw > 0
            || matches!(
                array.data_type(),
                DataType::Utf8 | DataType::Binary | DataType::LargeBinary
            )
        {
            let mut offset = 0i64;
            for (i, &page_len) in page_lengths.iter().enumerate() {
                let slice = array.slice(offset as usize, page_len as usize);
                let cdc_size = calculate_cdc_size(slice.as_ref(), nullable);
                if i < page_lengths.len() - 1 {
                    assert!(
                        cdc_size >= min_chunk_size as i64,
                        "Page {i}: CDC size {cdc_size} < min {min_chunk_size}, pages={page_lengths:?}"
                    );
                }
                assert!(
                    cdc_size <= max_chunk_size as i64,
                    "Page {i}: CDC size {cdc_size} > max {max_chunk_size}, pages={page_lengths:?}"
                );
                offset += page_len;
            }
            assert_eq!(
                offset,
                array.len() as i64,
                "page lengths must sum to array length"
            );
        }
    }

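    /// Writes `batches` to an in-memory Parquet buffer with content-defined
    /// chunking enabled, then reads the buffer back and verifies the roundtrip
    /// before returning the encoded bytes.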
    fn write_with_cdc_options(
        batches: &[&RecordBatch],
        min_chunk_size: usize,
        max_chunk_size: usize,
        max_row_group_rows: Option<usize>,
        enable_dictionary: bool,
    ) -> Vec<u8> {
        assert!(!batches.is_empty());
        let schema = batches[0].schema();
        let mut builder = WriterProperties::builder()
            .set_dictionary_enabled(enable_dictionary)
            .set_content_defined_chunking(Some(CdcOptions {
                min_chunk_size,
                max_chunk_size,
                norm_level: 0,
            }));
        if let Some(max_rows) = max_row_group_rows {
            builder = builder.set_max_row_group_row_count(Some(max_rows));
        }
        let props = builder.build();
        let mut buf = Vec::new();
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), Some(props)).unwrap();
        for batch in batches {
            writer.write(batch).unwrap();
        }
        writer.close().unwrap();

        // Validate the roundtrip before handing the buffer to the caller.
        let readback = read_batches(&buf);
        let original_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        let readback_rows: usize = readback.iter().map(|b| b.num_rows()).sum();
        assert_eq!(original_rows, readback_rows, "Roundtrip row count mismatch");
        if original_rows > 0 {
            let original = concat_batches(batches.iter().copied());
            let roundtrip = concat_batches(&readback);
            assert_eq!(original, roundtrip, "Roundtrip validation failed");
        }

        buf
    }

    fn read_batches(data: &[u8]) -> Vec<RecordBatch> {
        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes::Bytes::from(data.to_vec()))
            .unwrap()
            .build()
            .unwrap();
        reader.collect::<std::result::Result<Vec<_>, _>>().unwrap()
    }

    fn concat_batches(batches: impl IntoIterator<Item = impl Borrow<RecordBatch>>) -> RecordBatch {
        let batches: Vec<_> = batches.into_iter().collect();
        let schema = batches[0].borrow().schema();
        let batches = batches.iter().map(|b| b.borrow());
        arrow_select::concat::concat_batches(&schema, batches).unwrap()
    }

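    /// Diffs two page-length sequences via a longest-common-subsequence table
    /// and returns the differing runs as `(from_first, from_second)` pairs,
    /// with adjacent pure-insert and pure-delete runs merged into one
    /// difference.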
    fn find_differences(first: &[i64], second: &[i64]) -> Vec<(Vec<i64>, Vec<i64>)> {
        // Longest-common-subsequence lengths.
        let n = first.len();
        let m = second.len();
        let mut dp = vec![vec![0usize; m + 1]; n + 1];
        for i in 0..n {
            for j in 0..m {
                if first[i] == second[j] {
                    dp[i + 1][j + 1] = dp[i][j] + 1;
                } else {
                    dp[i + 1][j + 1] = dp[i + 1][j].max(dp[i][j + 1]);
                }
            }
        }
        // Backtrack to recover the positions of the common elements.
        let mut common = Vec::new();
        let (mut i, mut j) = (n, m);
        while i > 0 && j > 0 {
            if first[i - 1] == second[j - 1] {
                common.push((i - 1, j - 1));
                i -= 1;
                j -= 1;
            } else if dp[i - 1][j] >= dp[i][j - 1] {
                i -= 1;
            } else {
                j -= 1;
            }
        }
        common.reverse();

        // Collect the gaps between common elements as differences.
        let mut result = Vec::new();
        let (mut last_i, mut last_j) = (0usize, 0usize);
        for (ci, cj) in &common {
            if *ci > last_i || *cj > last_j {
                result.push((first[last_i..*ci].to_vec(), second[last_j..*cj].to_vec()));
            }
            last_i = ci + 1;
            last_j = cj + 1;
        }
        if last_i < n || last_j < m {
            result.push((first[last_i..].to_vec(), second[last_j..].to_vec()));
        }

        // Merge a pure insertion that directly follows a pure deletion (or
        // vice versa) into a single difference.
        let mut merged: Vec<(Vec<i64>, Vec<i64>)> = Vec::new();
        for diff in result {
            if let Some(prev) = merged.last_mut() {
                if prev.0.is_empty() && diff.1.is_empty() {
                    prev.0 = diff.0;
                    continue;
                } else if prev.1.is_empty() && diff.0.is_empty() {
                    prev.1 = diff.1;
                    continue;
                }
            }
            merged.push(diff);
        }
        merged
    }

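    /// Asserts the exact number of equal-sum, larger, and smaller differences
    /// between two page-length sequences; "larger" and "smaller" differences
    /// must deviate by exactly `edit_length` rows.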
    fn assert_page_length_differences(
        original: &ColumnInfo,
        modified: &ColumnInfo,
        exact_equal_diffs: usize,
        exact_larger_diffs: usize,
        exact_smaller_diffs: usize,
        edit_length: i64,
    ) {
        let diffs = find_differences(&original.page_lengths, &modified.page_lengths);
        let expected = exact_equal_diffs + exact_larger_diffs + exact_smaller_diffs;

        if diffs.len() != expected {
            eprintln!("Original: {:?}", original.page_lengths);
            eprintln!("Modified: {:?}", modified.page_lengths);
            for d in &diffs {
                eprintln!("  Diff: {:?} vs {:?}", d.0, d.1);
            }
        }
        assert_eq!(
            diffs.len(),
            expected,
            "Expected {expected} diffs, got {}",
            diffs.len()
        );

        let (mut eq, mut larger, mut smaller) = (0usize, 0usize, 0usize);
        for (left, right) in &diffs {
            let left_sum: i64 = left.iter().sum();
            let right_sum: i64 = right.iter().sum();
            if left_sum == right_sum {
                eq += 1;
            } else if left_sum < right_sum {
                larger += 1;
                assert_eq!(
                    left_sum + edit_length,
                    right_sum,
                    "Larger diff mismatch: {left_sum} + {edit_length} != {right_sum}"
                );
            } else {
                smaller += 1;
                assert_eq!(
                    left_sum,
                    right_sum + edit_length,
                    "Smaller diff mismatch: {left_sum} != {right_sum} + {edit_length}"
                );
            }
        }

        assert_eq!(eq, exact_equal_diffs, "equal diffs count");
        assert_eq!(larger, exact_larger_diffs, "larger diffs count");
        assert_eq!(smaller, exact_smaller_diffs, "smaller diffs count");
    }

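    /// Like [`assert_page_length_differences`] but for in-place updates: each
    /// difference must preserve the total row count, and at most
    /// `max_equal_diffs` differences are allowed.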
    fn assert_page_length_differences_update(
        original: &ColumnInfo,
        modified: &ColumnInfo,
        max_equal_diffs: usize,
    ) {
        let diffs = find_differences(&original.page_lengths, &modified.page_lengths);
        assert!(
            diffs.len() <= max_equal_diffs,
            "Expected at most {max_equal_diffs} diffs, got {}",
            diffs.len()
        );
        for (left, right) in &diffs {
            let left_sum: i64 = left.iter().sum();
            let right_sum: i64 = right.iter().sum();
            assert_eq!(
                left_sum, right_sum,
                "Update diff should not change total row count"
            );
        }
    }

    #[test]
    fn test_find_differences_basic() {
        let diffs = find_differences(&[1, 2, 3, 4, 5], &[1, 7, 8, 4, 5]);
        assert_eq!(diffs.len(), 1);
        assert_eq!(diffs[0].0, vec![2, 3]);
        assert_eq!(diffs[0].1, vec![7, 8]);
    }

    #[test]
    fn test_find_differences_multiple() {
        let diffs = find_differences(&[1, 2, 3, 4, 5, 6, 7], &[1, 8, 9, 4, 10, 6, 11]);
        assert_eq!(diffs.len(), 3);
        assert_eq!(diffs[0].0, vec![2, 3]);
        assert_eq!(diffs[0].1, vec![8, 9]);
        assert_eq!(diffs[1].0, vec![5]);
        assert_eq!(diffs[1].1, vec![10]);
        assert_eq!(diffs[2].0, vec![7]);
        assert_eq!(diffs[2].1, vec![11]);
    }

    #[test]
    fn test_find_differences_different_lengths() {
        let diffs = find_differences(&[1, 2, 3], &[1, 2, 3, 4, 5]);
        assert_eq!(diffs.len(), 1);
        assert!(diffs[0].0.is_empty());
        assert_eq!(diffs[0].1, vec![4, 5]);
    }

    #[test]
    fn test_find_differences_empty() {
        let diffs = find_differences(&[], &[]);
        assert!(diffs.is_empty());
    }

    #[test]
    fn test_find_differences_changes_at_both_ends() {
        let diffs = find_differences(&[1, 2, 3, 4, 5, 6, 7, 8, 9], &[0, 0, 2, 3, 4, 5, 7, 7, 8]);
        assert_eq!(diffs.len(), 3);
        assert_eq!(diffs[0].0, vec![1]);
        assert_eq!(diffs[0].1, vec![0, 0]);
        assert_eq!(diffs[1].0, vec![6]);
        assert_eq!(diffs[1].1, vec![7]);
        assert_eq!(diffs[2].0, vec![9]);
        assert!(diffs[2].1.is_empty());
    }

    #[test]
    fn test_find_differences_additional() {
        let diffs = find_differences(
            &[445, 312, 393, 401, 410, 138, 558, 457],
            &[445, 312, 393, 393, 410, 138, 558, 457],
        );
        assert_eq!(diffs.len(), 1);
        assert_eq!(diffs[0].0, vec![401]);
        assert_eq!(diffs[0].1, vec![393]);
    }

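    /// Generates a suite of single-row-group CDC stability tests (delete,
    /// insert, update, prepend, append, empty table, sliced offsets) for one
    /// data type / nullability combination.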
    macro_rules! cdc_single_rg_tests {
        ($mod_name:ident, $dtype:expr, $nullable:expr) => {
            mod $mod_name {
                use super::*;

                fn config() -> (DataType, bool, usize, usize) {
                    let dtype: DataType = $dtype;
                    let nullable: bool = $nullable;
                    let bpr = bytes_per_record(&dtype, nullable);
                    let part_length = CDC_PART_SIZE / bpr;
                    let edit_length = CDC_EDIT_SIZE / bpr;
                    (dtype, nullable, part_length, edit_length)
                }

                fn make_schema(dtype: &DataType, nullable: bool) -> Arc<Schema> {
                    Arc::new(Schema::new(vec![Field::new("f0", dtype.clone(), nullable)]))
                }

                #[test]
                fn delete_once() {
                    let (dtype, nullable, part_length, edit_length) = config();
                    let schema = make_schema(&dtype, nullable);

                    let part1 = generate_table(&schema, part_length, 0);
                    let part2 = generate_table(&schema, edit_length, 1);
                    let part3 = generate_table(&schema, part_length, part_length as u64);

                    let base = concat_batches([&part1, &part2, &part3]);
                    let modified = concat_batches([&part1, &part3]);

                    for enable_dictionary in [false, true] {
                        let base_data = write_with_cdc_options(
                            &[&base],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );
                        let mod_data = write_with_cdc_options(
                            &[&modified],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );

                        let base_info = get_column_info(&base_data, 0);
                        let mod_info = get_column_info(&mod_data, 0);
                        assert_eq!(base_info.len(), 1);
                        assert_eq!(mod_info.len(), 1);

                        assert_cdc_chunk_sizes(
                            &base.column(0).clone(),
                            &base_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );
                        assert_cdc_chunk_sizes(
                            &modified.column(0).clone(),
                            &mod_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );

                        assert_page_length_differences(
                            &base_info[0],
                            &mod_info[0],
                            0,
                            0,
                            1,
                            edit_length as i64,
                        );
                    }
                }

                #[test]
                fn delete_twice() {
                    let (dtype, nullable, part_length, edit_length) = config();
                    let schema = make_schema(&dtype, nullable);

                    let part1 = generate_table(&schema, part_length, 0);
                    let part2 = generate_table(&schema, edit_length, 1);
                    let part3 = generate_table(&schema, part_length, part_length as u64);
                    let part4 = generate_table(&schema, edit_length, 2);
                    let part5 = generate_table(&schema, part_length, 2 * part_length as u64);

                    let base = concat_batches([&part1, &part2, &part3, &part4, &part5]);
                    let modified = concat_batches([&part1, &part3, &part5]);

                    for enable_dictionary in [false, true] {
                        let base_data = write_with_cdc_options(
                            &[&base],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );
                        let mod_data = write_with_cdc_options(
                            &[&modified],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );

                        let base_info = get_column_info(&base_data, 0);
                        let mod_info = get_column_info(&mod_data, 0);
                        assert_eq!(base_info.len(), 1);
                        assert_eq!(mod_info.len(), 1);

                        assert_cdc_chunk_sizes(
                            &base.column(0).clone(),
                            &base_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );
                        assert_cdc_chunk_sizes(
                            &modified.column(0).clone(),
                            &mod_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );

                        assert_page_length_differences(
                            &base_info[0],
                            &mod_info[0],
                            0,
                            0,
                            2,
                            edit_length as i64,
                        );
                    }
                }

                #[test]
                fn insert_once() {
                    let (dtype, nullable, part_length, edit_length) = config();
                    let schema = make_schema(&dtype, nullable);

                    let part1 = generate_table(&schema, part_length, 0);
                    let part2 = generate_table(&schema, edit_length, 1);
                    let part3 = generate_table(&schema, part_length, part_length as u64);

                    let base = concat_batches([&part1, &part3]);
                    let modified = concat_batches([&part1, &part2, &part3]);

                    for enable_dictionary in [false, true] {
                        let base_data = write_with_cdc_options(
                            &[&base],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );
                        let mod_data = write_with_cdc_options(
                            &[&modified],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );

                        let base_info = get_column_info(&base_data, 0);
                        let mod_info = get_column_info(&mod_data, 0);
                        assert_eq!(base_info.len(), 1);
                        assert_eq!(mod_info.len(), 1);

                        assert_cdc_chunk_sizes(
                            &base.column(0).clone(),
                            &base_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );
                        assert_cdc_chunk_sizes(
                            &modified.column(0).clone(),
                            &mod_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );

                        assert_page_length_differences(
                            &base_info[0],
                            &mod_info[0],
                            0,
                            1,
                            0,
                            edit_length as i64,
                        );
                    }
                }

                #[test]
                fn insert_twice() {
                    let (dtype, nullable, part_length, edit_length) = config();
                    let schema = make_schema(&dtype, nullable);

                    let part1 = generate_table(&schema, part_length, 0);
                    let part2 = generate_table(&schema, edit_length, 1);
                    let part3 = generate_table(&schema, part_length, part_length as u64);
                    let part4 = generate_table(&schema, edit_length, 2);
                    let part5 = generate_table(&schema, part_length, 2 * part_length as u64);

                    let base = concat_batches([&part1, &part3, &part5]);
                    let modified = concat_batches([&part1, &part2, &part3, &part4, &part5]);

                    for enable_dictionary in [false, true] {
                        let base_data = write_with_cdc_options(
                            &[&base],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );
                        let mod_data = write_with_cdc_options(
                            &[&modified],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );

                        let base_info = get_column_info(&base_data, 0);
                        let mod_info = get_column_info(&mod_data, 0);
                        assert_eq!(base_info.len(), 1);
                        assert_eq!(mod_info.len(), 1);

                        assert_cdc_chunk_sizes(
                            &base.column(0).clone(),
                            &base_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );
                        assert_cdc_chunk_sizes(
                            &modified.column(0).clone(),
                            &mod_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );

                        assert_page_length_differences(
                            &base_info[0],
                            &mod_info[0],
                            0,
                            2,
                            0,
                            edit_length as i64,
                        );
                    }
                }

                #[test]
                fn update_once() {
                    let (dtype, nullable, part_length, edit_length) = config();
                    let schema = make_schema(&dtype, nullable);

                    let part1 = generate_table(&schema, part_length, 0);
                    let part2 = generate_table(&schema, edit_length, 1);
                    let part3 = generate_table(&schema, part_length, part_length as u64);
                    let part4 = generate_table(&schema, edit_length, 2);

                    let base = concat_batches([&part1, &part2, &part3]);
                    let modified = concat_batches([&part1, &part4, &part3]);

                    for enable_dictionary in [false, true] {
                        let base_data = write_with_cdc_options(
                            &[&base],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );
                        let mod_data = write_with_cdc_options(
                            &[&modified],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );

                        let base_info = get_column_info(&base_data, 0);
                        let mod_info = get_column_info(&mod_data, 0);
                        assert_eq!(base_info.len(), 1);
                        assert_eq!(mod_info.len(), 1);

                        assert_cdc_chunk_sizes(
                            &base.column(0).clone(),
                            &base_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );
                        assert_cdc_chunk_sizes(
                            &modified.column(0).clone(),
                            &mod_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );

                        assert_page_length_differences_update(&base_info[0], &mod_info[0], 1);
                    }
                }

                #[test]
                fn update_twice() {
                    let (dtype, nullable, part_length, edit_length) = config();
                    let schema = make_schema(&dtype, nullable);

                    let part1 = generate_table(&schema, part_length, 0);
                    let part2 = generate_table(&schema, edit_length, 1);
                    let part3 = generate_table(&schema, part_length, part_length as u64);
                    let part4 = generate_table(&schema, edit_length, 2);
                    let part5 = generate_table(&schema, part_length, 2 * part_length as u64);
                    let part6 = generate_table(&schema, edit_length, 3);
                    let part7 = generate_table(&schema, edit_length, 4);

                    let base = concat_batches([&part1, &part2, &part3, &part4, &part5]);
                    let modified = concat_batches([&part1, &part6, &part3, &part7, &part5]);

                    for enable_dictionary in [false, true] {
                        let base_data = write_with_cdc_options(
                            &[&base],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );
                        let mod_data = write_with_cdc_options(
                            &[&modified],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );

                        let base_info = get_column_info(&base_data, 0);
                        let mod_info = get_column_info(&mod_data, 0);
                        assert_eq!(base_info.len(), 1);
                        assert_eq!(mod_info.len(), 1);

                        assert_cdc_chunk_sizes(
                            &base.column(0).clone(),
                            &base_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );
                        assert_cdc_chunk_sizes(
                            &modified.column(0).clone(),
                            &mod_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );

                        assert_page_length_differences_update(&base_info[0], &mod_info[0], 2);
                    }
                }

                #[test]
                fn prepend() {
                    let (dtype, nullable, part_length, edit_length) = config();
                    let schema = make_schema(&dtype, nullable);

                    let part1 = generate_table(&schema, part_length, 0);
                    let part2 = generate_table(&schema, edit_length, 1);
                    let part3 = generate_table(&schema, part_length, part_length as u64);
                    let part4 = generate_table(&schema, edit_length, 2);

                    let base = concat_batches([&part1, &part2, &part3]);
                    let modified = concat_batches([&part4, &part1, &part2, &part3]);

                    for enable_dictionary in [false, true] {
                        let base_data = write_with_cdc_options(
                            &[&base],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );
                        let mod_data = write_with_cdc_options(
                            &[&modified],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );

                        let base_info = get_column_info(&base_data, 0);
                        let mod_info = get_column_info(&mod_data, 0);
                        assert_eq!(base_info.len(), 1);
                        assert_eq!(mod_info.len(), 1);

                        assert_cdc_chunk_sizes(
                            &base.column(0).clone(),
                            &base_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );
                        assert_cdc_chunk_sizes(
                            &modified.column(0).clone(),
                            &mod_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );

                        assert!(
                            mod_info[0].page_lengths.len() >= base_info[0].page_lengths.len(),
                            "Modified should have same or more pages"
                        );

                        assert_page_length_differences(
                            &base_info[0],
                            &mod_info[0],
                            0,
                            1,
                            0,
                            edit_length as i64,
                        );
                    }
                }

                #[test]
                fn append() {
                    let (dtype, nullable, part_length, edit_length) = config();
                    let schema = make_schema(&dtype, nullable);

                    let part1 = generate_table(&schema, part_length, 0);
                    let part2 = generate_table(&schema, edit_length, 1);
                    let part3 = generate_table(&schema, part_length, part_length as u64);
                    let part4 = generate_table(&schema, edit_length, 2);

                    let base = concat_batches([&part1, &part2, &part3]);
                    let modified = concat_batches([&part1, &part2, &part3, &part4]);

                    for enable_dictionary in [false, true] {
                        let base_data = write_with_cdc_options(
                            &[&base],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );
                        let mod_data = write_with_cdc_options(
                            &[&modified],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );

                        let base_info = get_column_info(&base_data, 0);
                        let mod_info = get_column_info(&mod_data, 0);
                        assert_eq!(base_info.len(), 1);
                        assert_eq!(mod_info.len(), 1);

                        assert_cdc_chunk_sizes(
                            &base.column(0).clone(),
                            &base_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );
                        assert_cdc_chunk_sizes(
                            &modified.column(0).clone(),
                            &mod_info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            enable_dictionary,
                        );

                        // Appending may only affect the trailing pages; all
                        // earlier pages must be byte-for-byte identical.
                        let bp = &base_info[0].page_lengths;
                        let mp = &mod_info[0].page_lengths;
                        assert!(mp.len() >= bp.len());
                        for i in 0..bp.len() - 1 {
                            assert_eq!(bp[i], mp[i], "Page {i} should be identical");
                        }
                        assert!(mp[bp.len() - 1] >= bp[bp.len() - 1]);
                    }
                }

                #[test]
                fn empty_table() {
                    let (dtype, nullable, _, _) = config();
                    let schema = make_schema(&dtype, nullable);

                    let empty = RecordBatch::new_empty(schema);
                    for enable_dictionary in [false, true] {
                        let data = write_with_cdc_options(
                            &[&empty],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            enable_dictionary,
                        );
                        let info = get_column_info(&data, 0);
                        // An empty table may be written without any row groups.
                        if !info.is_empty() {
                            assert!(info[0].page_lengths.is_empty());
                        }
                    }
                }

                #[test]
                fn array_offsets() {
                    let (dtype, nullable, part_length, edit_length) = config();
                    let schema = make_schema(&dtype, nullable);

                    let table = concat_batches([
                        &generate_table(&schema, part_length, 0),
                        &generate_table(&schema, edit_length, 1),
                        &generate_table(&schema, part_length, part_length as u64),
                    ]);

                    for offset in [0usize, 512, 1024] {
                        if offset >= table.num_rows() {
                            continue;
                        }
                        let sliced = table.slice(offset, table.num_rows() - offset);
                        let data = write_with_cdc_options(
                            &[&sliced],
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            Some(CDC_ROW_GROUP_LENGTH),
                            true,
                        );
                        let info = get_column_info(&data, 0);
                        assert_eq!(info.len(), 1);

                        assert_cdc_chunk_sizes(
                            &sliced.column(0).clone(),
                            &info[0],
                            nullable,
                            CDC_MIN_CHUNK_SIZE,
                            CDC_MAX_CHUNK_SIZE,
                            true,
                        );
                    }
                }
            }
        };
    }

    cdc_single_rg_tests!(cdc_bool_non_null, DataType::Boolean, false);
    cdc_single_rg_tests!(cdc_i32_non_null, DataType::Int32, false);
    cdc_single_rg_tests!(cdc_i64_nullable, DataType::Int64, true);
    cdc_single_rg_tests!(cdc_f64_nullable, DataType::Float64, true);
    cdc_single_rg_tests!(cdc_utf8_non_null, DataType::Utf8, false);
    cdc_single_rg_tests!(cdc_binary_nullable, DataType::Binary, true);
    cdc_single_rg_tests!(cdc_fsb16_nullable, DataType::FixedSizeBinary(16), true);
    cdc_single_rg_tests!(cdc_date32_non_null, DataType::Date32, false);
    cdc_single_rg_tests!(
        cdc_timestamp_nullable,
        DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None),
        true
    );

    /// CDC stability tests spanning multiple row groups and multiple columns.
    mod cdc_multiple_row_groups {
        use super::*;

        const PART_LENGTH: usize = 128 * 1024;
        const EDIT_LENGTH: usize = 128;
        const ROW_GROUP_LENGTH: usize = 64 * 1024;

        fn schema() -> Arc<Schema> {
            Arc::new(Schema::new(vec![
                Field::new("int32", DataType::Int32, true),
                Field::new("float64", DataType::Float64, true),
                Field::new("bool", DataType::Boolean, false),
            ]))
        }

        #[test]
        fn insert_once() {
            let s = schema();
            let part1 = generate_table(&s, PART_LENGTH, 0);
            let part2 = generate_table(&s, PART_LENGTH, 2);
            let part3 = generate_table(&s, PART_LENGTH, 4);
            let edit1 = generate_table(&s, EDIT_LENGTH, 1);
            let edit2 = generate_table(&s, EDIT_LENGTH, 3);

            let base = concat_batches([&part1, &edit1, &part2, &part3]);
            let modified = concat_batches([&part1, &edit1, &edit2, &part2, &part3]);
            assert_eq!(modified.num_rows(), base.num_rows() + EDIT_LENGTH);

            let base_data = write_with_cdc_options(
                &[&base],
                CDC_MIN_CHUNK_SIZE,
                CDC_MAX_CHUNK_SIZE,
                Some(ROW_GROUP_LENGTH),
                false,
            );
            let mod_data = write_with_cdc_options(
                &[&modified],
                CDC_MIN_CHUNK_SIZE,
                CDC_MAX_CHUNK_SIZE,
                Some(ROW_GROUP_LENGTH),
                false,
            );

            for col in 0..s.fields().len() {
                let base_info = get_column_info(&base_data, col);
                let mod_info = get_column_info(&mod_data, col);

                assert_eq!(base_info.len(), 7, "expected 7 row groups for col {col}");
                assert_eq!(mod_info.len(), 7);

                // Row groups before the insertion point are untouched.
                assert_eq!(base_info[0].page_lengths, mod_info[0].page_lengths);
                assert_eq!(base_info[1].page_lengths, mod_info[1].page_lengths);

                // The inserted rows shift across the fixed-length row group
                // boundaries, so each interior group sees one region grow and
                // one shrink by the edit length.
                for i in 2..mod_info.len() - 1 {
                    assert_page_length_differences(
                        &base_info[i],
                        &mod_info[i],
                        0,
                        1,
                        1,
                        EDIT_LENGTH as i64,
                    );
                }
                // The last row group only absorbs the overflow.
                assert_page_length_differences(
                    base_info.last().unwrap(),
                    mod_info.last().unwrap(),
                    0,
                    1,
                    0,
                    EDIT_LENGTH as i64,
                );
            }
        }

        #[test]
        fn delete_once() {
            let s = schema();
            let part1 = generate_table(&s, PART_LENGTH, 0);
            let part2 = generate_table(&s, PART_LENGTH, 2);
            let part3 = generate_table(&s, PART_LENGTH, 4);
            let edit1 = generate_table(&s, EDIT_LENGTH, 1);
            let edit2 = generate_table(&s, EDIT_LENGTH, 3);

            let base = concat_batches([&part1, &edit1, &part2, &part3, &edit2]);
            let modified = concat_batches([&part1, &part2, &part3, &edit2]);

            let base_data = write_with_cdc_options(
                &[&base],
                CDC_MIN_CHUNK_SIZE,
                CDC_MAX_CHUNK_SIZE,
                Some(ROW_GROUP_LENGTH),
                false,
            );
            let mod_data = write_with_cdc_options(
                &[&modified],
                CDC_MIN_CHUNK_SIZE,
                CDC_MAX_CHUNK_SIZE,
                Some(ROW_GROUP_LENGTH),
                false,
            );

            for col in 0..s.fields().len() {
                let base_info = get_column_info(&base_data, col);
                let mod_info = get_column_info(&mod_data, col);

                assert_eq!(base_info.len(), 7);
                assert_eq!(mod_info.len(), 7);

                // Row groups before the deletion point are untouched.
                assert_eq!(base_info[0].page_lengths, mod_info[0].page_lengths);
                assert_eq!(base_info[1].page_lengths, mod_info[1].page_lengths);

                // Interior row groups shift by the edit length at both ends.
                for i in 2..mod_info.len() - 1 {
                    assert_page_length_differences(
                        &base_info[i],
                        &mod_info[i],
                        0,
                        1,
                        1,
                        EDIT_LENGTH as i64,
                    );
                }
                // The last row group only loses the deleted rows.
                assert_page_length_differences(
                    base_info.last().unwrap(),
                    mod_info.last().unwrap(),
                    0,
                    0,
                    1,
                    EDIT_LENGTH as i64,
                );
            }
        }

        #[test]
        fn update_once() {
            let s = schema();
            let part1 = generate_table(&s, PART_LENGTH, 0);
            let part2 = generate_table(&s, PART_LENGTH, 2);
            let part3 = generate_table(&s, PART_LENGTH, 4);
            let edit1 = generate_table(&s, EDIT_LENGTH, 1);
            let edit2 = generate_table(&s, EDIT_LENGTH, 3);
            let edit3 = generate_table(&s, EDIT_LENGTH, 5);

            let base = concat_batches([&part1, &edit1, &part2, &part3, &edit2]);
            let modified = concat_batches([&part1, &edit3, &part2, &part3, &edit2]);

            let base_data = write_with_cdc_options(
                &[&base],
                CDC_MIN_CHUNK_SIZE,
                CDC_MAX_CHUNK_SIZE,
                Some(ROW_GROUP_LENGTH),
                false,
            );
            let mod_data = write_with_cdc_options(
                &[&modified],
                CDC_MIN_CHUNK_SIZE,
                CDC_MAX_CHUNK_SIZE,
                Some(ROW_GROUP_LENGTH),
                false,
            );

            for col in 0..s.fields().len() {
                let nullable = s.field(col).is_nullable();
                let base_info = get_column_info(&base_data, col);
                let mod_info = get_column_info(&mod_data, col);

                assert_eq!(base_info.len(), 7);
                assert_eq!(mod_info.len(), 7);

                // Spot-check the CDC size bounds on the first row group.
                assert_cdc_chunk_sizes(
                    &base.column(col).slice(0, ROW_GROUP_LENGTH),
                    &base_info[0],
                    nullable,
                    CDC_MIN_CHUNK_SIZE,
                    CDC_MAX_CHUNK_SIZE,
                    false,
                );

                assert_eq!(base_info[0].page_lengths, mod_info[0].page_lengths);
                assert_eq!(base_info[1].page_lengths, mod_info[1].page_lengths);

                // An in-place update only disturbs the row group containing it.
                assert_page_length_differences_update(&base_info[2], &mod_info[2], 1);

                for i in 3..mod_info.len() {
                    assert_eq!(base_info[i].page_lengths, mod_info[i].page_lengths);
                }
            }
        }

        #[test]
        fn append() {
            let s = schema();
            let part1 = generate_table(&s, PART_LENGTH, 0);
            let part2 = generate_table(&s, PART_LENGTH, 2);
            let part3 = generate_table(&s, PART_LENGTH, 4);
            let edit1 = generate_table(&s, EDIT_LENGTH, 1);
            let edit2 = generate_table(&s, EDIT_LENGTH, 3);

            let base = concat_batches([&part1, &edit1, &part2, &part3]);
            let modified = concat_batches([&part1, &edit1, &part2, &part3, &edit2]);

            let base_data = write_with_cdc_options(
                &[&base],
                CDC_MIN_CHUNK_SIZE,
                CDC_MAX_CHUNK_SIZE,
                Some(ROW_GROUP_LENGTH),
                false,
            );
            let mod_data = write_with_cdc_options(
                &[&modified],
                CDC_MIN_CHUNK_SIZE,
                CDC_MAX_CHUNK_SIZE,
                Some(ROW_GROUP_LENGTH),
                false,
            );

            for col in 0..s.fields().len() {
                let nullable = s.field(col).is_nullable();
                let base_info = get_column_info(&base_data, col);
                let mod_info = get_column_info(&mod_data, col);

                assert_eq!(base_info.len(), 7);
                assert_eq!(mod_info.len(), 7);

                assert_cdc_chunk_sizes(
                    &base.column(col).slice(0, ROW_GROUP_LENGTH),
                    &base_info[0],
                    nullable,
                    CDC_MIN_CHUNK_SIZE,
                    CDC_MAX_CHUNK_SIZE,
                    false,
                );

                // All complete row groups are unchanged by an append.
                for i in 0..base_info.len() - 1 {
                    assert_eq!(base_info[i].page_lengths, mod_info[i].page_lengths);
                }

                // Only the trailing pages of the last row group may change.
                let bp = &base_info.last().unwrap().page_lengths;
                let mp = &mod_info.last().unwrap().page_lengths;
                assert!(mp.len() >= bp.len());
                for i in 0..bp.len() - 1 {
                    assert_eq!(bp[i], mp[i]);
                }
            }
        }
    }

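    /// Chunk boundaries must be computed relative to a sliced array's offset:
    /// slicing the input should only shorten the first chunk.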
    #[test]
    fn test_cdc_array_offsets_direct() {
        use crate::basic::Type as PhysicalType;
        use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};

        let options = CdcOptions {
            min_chunk_size: CDC_MIN_CHUNK_SIZE,
            max_chunk_size: CDC_MAX_CHUNK_SIZE,
            norm_level: 0,
        };
        let desc = {
            let tp = Type::primitive_type_builder("col", PhysicalType::INT32)
                .build()
                .unwrap();
            ColumnDescriptor::new(Arc::new(tp), 0, 0, ColumnPath::new(vec![]))
        };

        let bpr = bytes_per_record(&DataType::Int32, false);
        let n = CDC_PART_SIZE / bpr;
        let offset = 10usize;

        let array: Int32Array = (0..n).map(|i| test_hash(0, i as u64) as i32).collect();
        let mut chunker = super::ContentDefinedChunker::new(&desc, &options).unwrap();
        let chunks = chunker.get_arrow_chunks(None, None, &array).unwrap();

        let sliced = array.slice(offset, n - offset);
        let mut chunker2 = super::ContentDefinedChunker::new(&desc, &options).unwrap();
        let chunks2 = chunker2.get_arrow_chunks(None, None, &sliced).unwrap();

        let values: Vec<usize> = chunks.iter().map(|c| c.num_values).collect();
        let values2: Vec<usize> = chunks2.iter().map(|c| c.num_values).collect();

        assert!(values.len() > 1, "expected multiple chunks, got {values:?}");
        assert_eq!(values.len(), values2.len(), "chunk count must match");

        assert_eq!(
            values[0] - values2[0],
            offset,
            "offsetted first chunk should be {offset} values shorter"
        );
        assert_eq!(
            &values[1..],
            &values2[1..],
            "all chunks after the first must be identical"
        );
    }
}