// parquet/column/chunker/cdc.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::errors::{ParquetError, Result};
19use crate::file::properties::CdcOptions;
20use crate::schema::types::ColumnDescriptor;
21
22use super::CdcChunk;
23use super::cdc_generated::{GEARHASH_TABLE, NUM_GEARHASH_TABLES};
24
25/// CDC (Content-Defined Chunking) divides data into variable-sized chunks based on
26/// content rather than fixed-size boundaries.
27///
28/// For example, given this sequence of values in a column:
29///
30/// ```text
31/// File1:    [1,2,3,   4,5,6,   7,8,9]
32///            chunk1   chunk2   chunk3
33/// ```
34///
35/// If a value is inserted between 3 and 4:
36///
37/// ```text
38/// File2:    [1,2,3,0,   4,5,6,   7,8,9]
39///            new-chunk  chunk2   chunk3
40/// ```
41///
42/// The chunking process adjusts to maintain stable boundaries across data modifications.
43/// Each chunk defines a new parquet data page which is contiguously written to the file.
44/// Since each page is compressed independently, the files' contents look like:
45///
46/// ```text
47/// File1:    [Page1][Page2][Page3]...
48/// File2:    [Page4][Page2][Page3]...
49/// ```
50///
51/// When uploaded to a content-addressable storage (CAS) system, the CAS splits the byte
52/// stream into content-defined blobs with unique identifiers. Identical blobs are stored
53/// only once, so Page2 and Page3 are deduplicated across File1 and File2.
54///
55/// ## Implementation
56///
57/// Only the parquet writer needs to be aware of content-defined chunking; the reader is
58/// unaffected. Each parquet column writer holds a `ContentDefinedChunker` instance
59/// depending on the writer's properties. The chunker's state is maintained across the
60/// entire column without being reset between pages and row groups.
61///
62/// This implements a [FastCDC]-inspired algorithm using gear hashing. The input data is
63/// fed byte-by-byte into a rolling hash; when the hash matches a predefined mask, a new
64/// chunk boundary candidate is recorded. To reduce the exponential variance of chunk
65/// sizes inherent in a single gear hash, the algorithm requires **8 consecutive mask
66/// matches** — each against a different pre-computed gear hash table — before committing
67/// to a boundary. This [central-limit-theorem normalization] makes the chunk size
68/// distribution approximately normal between `min_chunk_size` and `max_chunk_size`.
69///
70/// The chunker receives the record-shredded column data (def_levels, rep_levels, values)
71/// and iterates over the (def_level, rep_level, value) triplets while adjusting the
72/// column-global rolling hash. Whenever the rolling hash matches, the chunker creates a
73/// new chunk. For nested data (lists, maps, structs) chunk boundaries are restricted to
74/// top-level record boundaries (`rep_level == 0`) so that a nested row is never split
75/// across chunks.
76///
77/// Note that boundaries are deterministically calculated exclusively based on the data
78/// itself, so the same data always produces the same chunks given the same configuration.
79///
80/// Ported from the C++ implementation in apache/arrow#45360
81/// (`cpp/src/parquet/chunker_internal.cc`).
82///
83/// [FastCDC]: https://www.usenix.org/conference/atc16/technical-sessions/presentation/xia
84/// [central-limit-theorem normalization]: https://www.cidrdb.org/cidr2023/papers/p43-low.pdf
#[derive(Debug)]
pub(crate) struct ContentDefinedChunker {
    // --- Column shape (copied from the ColumnDescriptor at construction) ---
    /// Maximum definition level for this column.
    max_def_level: i16,
    /// Maximum repetition level for this column.
    max_rep_level: i16,
    /// Definition level at the nearest REPEATED ancestor.
    /// A triplet carries a leaf value when `def_level >= repeated_ancestor_def_level`.
    repeated_ancestor_def_level: i16,

    // --- Configuration (derived from CdcOptions, fixed after construction) ---
    /// Minimum chunk size in bytes.
    /// The rolling hash will not be updated until this size is reached for each chunk.
    /// All data sent through the hash function counts towards the chunk size, including
    /// definition and repetition levels if present.
    min_chunk_size: i64,
    /// Maximum chunk size in bytes.
    /// A new chunk is created whenever the chunk size exceeds this value. The chunk size
    /// distribution approximates a normal distribution between `min_chunk_size` and
    /// `max_chunk_size`. Note that the parquet writer has a related `data_pagesize`
    /// property that controls the maximum size of a parquet data page after encoding.
    /// While setting `data_pagesize` smaller than `max_chunk_size` doesn't affect
    /// chunking effectiveness, it results in more small parquet data pages.
    max_chunk_size: i64,
    /// Mask for matching against the rolling hash.
    /// A boundary candidate is recorded when `rolling_hash & rolling_hash_mask == 0`.
    rolling_hash_mask: u64,

    // --- Mutable chunking state (spans pages and row groups) ---
    /// Rolling hash state, never reset — initialized once for the entire column.
    rolling_hash: u64,
    /// Whether the rolling hash has matched the mask since the last chunk boundary.
    has_matched: bool,
    /// Current run count for the central-limit-theorem normalization; also the index
    /// of the gear hash table currently in use (`0..NUM_GEARHASH_TABLES`).
    nth_run: usize,
    /// Current chunk size in bytes.
    chunk_size: i64,
}
119
impl ContentDefinedChunker {
    /// Create a chunker for the column described by `desc` using the CDC `options`.
    ///
    /// # Errors
    ///
    /// Fails if the configured chunk sizes / normalization level cannot produce a
    /// valid rolling-hash mask (see [`calculate_mask`](Self::calculate_mask)).
    pub fn new(desc: &ColumnDescriptor, options: &CdcOptions) -> Result<Self> {
        // The mask is derived once from the configuration and stays fixed for the
        // lifetime of the chunker.
        let rolling_hash_mask = Self::calculate_mask(
            options.min_chunk_size as i64,
            options.max_chunk_size as i64,
            options.norm_level,
        )?;
        Ok(Self {
            max_def_level: desc.max_def_level(),
            max_rep_level: desc.max_rep_level(),
            repeated_ancestor_def_level: desc.repeated_ancestor_def_level(),
            min_chunk_size: options.min_chunk_size as i64,
            max_chunk_size: options.max_chunk_size as i64,
            rolling_hash_mask,
            // Hash state starts at zero and is maintained across the whole column.
            rolling_hash: 0,
            has_matched: false,
            nth_run: 0,
            chunk_size: 0,
        })
    }

    /// Calculate the mask used to determine chunk boundaries from the rolling hash.
    ///
    /// The mask is calculated so that the expected chunk size distribution approximates
    /// a normal distribution between min and max chunk sizes.
    ///
    /// # Errors
    ///
    /// Returns an error if `min_chunk_size` is negative, if `max_chunk_size` is not
    /// strictly greater than `min_chunk_size`, or if the resulting number of mask bits
    /// falls outside `1..=63` (e.g. due to an extreme `norm_level`).
    fn calculate_mask(min_chunk_size: i64, max_chunk_size: i64, norm_level: i32) -> Result<u64> {
        if min_chunk_size < 0 {
            return Err(ParquetError::General(
                "min_chunk_size must be non-negative".to_string(),
            ));
        }
        if max_chunk_size <= min_chunk_size {
            return Err(ParquetError::General(
                "max_chunk_size must be greater than min_chunk_size".to_string(),
            ));
        }

        let avg_chunk_size = (min_chunk_size + max_chunk_size) / 2;
        // Target size after subtracting the min-size skip window and dividing by the
        // number of hash tables (for central-limit-theorem normalization).
        let target_size = (avg_chunk_size - min_chunk_size) / NUM_GEARHASH_TABLES as i64;

        // floor(log2(target_size)) — equivalent to C++ NumRequiredBits(target_size) - 1
        let mask_bits = if target_size > 0 {
            63 - target_size.leading_zeros() as i32
        } else {
            0
        };

        // A positive norm_level removes bits from the mask (more frequent matches,
        // smaller chunks); a negative one adds bits.
        let effective_bits = mask_bits - norm_level;

        if !(1..=63).contains(&effective_bits) {
            return Err(ParquetError::General(format!(
                "The number of bits in the CDC mask must be between 1 and 63, got {effective_bits}"
            )));
        }

        // Create the mask by setting the top `effective_bits` bits.
        Ok(u64::MAX << (64 - effective_bits))
    }

    /// Feed raw bytes into the rolling hash.
    ///
    /// The byte count always accumulates toward `chunk_size`, but the actual hash
    /// update is skipped until `min_chunk_size` has been reached. This "skip window"
    /// is the FastCDC optimization that prevents boundaries from appearing too early
    /// in a chunk.
    #[inline]
    fn roll(&mut self, bytes: &[u8]) {
        self.chunk_size += bytes.len() as i64;
        // Note: when this call crosses the min-size threshold, the *entire* slice is
        // hashed, including the bytes that notionally fell inside the skip window.
        if self.chunk_size < self.min_chunk_size {
            return;
        }
        for &b in bytes {
            // Gear hash: shift out the oldest contribution, fold in the table entry
            // for this byte from the current (nth_run) table.
            self.rolling_hash = self
                .rolling_hash
                .wrapping_shl(1)
                .wrapping_add(GEARHASH_TABLE[self.nth_run][b as usize]);
            self.has_matched =
                self.has_matched || ((self.rolling_hash & self.rolling_hash_mask) == 0);
        }
    }

    /// Feed exactly `N` bytes into the rolling hash (compile-time width).
    ///
    /// Like [`roll`](Self::roll), but the byte count is known at compile time,
    /// allowing the compiler to unroll the inner loop.
    #[inline(always)]
    fn roll_fixed<const N: usize>(&mut self, bytes: &[u8; N]) {
        self.chunk_size += N as i64;
        // Same skip-window behavior as `roll`: the whole fixed-width value is hashed
        // once the threshold is crossed.
        if self.chunk_size < self.min_chunk_size {
            return;
        }
        for j in 0..N {
            self.rolling_hash = self
                .rolling_hash
                .wrapping_shl(1)
                .wrapping_add(GEARHASH_TABLE[self.nth_run][bytes[j] as usize]);
            self.has_matched =
                self.has_matched || ((self.rolling_hash & self.rolling_hash_mask) == 0);
        }
    }

    /// Feed a definition or repetition level (i16) into the rolling hash.
    /// Levels participate in chunk sizing and boundary detection just like values.
    #[inline]
    fn roll_level(&mut self, level: i16) {
        self.roll_fixed(&level.to_le_bytes());
    }

    /// Check whether a new chunk boundary should be created.
    ///
    /// A boundary is created when **either** of two conditions holds:
    ///
    /// 1. **CLT normalization**: The rolling hash has matched the mask (`has_matched`)
    ///    *and* this is the 8th consecutive such match (`nth_run` reaches
    ///    `NUM_GEARHASH_TABLES`). Each match advances to the next gear hash table, so
    ///    8 independent matches are required. A single hash table would yield
    ///    exponentially distributed chunk sizes; requiring 8 independent matches
    ///    approximates a normal (Gaussian) distribution by the central limit theorem.
    ///
    /// 2. **Hard size limit**: `chunk_size` has reached `max_chunk_size`. This caps
    ///    chunk size even if the CLT normalization sequence has not completed.
    ///
    /// Note: when `max_chunk_size` forces a boundary, `nth_run` is **not** reset, so
    /// the CLT sequence continues from where it left off in the next chunk. This
    /// matches the C++ behavior.
    #[inline]
    fn need_new_chunk(&mut self) -> bool {
        if self.has_matched {
            self.has_matched = false;
            self.nth_run += 1;
            if self.nth_run >= NUM_GEARHASH_TABLES {
                self.nth_run = 0;
                self.chunk_size = 0;
                return true;
            }
        }
        if self.chunk_size >= self.max_chunk_size {
            self.chunk_size = 0;
            return true;
        }
        false
    }

    /// Compute chunk boundaries for the given column data.
    ///
    /// The chunking state is maintained across the entire column without being
    /// reset between pages and row groups. This enables the chunking process to
    /// be continued between different write calls.
    ///
    /// We go over the (def_level, rep_level, value) triplets one by one while
    /// adjusting the column-global rolling hash based on the triplet. Whenever
    /// the rolling hash matches a predefined mask it sets `has_matched` to true.
    ///
    /// After each triplet [`need_new_chunk`](Self::need_new_chunk) is called to
    /// evaluate if we need to create a new chunk.
    ///
    /// `roll_value` is called with `(self, value_index)` for each present value so
    /// the caller controls how value bytes are fed into the hash.
    fn calculate<F>(
        &mut self,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        num_levels: usize,
        mut roll_value: F,
    ) -> Vec<CdcChunk>
    where
        F: FnMut(&mut Self, usize),
    {
        let has_def_levels = self.max_def_level > 0;
        let has_rep_levels = self.max_rep_level > 0;

        let mut chunks = Vec::new();
        let mut prev_offset: usize = 0;
        let mut prev_value_offset: usize = 0;
        // Total number of values seen; for non-nested data this equals num_levels.
        let mut total_values: usize = num_levels;

        if !has_rep_levels && !has_def_levels {
            // Fastest path: non-nested, non-null data.
            for offset in 0..num_levels {
                roll_value(self, offset);
                if self.need_new_chunk() {
                    // The value at `offset` has already been hashed but is assigned
                    // to the *next* chunk: the boundary is placed before it.
                    // NOTE(review): a match at offset 0 would push a zero-length
                    // chunk here (the nested path guards against this) — presumably
                    // mirrors the C++ port; confirm.
                    chunks.push(CdcChunk {
                        level_offset: prev_offset,
                        value_offset: prev_offset,
                        num_levels: offset - prev_offset,
                        num_values: offset - prev_offset,
                    });
                    prev_offset = offset;
                }
            }
            // Set the previous value offset to add the last chunk.
            prev_value_offset = prev_offset;
        } else if !has_rep_levels {
            // Non-nested data with nulls.
            let def_levels = def_levels.expect("def_levels required when max_def_level > 0");
            #[allow(clippy::needless_range_loop)]
            for offset in 0..num_levels {
                let def_level = def_levels[offset];
                self.roll_level(def_level);
                // Only present (max-def-level) entries carry a value to hash.
                if def_level == self.max_def_level {
                    roll_value(self, offset);
                }
                if self.need_new_chunk() {
                    chunks.push(CdcChunk {
                        level_offset: prev_offset,
                        value_offset: prev_offset,
                        num_levels: offset - prev_offset,
                        num_values: offset - prev_offset,
                    });
                    prev_offset = offset;
                }
            }
            // Set the previous value offset to add the last chunk.
            prev_value_offset = prev_offset;
        } else {
            // Nested data with nulls.
            let def_levels = def_levels.expect("def_levels required for nested data");
            let rep_levels = rep_levels.expect("rep_levels required for nested data");
            let mut value_offset: usize = 0;

            for offset in 0..num_levels {
                let def_level = def_levels[offset];
                let rep_level = rep_levels[offset];

                self.roll_level(def_level);
                self.roll_level(rep_level);
                if def_level == self.max_def_level {
                    roll_value(self, value_offset);
                }

                // Boundaries are only allowed at top-level record starts so a nested
                // row is never split across chunks.
                if rep_level == 0 && self.need_new_chunk() {
                    // If we are at a record boundary and need a new chunk, create one.
                    let levels_to_write = offset - prev_offset;
                    if levels_to_write > 0 {
                        chunks.push(CdcChunk {
                            level_offset: prev_offset,
                            value_offset: prev_value_offset,
                            num_levels: levels_to_write,
                            num_values: value_offset - prev_value_offset,
                        });
                        prev_offset = offset;
                        prev_value_offset = value_offset;
                    }
                }
                if def_level >= self.repeated_ancestor_def_level {
                    // We only increment the value offset if we have a leaf value.
                    value_offset += 1;
                }
            }
            total_values = value_offset;
        }

        // Add the last chunk if we have any levels left.
        // NOTE(review): num_levels == 0 yields no chunks and would trip the debug
        // assertion below — presumably callers never pass empty batches; confirm.
        if prev_offset < num_levels {
            chunks.push(CdcChunk {
                level_offset: prev_offset,
                value_offset: prev_value_offset,
                num_levels: num_levels - prev_offset,
                num_values: total_values - prev_value_offset,
            });
        }

        #[cfg(debug_assertions)]
        self.validate_chunks(&chunks, num_levels, total_values);

        chunks
    }

    /// Compute CDC chunk boundaries by dispatching on the Arrow array's data type
    /// to feed value bytes into the rolling hash.
    ///
    /// Returns an error for data types with no byte-feeding strategy implemented.
    #[cfg(feature = "arrow")]
    pub(crate) fn get_arrow_chunks(
        &mut self,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        array: &dyn arrow_array::Array,
    ) -> Result<Vec<CdcChunk>> {
        use arrow_array::cast::AsArray;
        use arrow_schema::DataType;

        // For nullable/nested columns the level count drives iteration; otherwise it
        // equals the array length.
        let num_levels = match def_levels {
            Some(def_levels) => def_levels.len(),
            None => array.len(),
        };

        // Hash `$N`-byte values straight out of the array's primary value buffer,
        // honoring the array's element offset.
        macro_rules! fixed_width {
            ($N:literal) => {{
                let data = array.to_data();
                let buffer = data.buffers()[0].as_slice();
                let values = &buffer[data.offset() * $N..];
                self.calculate(def_levels, rep_levels, num_levels, |c, i| {
                    let offset = i * $N;
                    let slice = &values[offset..offset + $N];
                    c.roll_fixed::<$N>(slice.try_into().unwrap());
                })
            }};
        }

        // Hash variable-length values (strings/binary) via the accessor's byte view.
        macro_rules! binary_like {
            ($a:expr) => {{
                let a = $a;
                self.calculate(def_levels, rep_levels, num_levels, |c, i| {
                    c.roll(a.value(i).as_ref());
                })
            }};
        }

        let dtype = array.data_type();
        let chunks = match dtype {
            // Null arrays contribute no value bytes; only levels drive chunking.
            DataType::Null => self.calculate(def_levels, rep_levels, num_levels, |_, _| {}),
            DataType::Boolean => {
                // Booleans are bit-packed in Arrow, so go through the accessor
                // instead of the raw buffer.
                let a = array.as_boolean();
                self.calculate(def_levels, rep_levels, num_levels, |c, i| {
                    c.roll_fixed(&[a.value(i) as u8]);
                })
            }
            DataType::Int8 | DataType::UInt8 => fixed_width!(1),
            DataType::Int16 | DataType::UInt16 | DataType::Float16 => fixed_width!(2),
            DataType::Int32
            | DataType::UInt32
            | DataType::Float32
            | DataType::Date32
            | DataType::Time32(_)
            | DataType::Interval(arrow_schema::IntervalUnit::YearMonth)
            | DataType::Decimal32(_, _) => fixed_width!(4),
            DataType::Int64
            | DataType::UInt64
            | DataType::Float64
            | DataType::Date64
            | DataType::Time64(_)
            | DataType::Timestamp(_, _)
            | DataType::Duration(_)
            | DataType::Interval(arrow_schema::IntervalUnit::DayTime)
            | DataType::Decimal64(_, _) => fixed_width!(8),
            DataType::Interval(arrow_schema::IntervalUnit::MonthDayNano)
            | DataType::Decimal128(_, _) => fixed_width!(16),
            DataType::Decimal256(_, _) => fixed_width!(32),
            DataType::FixedSizeBinary(_) => binary_like!(array.as_fixed_size_binary()),
            DataType::Binary => binary_like!(array.as_binary::<i32>()),
            DataType::LargeBinary => binary_like!(array.as_binary::<i64>()),
            DataType::Utf8 => binary_like!(array.as_string::<i32>()),
            DataType::LargeUtf8 => binary_like!(array.as_string::<i64>()),
            DataType::BinaryView => binary_like!(array.as_binary_view()),
            DataType::Utf8View => binary_like!(array.as_string_view()),
            DataType::Dictionary(_, _) => {
                // Recurse on the dictionary *keys*. NOTE(review): dictionary values
                // are not hashed, so arrays with equal logical values but different
                // dictionaries may chunk differently — confirm this matches the
                // C++ port's intent.
                let dict = array.as_any_dictionary();
                self.get_arrow_chunks(def_levels, rep_levels, dict.keys())?
            }
            _ => {
                return Err(ParquetError::General(format!(
                    "content-defined chunking is not supported for data type {dtype:?}",
                )));
            }
        };
        Ok(chunks)
    }

    /// Debug-only sanity checks: chunks must start at zero, be contiguous in both
    /// level and value space, and exactly cover `num_levels` / `total_values`.
    ///
    /// # Panics
    ///
    /// Panics (asserts) if any invariant is violated; only compiled with
    /// `debug_assertions`.
    #[cfg(debug_assertions)]
    fn validate_chunks(&self, chunks: &[CdcChunk], num_levels: usize, total_values: usize) {
        assert!(!chunks.is_empty(), "chunks must be non-empty");

        let first = &chunks[0];
        assert_eq!(first.level_offset, 0, "first chunk must start at level 0");
        assert_eq!(first.value_offset, 0, "first chunk must start at value 0");

        let mut sum_levels = first.num_levels;
        let mut sum_values = first.num_values;
        for i in 1..chunks.len() {
            let chunk = &chunks[i];
            let prev = &chunks[i - 1];
            assert!(chunk.num_levels > 0, "chunk must have levels");
            assert_eq!(
                chunk.level_offset,
                prev.level_offset + prev.num_levels,
                "level offsets must be contiguous"
            );
            assert_eq!(
                chunk.value_offset,
                prev.value_offset + prev.num_values,
                "value offsets must be contiguous"
            );
            sum_levels += chunk.num_levels;
            sum_values += chunk.num_values;
        }
        assert_eq!(sum_levels, num_levels, "chunks must cover all levels");
        assert_eq!(sum_values, total_values, "chunks must cover all values");

        let last = chunks.last().unwrap();
        assert_eq!(
            last.level_offset + last.num_levels,
            num_levels,
            "last chunk must end at num_levels"
        );
    }
}
514
#[cfg(test)]
mod tests {
    use super::*;
    use crate::basic::Type as PhysicalType;
    use crate::schema::types::{ColumnPath, Type};
    use std::sync::Arc;

    /// Build a minimal INT32 column descriptor with the given level bounds.
    fn make_desc(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
        let primitive = Type::primitive_type_builder("col", PhysicalType::INT32)
            .build()
            .unwrap();
        ColumnDescriptor::new(
            Arc::new(primitive),
            max_def_level,
            max_rep_level,
            ColumnPath::new(vec![]),
        )
    }

    #[test]
    fn test_calculate_mask_defaults() {
        // avg = 640 KiB, target = (640-256)*1024/8 = 49152, floor(log2) = 15,
        // so the mask has its top 15 bits set.
        let expected = u64::MAX << (64 - 15);
        let mask = ContentDefinedChunker::calculate_mask(256 * 1024, 1024 * 1024, 0).unwrap();
        assert_eq!(mask, expected);
    }

    #[test]
    fn test_calculate_mask_with_norm_level() {
        // norm_level = 1 drops one bit from the default 15-bit mask.
        let mask = ContentDefinedChunker::calculate_mask(256 * 1024, 1024 * 1024, 1).unwrap();
        assert_eq!(mask, u64::MAX << (64 - 14));
    }

    #[test]
    fn test_calculate_mask_invalid() {
        // Negative minimum, max < min, and max == min must all be rejected.
        for (min, max) in [(-1, 100), (100, 50), (100, 100)] {
            assert!(ContentDefinedChunker::calculate_mask(min, max, 0).is_err());
        }
    }

    #[test]
    fn test_non_nested_non_null_single_chunk() {
        let options = CdcOptions {
            min_chunk_size: 8,
            max_chunk_size: 1024,
            norm_level: 0,
        };
        let mut chunker = ContentDefinedChunker::new(&make_desc(0, 0), &options).unwrap();

        // A handful of values stays far below max_chunk_size: expect one chunk.
        let chunks = chunker.calculate(None, None, 4, |c, i| {
            c.roll_fixed::<4>(&(i as i32).to_le_bytes());
        });
        assert_eq!(chunks.len(), 1);
        let only = &chunks[0];
        assert_eq!(only.level_offset, 0);
        assert_eq!(only.value_offset, 0);
        assert_eq!(only.num_levels, 4);
    }

    #[test]
    fn test_max_chunk_size_forces_boundary() {
        let options = CdcOptions {
            min_chunk_size: 256,
            max_chunk_size: 1024,
            norm_level: 0,
        };
        let mut chunker = ContentDefinedChunker::new(&make_desc(0, 0), &options).unwrap();

        // 2000 i32 values (~8 KB) must exceed the 1 KiB cap several times over.
        let total = 2000;
        let chunks = chunker.calculate(None, None, total, |c, i| {
            c.roll_fixed::<4>(&(i as i32).to_le_bytes());
        });
        assert!(chunks.len() > 1);

        // Chunks must tile the level range contiguously; every chunk except
        // possibly the last must be non-empty.
        let last = chunks.len() - 1;
        let mut next_offset = 0;
        for (i, chunk) in chunks.iter().enumerate() {
            assert_eq!(chunk.level_offset, next_offset);
            if i != last {
                assert!(chunk.num_levels > 0);
            }
            next_offset += chunk.num_levels;
        }
        assert_eq!(next_offset, total);
    }

    #[test]
    fn test_deterministic_chunks() {
        let options = CdcOptions {
            min_chunk_size: 4,
            max_chunk_size: 64,
            norm_level: 0,
        };

        let roll = |c: &mut ContentDefinedChunker, i: usize| {
            c.roll_fixed::<8>(&(i as i64).to_le_bytes());
        };

        // Two independently constructed chunkers over identical input must
        // produce identical boundaries.
        let mut first = ContentDefinedChunker::new(&make_desc(0, 0), &options).unwrap();
        let mut second = ContentDefinedChunker::new(&make_desc(0, 0), &options).unwrap();
        let a = first.calculate(None, None, 200, roll);
        let b = second.calculate(None, None, 200, roll);

        assert_eq!(a.len(), b.len());
        for (x, y) in a.iter().zip(b.iter()) {
            assert_eq!(x.level_offset, y.level_offset);
            assert_eq!(x.value_offset, y.value_offset);
            assert_eq!(x.num_levels, y.num_levels);
        }
    }

    #[test]
    fn test_nullable_non_nested() {
        let options = CdcOptions {
            min_chunk_size: 4,
            max_chunk_size: 64,
            norm_level: 0,
        };
        let mut chunker = ContentDefinedChunker::new(&make_desc(1, 0), &options).unwrap();

        // Every third entry is null (def_level 0); the rest carry values (def_level 1).
        let num_levels = 20;
        let def_levels: Vec<i16> = (0..num_levels).map(|i| i16::from(i % 3 != 0)).collect();

        let chunks = chunker.calculate(Some(&def_levels), None, num_levels, |c, i| {
            c.roll_fixed::<4>(&(i as i32).to_le_bytes());
        });

        assert!(!chunks.is_empty());
        assert_eq!(chunks.iter().map(|c| c.num_levels).sum::<usize>(), num_levels);
    }
}
658
659/// Integration tests that exercise CDC through the Arrow writer/reader roundtrip.
660/// Ported from the C++ test suite in `chunker_internal_test.cc`.
661#[cfg(all(test, feature = "arrow"))]
662mod arrow_tests {
663    use std::borrow::Borrow;
664    use std::sync::Arc;
665
666    use arrow_array::cast::AsArray;
667    use arrow_array::{Array, ArrayRef, BooleanArray, Int32Array, RecordBatch};
668    use arrow_schema::{DataType, Field, Schema};
669
670    use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
671    use crate::arrow::arrow_writer::ArrowWriter;
672    use crate::file::properties::{CdcOptions, WriterProperties};
673    use crate::file::reader::{FileReader, SerializedFileReader};
674
675    // --- Constants matching C++ TestCDCSingleRowGroup ---
676
    // Minimum CDC chunk size used by these tests: 4 KiB.
    const CDC_MIN_CHUNK_SIZE: usize = 4 * 1024;
    // Maximum CDC chunk size used by these tests: 16 KiB.
    const CDC_MAX_CHUNK_SIZE: usize = 16 * 1024;
    // Size of one generated data part: 128 KiB. (Presumably the unit for
    // splice/edit tests, mirroring the C++ suite — usage not visible here.)
    const CDC_PART_SIZE: usize = 128 * 1024;
    // Size of the edit applied between file versions: 128 bytes. (Presumably
    // used by the dedup roundtrip tests — usage not visible here.)
    const CDC_EDIT_SIZE: usize = 128;
    // Row group length large enough to keep all data in a single row group: 1 MiB.
    const CDC_ROW_GROUP_LENGTH: usize = 1024 * 1024;
682
683    // --- Helpers ---
684
685    /// Deterministic hash function matching the C++ test generator.
686    fn test_hash(seed: u64, index: u64) -> u64 {
687        let mut h = (index.wrapping_add(seed)).wrapping_mul(0xc4ceb9fe1a85ec53u64);
688        h ^= h >> 33;
689        h = h.wrapping_mul(0xff51afd7ed558ccdu64);
690        h ^= h >> 33;
691        h = h.wrapping_mul(0xc4ceb9fe1a85ec53u64);
692        h ^= h >> 33;
693        h
694    }
695
    /// Generate a deterministic array for any supported data type, matching C++ `GenerateArray`.
    ///
    /// Every value is derived from `test_hash(seed, index)` (defined elsewhere in
    /// this module), so the same `(dtype, nullable, length, seed)` combination
    /// reproduces the same array. When `nullable` is true, slots whose hash is
    /// divisible by 10 become null (roughly one in ten).
    fn generate_array(dtype: &DataType, nullable: bool, length: usize, seed: u64) -> ArrayRef {
        // Shared generator for fixed-width primitive arrays; `$cast` converts
        // the u64 hash value into the array's native type.
        macro_rules! gen_primitive {
            ($array_type:ty, $cast:expr) => {{
                if nullable {
                    let arr: $array_type = (0..length)
                        .map(|i| {
                            let val = test_hash(seed, i as u64);
                            // ~10% nulls, chosen deterministically from the hash.
                            if val % 10 == 0 {
                                None
                            } else {
                                Some($cast(val))
                            }
                        })
                        .collect();
                    Arc::new(arr) as ArrayRef
                } else {
                    let arr: $array_type = (0..length)
                        .map(|i| Some($cast(test_hash(seed, i as u64))))
                        .collect();
                    Arc::new(arr) as ArrayRef
                }
            }};
        }

        match dtype {
            // Boolean has no single-cast form, so it is handled outside the macro.
            DataType::Boolean => {
                if nullable {
                    let arr: BooleanArray = (0..length)
                        .map(|i| {
                            let val = test_hash(seed, i as u64);
                            if val % 10 == 0 {
                                None
                            } else {
                                Some(val % 2 == 0)
                            }
                        })
                        .collect();
                    Arc::new(arr)
                } else {
                    let arr: BooleanArray = (0..length)
                        .map(|i| Some(test_hash(seed, i as u64) % 2 == 0))
                        .collect();
                    Arc::new(arr)
                }
            }
            DataType::Int32 => gen_primitive!(Int32Array, |v: u64| v as i32),
            DataType::Int64 => {
                gen_primitive!(arrow_array::Int64Array, |v: u64| v as i64)
            }
            DataType::Float64 => {
                // Bounded to [0, 100) with 1/1000 resolution.
                gen_primitive!(arrow_array::Float64Array, |v: u64| (v % 100000) as f64
                    / 1000.0)
            }
            DataType::Utf8 => {
                let arr: arrow_array::StringArray = if nullable {
                    (0..length)
                        .map(|i| {
                            let val = test_hash(seed, i as u64);
                            if val % 10 == 0 {
                                None
                            } else {
                                Some(format!("str_{val}"))
                            }
                        })
                        .collect()
                } else {
                    (0..length)
                        .map(|i| Some(format!("str_{}", test_hash(seed, i as u64))))
                        .collect()
                };
                Arc::new(arr)
            }
            DataType::Binary => {
                let arr: arrow_array::BinaryArray = if nullable {
                    (0..length)
                        .map(|i| {
                            let val = test_hash(seed, i as u64);
                            if val % 10 == 0 {
                                None
                            } else {
                                Some(format!("bin_{val}").into_bytes())
                            }
                        })
                        .collect()
                } else {
                    (0..length)
                        .map(|i| Some(format!("bin_{}", test_hash(seed, i as u64)).into_bytes()))
                        .collect()
                };
                Arc::new(arr)
            }
            DataType::FixedSizeBinary(size) => {
                let size = *size;
                let mut builder = arrow_array::builder::FixedSizeBinaryBuilder::new(size);
                for i in 0..length {
                    let val = test_hash(seed, i as u64);
                    if nullable && val % 10 == 0 {
                        builder.append_null();
                    } else {
                        // Zero-pad (or truncate) the formatted value to exactly
                        // `size` bytes, as FixedSizeBinary requires.
                        let s = format!("bin_{val}");
                        let bytes = s.as_bytes();
                        let mut buf = vec![0u8; size as usize];
                        let copy_len = bytes.len().min(size as usize);
                        buf[..copy_len].copy_from_slice(&bytes[..copy_len]);
                        builder.append_value(&buf).unwrap();
                    }
                }
                Arc::new(builder.finish())
            }
            DataType::Date32 => {
                gen_primitive!(arrow_array::Date32Array, |v: u64| v as i32)
            }
            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
                gen_primitive!(arrow_array::TimestampNanosecondArray, |v: u64| v as i64)
            }
            _ => panic!("Unsupported test data type: {dtype:?}"),
        }
    }
815
816    /// Generate a RecordBatch with the given schema, matching C++ `GenerateTable`.
817    fn generate_table(schema: &Arc<Schema>, length: usize, seed: u64) -> RecordBatch {
818        let arrays: Vec<ArrayRef> = schema
819            .fields()
820            .iter()
821            .enumerate()
822            .map(|(i, field)| {
823                generate_array(
824                    field.data_type(),
825                    field.is_nullable(),
826                    length,
827                    seed + i as u64 * 10,
828                )
829            })
830            .collect();
831        RecordBatch::try_new(schema.clone(), arrays).unwrap()
832    }
833
834    /// Compute the CDC byte width for a data type, matching C++ `bytes_per_record`.
835    /// Returns 0 for variable-length types.
836    fn cdc_byte_width(dtype: &DataType) -> usize {
837        match dtype {
838            DataType::Boolean => 1,
839            DataType::Int8 | DataType::UInt8 => 1,
840            DataType::Int16 | DataType::UInt16 | DataType::Float16 => 2,
841            DataType::Int32
842            | DataType::UInt32
843            | DataType::Float32
844            | DataType::Date32
845            | DataType::Time32(_) => 4,
846            DataType::Int64
847            | DataType::UInt64
848            | DataType::Float64
849            | DataType::Date64
850            | DataType::Time64(_)
851            | DataType::Timestamp(_, _)
852            | DataType::Duration(_) => 8,
853            DataType::Decimal128(_, _) => 16,
854            DataType::Decimal256(_, _) => 32,
855            DataType::FixedSizeBinary(n) => *n as usize,
856            _ => 0, // variable-length
857        }
858    }
859
860    /// Compute bytes_per_record for determining part/edit lengths, matching C++.
861    fn bytes_per_record(dtype: &DataType, nullable: bool) -> usize {
862        let bw = cdc_byte_width(dtype);
863        if bw > 0 {
864            if nullable { bw + 2 } else { bw }
865        } else {
866            16 // variable-length fallback, matching C++
867        }
868    }
869
870    /// Compute the CDC chunk size for an array slice, matching C++ `CalculateCdcSize`.
871    fn calculate_cdc_size(array: &dyn Array, nullable: bool) -> i64 {
872        let dtype = array.data_type();
873        let bw = cdc_byte_width(dtype);
874        let result = if bw > 0 {
875            // Fixed-width: count only non-null values
876            let valid_count = array.len() - array.null_count();
877            (valid_count * bw) as i64
878        } else {
879            // Variable-length: sum of actual byte lengths
880            match dtype {
881                DataType::Utf8 => {
882                    let a = array.as_string::<i32>();
883                    (0..a.len())
884                        .filter(|&i| a.is_valid(i))
885                        .map(|i| a.value(i).len() as i64)
886                        .sum()
887                }
888                DataType::Binary => {
889                    let a = array.as_binary::<i32>();
890                    (0..a.len())
891                        .filter(|&i| a.is_valid(i))
892                        .map(|i| a.value(i).len() as i64)
893                        .sum()
894                }
895                DataType::LargeBinary => {
896                    let a = array.as_binary::<i64>();
897                    (0..a.len())
898                        .filter(|&i| a.is_valid(i))
899                        .map(|i| a.value(i).len() as i64)
900                        .sum()
901                }
902                _ => panic!("CDC size calculation not implemented for {dtype:?}"),
903            }
904        };
905
906        if nullable {
907            // Add 2 bytes per element for definition levels
908            result + array.len() as i64 * 2
909        } else {
910            result
911        }
912    }
913
    /// Page-level metadata for a single column within a row group.
    struct ColumnInfo {
        /// Number of values in each data page, in file order.
        page_lengths: Vec<i64>,
        /// Whether the column chunk contained a dictionary page.
        has_dictionary_page: bool,
    }
919
920    /// Extract per-row-group column info from Parquet data.
921    fn get_column_info(data: &[u8], column_index: usize) -> Vec<ColumnInfo> {
922        let reader = SerializedFileReader::new(bytes::Bytes::from(data.to_vec())).unwrap();
923        let metadata = reader.metadata();
924        let mut result = Vec::new();
925        for rg in 0..metadata.num_row_groups() {
926            let rg_reader = reader.get_row_group(rg).unwrap();
927            let col_reader = rg_reader.get_column_page_reader(column_index).unwrap();
928            let mut info = ColumnInfo {
929                page_lengths: Vec::new(),
930                has_dictionary_page: false,
931            };
932            for page in col_reader {
933                let page = page.unwrap();
934                match page.page_type() {
935                    crate::basic::PageType::DATA_PAGE | crate::basic::PageType::DATA_PAGE_V2 => {
936                        info.page_lengths.push(page.num_values() as i64);
937                    }
938                    crate::basic::PageType::DICTIONARY_PAGE => {
939                        info.has_dictionary_page = true;
940                    }
941                    _ => {}
942                }
943            }
944            result.push(info);
945        }
946        result
947    }
948
949    /// Assert that CDC chunk sizes are within the expected range.
950    /// Equivalent to C++ `AssertContentDefinedChunkSizes`.
951    fn assert_cdc_chunk_sizes(
952        array: &ArrayRef,
953        info: &ColumnInfo,
954        nullable: bool,
955        min_chunk_size: usize,
956        max_chunk_size: usize,
957        expect_dictionary_page: bool,
958    ) {
959        // Boolean and FixedSizeBinary never produce dictionary pages (matching C++)
960        let expect_dict = match array.data_type() {
961            DataType::Boolean | DataType::FixedSizeBinary(_) => false,
962            _ => expect_dictionary_page,
963        };
964        assert_eq!(
965            info.has_dictionary_page,
966            expect_dict,
967            "dictionary page mismatch for {:?}",
968            array.data_type()
969        );
970
971        let page_lengths = &info.page_lengths;
972        assert!(
973            page_lengths.len() > 1,
974            "CDC should produce multiple pages, got {page_lengths:?}"
975        );
976
977        let bw = cdc_byte_width(array.data_type());
978        // Only do exact CDC size validation for fixed-width and base binary-like types
979        if bw > 0
980            || matches!(
981                array.data_type(),
982                DataType::Utf8 | DataType::Binary | DataType::LargeBinary
983            )
984        {
985            let mut offset = 0i64;
986            for (i, &page_len) in page_lengths.iter().enumerate() {
987                let slice = array.slice(offset as usize, page_len as usize);
988                let cdc_size = calculate_cdc_size(slice.as_ref(), nullable);
989                if i < page_lengths.len() - 1 {
990                    assert!(
991                        cdc_size >= min_chunk_size as i64,
992                        "Page {i}: CDC size {cdc_size} < min {min_chunk_size}, pages={page_lengths:?}"
993                    );
994                }
995                assert!(
996                    cdc_size <= max_chunk_size as i64,
997                    "Page {i}: CDC size {cdc_size} > max {max_chunk_size}, pages={page_lengths:?}"
998                );
999                offset += page_len;
1000            }
1001            assert_eq!(
1002                offset,
1003                array.len() as i64,
1004                "page lengths must sum to array length"
1005            );
1006        }
1007    }
1008
1009    /// Write batches with CDC options and validate roundtrip.
1010    /// Matches C++ `WriteTableToBuffer`.
1011    fn write_with_cdc_options(
1012        batches: &[&RecordBatch],
1013        min_chunk_size: usize,
1014        max_chunk_size: usize,
1015        max_row_group_rows: Option<usize>,
1016        enable_dictionary: bool,
1017    ) -> Vec<u8> {
1018        assert!(!batches.is_empty());
1019        let schema = batches[0].schema();
1020        let mut builder = WriterProperties::builder()
1021            .set_dictionary_enabled(enable_dictionary)
1022            .set_content_defined_chunking(Some(CdcOptions {
1023                min_chunk_size,
1024                max_chunk_size,
1025                norm_level: 0,
1026            }));
1027        if let Some(max_rows) = max_row_group_rows {
1028            builder = builder.set_max_row_group_row_count(Some(max_rows));
1029        }
1030        let props = builder.build();
1031        let mut buf = Vec::new();
1032        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), Some(props)).unwrap();
1033        for batch in batches {
1034            writer.write(batch).unwrap();
1035        }
1036        writer.close().unwrap();
1037
1038        // Roundtrip validation (matching C++ WriteTableToBuffer)
1039        let readback = read_batches(&buf);
1040        let original_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1041        let readback_rows: usize = readback.iter().map(|b| b.num_rows()).sum();
1042        assert_eq!(original_rows, readback_rows, "Roundtrip row count mismatch");
1043        if original_rows > 0 {
1044            let original = concat_batches(batches.iter().copied());
1045            let roundtrip = concat_batches(&readback);
1046            assert_eq!(original, roundtrip, "Roundtrip validation failed");
1047        }
1048
1049        buf
1050    }
1051
1052    fn read_batches(data: &[u8]) -> Vec<RecordBatch> {
1053        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes::Bytes::from(data.to_vec()))
1054            .unwrap()
1055            .build()
1056            .unwrap();
1057        reader.collect::<std::result::Result<Vec<_>, _>>().unwrap()
1058    }
1059
1060    fn concat_batches(batches: impl IntoIterator<Item = impl Borrow<RecordBatch>>) -> RecordBatch {
1061        let batches: Vec<_> = batches.into_iter().collect();
1062        let schema = batches[0].borrow().schema();
1063        let batches = batches.iter().map(|b| b.borrow());
1064        arrow_select::concat::concat_batches(&schema, batches).unwrap()
1065    }
1066
1067    /// LCS-based diff between two sequences of page lengths (ported from C++).
1068    /// Includes the merge-adjacent-diffs post-processing from C++.
1069    fn find_differences(first: &[i64], second: &[i64]) -> Vec<(Vec<i64>, Vec<i64>)> {
1070        let n = first.len();
1071        let m = second.len();
1072        let mut dp = vec![vec![0usize; m + 1]; n + 1];
1073        for i in 0..n {
1074            for j in 0..m {
1075                if first[i] == second[j] {
1076                    dp[i + 1][j + 1] = dp[i][j] + 1;
1077                } else {
1078                    dp[i + 1][j + 1] = dp[i + 1][j].max(dp[i][j + 1]);
1079                }
1080            }
1081        }
1082        let mut common = Vec::new();
1083        let (mut i, mut j) = (n, m);
1084        while i > 0 && j > 0 {
1085            if first[i - 1] == second[j - 1] {
1086                common.push((i - 1, j - 1));
1087                i -= 1;
1088                j -= 1;
1089            } else if dp[i - 1][j] >= dp[i][j - 1] {
1090                i -= 1;
1091            } else {
1092                j -= 1;
1093            }
1094        }
1095        common.reverse();
1096
1097        let mut result = Vec::new();
1098        let (mut last_i, mut last_j) = (0usize, 0usize);
1099        for (ci, cj) in &common {
1100            if *ci > last_i || *cj > last_j {
1101                result.push((first[last_i..*ci].to_vec(), second[last_j..*cj].to_vec()));
1102            }
1103            last_i = ci + 1;
1104            last_j = cj + 1;
1105        }
1106        if last_i < n || last_j < m {
1107            result.push((first[last_i..].to_vec(), second[last_j..].to_vec()));
1108        }
1109
1110        // Merge adjacent diffs (matching C++ post-processing)
1111        let mut merged: Vec<(Vec<i64>, Vec<i64>)> = Vec::new();
1112        for diff in result {
1113            if let Some(prev) = merged.last_mut() {
1114                if prev.0.is_empty() && diff.1.is_empty() {
1115                    prev.0 = diff.0;
1116                    continue;
1117                } else if prev.1.is_empty() && diff.0.is_empty() {
1118                    prev.1 = diff.1;
1119                    continue;
1120                }
1121            }
1122            merged.push(diff);
1123        }
1124        merged
1125    }
1126
1127    /// Assert exact page length differences between original and modified files.
1128    /// Matches C++ `AssertPageLengthDifferences` (full version).
1129    fn assert_page_length_differences(
1130        original: &ColumnInfo,
1131        modified: &ColumnInfo,
1132        exact_equal_diffs: usize,
1133        exact_larger_diffs: usize,
1134        exact_smaller_diffs: usize,
1135        edit_length: i64,
1136    ) {
1137        let diffs = find_differences(&original.page_lengths, &modified.page_lengths);
1138        let expected = exact_equal_diffs + exact_larger_diffs + exact_smaller_diffs;
1139
1140        if diffs.len() != expected {
1141            eprintln!("Original: {:?}", original.page_lengths);
1142            eprintln!("Modified: {:?}", modified.page_lengths);
1143            for d in &diffs {
1144                eprintln!("  Diff: {:?} vs {:?}", d.0, d.1);
1145            }
1146        }
1147        assert_eq!(
1148            diffs.len(),
1149            expected,
1150            "Expected {expected} diffs, got {}",
1151            diffs.len()
1152        );
1153
1154        let (mut eq, mut larger, mut smaller) = (0usize, 0usize, 0usize);
1155        for (left, right) in &diffs {
1156            let left_sum: i64 = left.iter().sum();
1157            let right_sum: i64 = right.iter().sum();
1158            if left_sum == right_sum {
1159                eq += 1;
1160            } else if left_sum < right_sum {
1161                larger += 1;
1162                assert_eq!(
1163                    left_sum + edit_length,
1164                    right_sum,
1165                    "Larger diff mismatch: {left_sum} + {edit_length} != {right_sum}"
1166                );
1167            } else {
1168                smaller += 1;
1169                assert_eq!(
1170                    left_sum,
1171                    right_sum + edit_length,
1172                    "Smaller diff mismatch: {left_sum} != {right_sum} + {edit_length}"
1173                );
1174            }
1175        }
1176
1177        assert_eq!(eq, exact_equal_diffs, "equal diffs count");
1178        assert_eq!(larger, exact_larger_diffs, "larger diffs count");
1179        assert_eq!(smaller, exact_smaller_diffs, "smaller diffs count");
1180    }
1181
1182    /// Assert page length differences for update cases (simplified version).
1183    /// Matches C++ `AssertPageLengthDifferences` (max_equal_diffs overload).
1184    fn assert_page_length_differences_update(
1185        original: &ColumnInfo,
1186        modified: &ColumnInfo,
1187        max_equal_diffs: usize,
1188    ) {
1189        let diffs = find_differences(&original.page_lengths, &modified.page_lengths);
1190        assert!(
1191            diffs.len() <= max_equal_diffs,
1192            "Expected at most {max_equal_diffs} diffs, got {}",
1193            diffs.len()
1194        );
1195        for (left, right) in &diffs {
1196            let left_sum: i64 = left.iter().sum();
1197            let right_sum: i64 = right.iter().sum();
1198            assert_eq!(
1199                left_sum, right_sum,
1200                "Update diff should not change total row count"
1201            );
1202        }
1203    }
1204
1205    // --- FindDifferences tests (ported from C++) ---
1206
1207    #[test]
1208    fn test_find_differences_basic() {
1209        let diffs = find_differences(&[1, 2, 3, 4, 5], &[1, 7, 8, 4, 5]);
1210        assert_eq!(diffs.len(), 1);
1211        assert_eq!(diffs[0].0, vec![2, 3]);
1212        assert_eq!(diffs[0].1, vec![7, 8]);
1213    }
1214
1215    #[test]
1216    fn test_find_differences_multiple() {
1217        let diffs = find_differences(&[1, 2, 3, 4, 5, 6, 7], &[1, 8, 9, 4, 10, 6, 11]);
1218        assert_eq!(diffs.len(), 3);
1219        assert_eq!(diffs[0].0, vec![2, 3]);
1220        assert_eq!(diffs[0].1, vec![8, 9]);
1221        assert_eq!(diffs[1].0, vec![5]);
1222        assert_eq!(diffs[1].1, vec![10]);
1223        assert_eq!(diffs[2].0, vec![7]);
1224        assert_eq!(diffs[2].1, vec![11]);
1225    }
1226
1227    #[test]
1228    fn test_find_differences_different_lengths() {
1229        let diffs = find_differences(&[1, 2, 3], &[1, 2, 3, 4, 5]);
1230        assert_eq!(diffs.len(), 1);
1231        assert!(diffs[0].0.is_empty());
1232        assert_eq!(diffs[0].1, vec![4, 5]);
1233    }
1234
1235    #[test]
1236    fn test_find_differences_empty() {
1237        let diffs = find_differences(&[], &[]);
1238        assert!(diffs.is_empty());
1239    }
1240
1241    #[test]
1242    fn test_find_differences_changes_at_both_ends() {
1243        let diffs = find_differences(&[1, 2, 3, 4, 5, 6, 7, 8, 9], &[0, 0, 2, 3, 4, 5, 7, 7, 8]);
1244        assert_eq!(diffs.len(), 3);
1245        assert_eq!(diffs[0].0, vec![1]);
1246        assert_eq!(diffs[0].1, vec![0, 0]);
1247        assert_eq!(diffs[1].0, vec![6]);
1248        assert_eq!(diffs[1].1, vec![7]);
1249        assert_eq!(diffs[2].0, vec![9]);
1250        assert!(diffs[2].1.is_empty());
1251    }
1252
1253    #[test]
1254    fn test_find_differences_additional() {
1255        let diffs = find_differences(
1256            &[445, 312, 393, 401, 410, 138, 558, 457],
1257            &[445, 312, 393, 393, 410, 138, 558, 457],
1258        );
1259        assert_eq!(diffs.len(), 1);
1260        assert_eq!(diffs[0].0, vec![401]);
1261        assert_eq!(diffs[0].1, vec![393]);
1262    }
1263
1264    // --- Parameterized single-row-group tests via macro ---
1265
1266    macro_rules! cdc_single_rg_tests {
1267        ($mod_name:ident, $dtype:expr, $nullable:expr) => {
1268            mod $mod_name {
1269                use super::*;
1270
1271                fn config() -> (DataType, bool, usize, usize) {
1272                    let dtype: DataType = $dtype;
1273                    let nullable: bool = $nullable;
1274                    let bpr = bytes_per_record(&dtype, nullable);
1275                    let part_length = CDC_PART_SIZE / bpr;
1276                    let edit_length = CDC_EDIT_SIZE / bpr;
1277                    (dtype, nullable, part_length, edit_length)
1278                }
1279
1280                fn make_schema(dtype: &DataType, nullable: bool) -> Arc<Schema> {
1281                    Arc::new(Schema::new(vec![Field::new("f0", dtype.clone(), nullable)]))
1282                }
1283
1284                #[test]
1285                fn delete_once() {
1286                    let (dtype, nullable, part_length, edit_length) = config();
1287                    let schema = make_schema(&dtype, nullable);
1288
1289                    let part1 = generate_table(&schema, part_length, 0);
1290                    let part2 = generate_table(&schema, edit_length, 1);
1291                    let part3 = generate_table(&schema, part_length, part_length as u64);
1292
1293                    let base = concat_batches([&part1, &part2, &part3]);
1294                    let modified = concat_batches([&part1, &part3]);
1295
1296                    for enable_dictionary in [false, true] {
1297                        let base_data = write_with_cdc_options(
1298                            &[&base],
1299                            CDC_MIN_CHUNK_SIZE,
1300                            CDC_MAX_CHUNK_SIZE,
1301                            Some(CDC_ROW_GROUP_LENGTH),
1302                            enable_dictionary,
1303                        );
1304                        let mod_data = write_with_cdc_options(
1305                            &[&modified],
1306                            CDC_MIN_CHUNK_SIZE,
1307                            CDC_MAX_CHUNK_SIZE,
1308                            Some(CDC_ROW_GROUP_LENGTH),
1309                            enable_dictionary,
1310                        );
1311
1312                        let base_info = get_column_info(&base_data, 0);
1313                        let mod_info = get_column_info(&mod_data, 0);
1314                        assert_eq!(base_info.len(), 1);
1315                        assert_eq!(mod_info.len(), 1);
1316
1317                        assert_cdc_chunk_sizes(
1318                            &base.column(0).clone(),
1319                            &base_info[0],
1320                            nullable,
1321                            CDC_MIN_CHUNK_SIZE,
1322                            CDC_MAX_CHUNK_SIZE,
1323                            enable_dictionary,
1324                        );
1325                        assert_cdc_chunk_sizes(
1326                            &modified.column(0).clone(),
1327                            &mod_info[0],
1328                            nullable,
1329                            CDC_MIN_CHUNK_SIZE,
1330                            CDC_MAX_CHUNK_SIZE,
1331                            enable_dictionary,
1332                        );
1333
1334                        assert_page_length_differences(
1335                            &base_info[0],
1336                            &mod_info[0],
1337                            0,
1338                            0,
1339                            1,
1340                            edit_length as i64,
1341                        );
1342                    }
1343                }
1344
1345                #[test]
1346                fn delete_twice() {
1347                    let (dtype, nullable, part_length, edit_length) = config();
1348                    let schema = make_schema(&dtype, nullable);
1349
1350                    let part1 = generate_table(&schema, part_length, 0);
1351                    let part2 = generate_table(&schema, edit_length, 1);
1352                    let part3 = generate_table(&schema, part_length, part_length as u64);
1353                    let part4 = generate_table(&schema, edit_length, 2);
1354                    let part5 = generate_table(&schema, part_length, 2 * part_length as u64);
1355
1356                    let base = concat_batches([&part1, &part2, &part3, &part4, &part5]);
1357                    let modified = concat_batches([&part1, &part3, &part5]);
1358
1359                    for enable_dictionary in [false, true] {
1360                        let base_data = write_with_cdc_options(
1361                            &[&base],
1362                            CDC_MIN_CHUNK_SIZE,
1363                            CDC_MAX_CHUNK_SIZE,
1364                            Some(CDC_ROW_GROUP_LENGTH),
1365                            enable_dictionary,
1366                        );
1367                        let mod_data = write_with_cdc_options(
1368                            &[&modified],
1369                            CDC_MIN_CHUNK_SIZE,
1370                            CDC_MAX_CHUNK_SIZE,
1371                            Some(CDC_ROW_GROUP_LENGTH),
1372                            enable_dictionary,
1373                        );
1374
1375                        let base_info = get_column_info(&base_data, 0);
1376                        let mod_info = get_column_info(&mod_data, 0);
1377                        assert_eq!(base_info.len(), 1);
1378                        assert_eq!(mod_info.len(), 1);
1379
1380                        assert_cdc_chunk_sizes(
1381                            &base.column(0).clone(),
1382                            &base_info[0],
1383                            nullable,
1384                            CDC_MIN_CHUNK_SIZE,
1385                            CDC_MAX_CHUNK_SIZE,
1386                            enable_dictionary,
1387                        );
1388                        assert_cdc_chunk_sizes(
1389                            &modified.column(0).clone(),
1390                            &mod_info[0],
1391                            nullable,
1392                            CDC_MIN_CHUNK_SIZE,
1393                            CDC_MAX_CHUNK_SIZE,
1394                            enable_dictionary,
1395                        );
1396
1397                        assert_page_length_differences(
1398                            &base_info[0],
1399                            &mod_info[0],
1400                            0,
1401                            0,
1402                            2,
1403                            edit_length as i64,
1404                        );
1405                    }
1406                }
1407
1408                #[test]
1409                fn insert_once() {
1410                    let (dtype, nullable, part_length, edit_length) = config();
1411                    let schema = make_schema(&dtype, nullable);
1412
1413                    let part1 = generate_table(&schema, part_length, 0);
1414                    let part2 = generate_table(&schema, edit_length, 1);
1415                    let part3 = generate_table(&schema, part_length, part_length as u64);
1416
1417                    let base = concat_batches([&part1, &part3]);
1418                    let modified = concat_batches([&part1, &part2, &part3]);
1419
1420                    for enable_dictionary in [false, true] {
1421                        let base_data = write_with_cdc_options(
1422                            &[&base],
1423                            CDC_MIN_CHUNK_SIZE,
1424                            CDC_MAX_CHUNK_SIZE,
1425                            Some(CDC_ROW_GROUP_LENGTH),
1426                            enable_dictionary,
1427                        );
1428                        let mod_data = write_with_cdc_options(
1429                            &[&modified],
1430                            CDC_MIN_CHUNK_SIZE,
1431                            CDC_MAX_CHUNK_SIZE,
1432                            Some(CDC_ROW_GROUP_LENGTH),
1433                            enable_dictionary,
1434                        );
1435
1436                        let base_info = get_column_info(&base_data, 0);
1437                        let mod_info = get_column_info(&mod_data, 0);
1438                        assert_eq!(base_info.len(), 1);
1439                        assert_eq!(mod_info.len(), 1);
1440
1441                        assert_cdc_chunk_sizes(
1442                            &base.column(0).clone(),
1443                            &base_info[0],
1444                            nullable,
1445                            CDC_MIN_CHUNK_SIZE,
1446                            CDC_MAX_CHUNK_SIZE,
1447                            enable_dictionary,
1448                        );
1449                        assert_cdc_chunk_sizes(
1450                            &modified.column(0).clone(),
1451                            &mod_info[0],
1452                            nullable,
1453                            CDC_MIN_CHUNK_SIZE,
1454                            CDC_MAX_CHUNK_SIZE,
1455                            enable_dictionary,
1456                        );
1457
1458                        assert_page_length_differences(
1459                            &base_info[0],
1460                            &mod_info[0],
1461                            0,
1462                            1,
1463                            0,
1464                            edit_length as i64,
1465                        );
1466                    }
1467                }
1468
1469                #[test]
1470                fn insert_twice() {
1471                    let (dtype, nullable, part_length, edit_length) = config();
1472                    let schema = make_schema(&dtype, nullable);
1473
1474                    let part1 = generate_table(&schema, part_length, 0);
1475                    let part2 = generate_table(&schema, edit_length, 1);
1476                    let part3 = generate_table(&schema, part_length, part_length as u64);
1477                    let part4 = generate_table(&schema, edit_length, 2);
1478                    let part5 = generate_table(&schema, part_length, 2 * part_length as u64);
1479
1480                    let base = concat_batches([&part1, &part3, &part5]);
1481                    let modified = concat_batches([&part1, &part2, &part3, &part4, &part5]);
1482
1483                    for enable_dictionary in [false, true] {
1484                        let base_data = write_with_cdc_options(
1485                            &[&base],
1486                            CDC_MIN_CHUNK_SIZE,
1487                            CDC_MAX_CHUNK_SIZE,
1488                            Some(CDC_ROW_GROUP_LENGTH),
1489                            enable_dictionary,
1490                        );
1491                        let mod_data = write_with_cdc_options(
1492                            &[&modified],
1493                            CDC_MIN_CHUNK_SIZE,
1494                            CDC_MAX_CHUNK_SIZE,
1495                            Some(CDC_ROW_GROUP_LENGTH),
1496                            enable_dictionary,
1497                        );
1498
1499                        let base_info = get_column_info(&base_data, 0);
1500                        let mod_info = get_column_info(&mod_data, 0);
1501                        assert_eq!(base_info.len(), 1);
1502                        assert_eq!(mod_info.len(), 1);
1503
1504                        assert_cdc_chunk_sizes(
1505                            &base.column(0).clone(),
1506                            &base_info[0],
1507                            nullable,
1508                            CDC_MIN_CHUNK_SIZE,
1509                            CDC_MAX_CHUNK_SIZE,
1510                            enable_dictionary,
1511                        );
1512                        assert_cdc_chunk_sizes(
1513                            &modified.column(0).clone(),
1514                            &mod_info[0],
1515                            nullable,
1516                            CDC_MIN_CHUNK_SIZE,
1517                            CDC_MAX_CHUNK_SIZE,
1518                            enable_dictionary,
1519                        );
1520
1521                        assert_page_length_differences(
1522                            &base_info[0],
1523                            &mod_info[0],
1524                            0,
1525                            2,
1526                            0,
1527                            edit_length as i64,
1528                        );
1529                    }
1530                }
1531
1532                #[test]
1533                fn update_once() {
1534                    let (dtype, nullable, part_length, edit_length) = config();
1535                    let schema = make_schema(&dtype, nullable);
1536
1537                    let part1 = generate_table(&schema, part_length, 0);
1538                    let part2 = generate_table(&schema, edit_length, 1);
1539                    let part3 = generate_table(&schema, part_length, part_length as u64);
1540                    let part4 = generate_table(&schema, edit_length, 2);
1541
1542                    let base = concat_batches([&part1, &part2, &part3]);
1543                    let modified = concat_batches([&part1, &part4, &part3]);
1544
1545                    for enable_dictionary in [false, true] {
1546                        let base_data = write_with_cdc_options(
1547                            &[&base],
1548                            CDC_MIN_CHUNK_SIZE,
1549                            CDC_MAX_CHUNK_SIZE,
1550                            Some(CDC_ROW_GROUP_LENGTH),
1551                            enable_dictionary,
1552                        );
1553                        let mod_data = write_with_cdc_options(
1554                            &[&modified],
1555                            CDC_MIN_CHUNK_SIZE,
1556                            CDC_MAX_CHUNK_SIZE,
1557                            Some(CDC_ROW_GROUP_LENGTH),
1558                            enable_dictionary,
1559                        );
1560
1561                        let base_info = get_column_info(&base_data, 0);
1562                        let mod_info = get_column_info(&mod_data, 0);
1563                        assert_eq!(base_info.len(), 1);
1564                        assert_eq!(mod_info.len(), 1);
1565
1566                        assert_cdc_chunk_sizes(
1567                            &base.column(0).clone(),
1568                            &base_info[0],
1569                            nullable,
1570                            CDC_MIN_CHUNK_SIZE,
1571                            CDC_MAX_CHUNK_SIZE,
1572                            enable_dictionary,
1573                        );
1574                        assert_cdc_chunk_sizes(
1575                            &modified.column(0).clone(),
1576                            &mod_info[0],
1577                            nullable,
1578                            CDC_MIN_CHUNK_SIZE,
1579                            CDC_MAX_CHUNK_SIZE,
1580                            enable_dictionary,
1581                        );
1582
1583                        assert_page_length_differences_update(&base_info[0], &mod_info[0], 1);
1584                    }
1585                }
1586
1587                #[test]
1588                fn update_twice() {
1589                    let (dtype, nullable, part_length, edit_length) = config();
1590                    let schema = make_schema(&dtype, nullable);
1591
1592                    let part1 = generate_table(&schema, part_length, 0);
1593                    let part2 = generate_table(&schema, edit_length, 1);
1594                    let part3 = generate_table(&schema, part_length, part_length as u64);
1595                    let part4 = generate_table(&schema, edit_length, 2);
1596                    let part5 = generate_table(&schema, part_length, 2 * part_length as u64);
1597                    let part6 = generate_table(&schema, edit_length, 3);
1598                    let part7 = generate_table(&schema, edit_length, 4);
1599
1600                    let base = concat_batches([&part1, &part2, &part3, &part4, &part5]);
1601                    let modified = concat_batches([&part1, &part6, &part3, &part7, &part5]);
1602
1603                    for enable_dictionary in [false, true] {
1604                        let base_data = write_with_cdc_options(
1605                            &[&base],
1606                            CDC_MIN_CHUNK_SIZE,
1607                            CDC_MAX_CHUNK_SIZE,
1608                            Some(CDC_ROW_GROUP_LENGTH),
1609                            enable_dictionary,
1610                        );
1611                        let mod_data = write_with_cdc_options(
1612                            &[&modified],
1613                            CDC_MIN_CHUNK_SIZE,
1614                            CDC_MAX_CHUNK_SIZE,
1615                            Some(CDC_ROW_GROUP_LENGTH),
1616                            enable_dictionary,
1617                        );
1618
1619                        let base_info = get_column_info(&base_data, 0);
1620                        let mod_info = get_column_info(&mod_data, 0);
1621                        assert_eq!(base_info.len(), 1);
1622                        assert_eq!(mod_info.len(), 1);
1623
1624                        assert_cdc_chunk_sizes(
1625                            &base.column(0).clone(),
1626                            &base_info[0],
1627                            nullable,
1628                            CDC_MIN_CHUNK_SIZE,
1629                            CDC_MAX_CHUNK_SIZE,
1630                            enable_dictionary,
1631                        );
1632                        assert_cdc_chunk_sizes(
1633                            &modified.column(0).clone(),
1634                            &mod_info[0],
1635                            nullable,
1636                            CDC_MIN_CHUNK_SIZE,
1637                            CDC_MAX_CHUNK_SIZE,
1638                            enable_dictionary,
1639                        );
1640
1641                        assert_page_length_differences_update(&base_info[0], &mod_info[0], 2);
1642                    }
1643                }
1644
1645                #[test]
1646                fn prepend() {
1647                    let (dtype, nullable, part_length, edit_length) = config();
1648                    let schema = make_schema(&dtype, nullable);
1649
1650                    let part1 = generate_table(&schema, part_length, 0);
1651                    let part2 = generate_table(&schema, edit_length, 1);
1652                    let part3 = generate_table(&schema, part_length, part_length as u64);
1653                    let part4 = generate_table(&schema, edit_length, 2);
1654
1655                    let base = concat_batches([&part1, &part2, &part3]);
1656                    let modified = concat_batches([&part4, &part1, &part2, &part3]);
1657
1658                    for enable_dictionary in [false, true] {
1659                        let base_data = write_with_cdc_options(
1660                            &[&base],
1661                            CDC_MIN_CHUNK_SIZE,
1662                            CDC_MAX_CHUNK_SIZE,
1663                            Some(CDC_ROW_GROUP_LENGTH),
1664                            enable_dictionary,
1665                        );
1666                        let mod_data = write_with_cdc_options(
1667                            &[&modified],
1668                            CDC_MIN_CHUNK_SIZE,
1669                            CDC_MAX_CHUNK_SIZE,
1670                            Some(CDC_ROW_GROUP_LENGTH),
1671                            enable_dictionary,
1672                        );
1673
1674                        let base_info = get_column_info(&base_data, 0);
1675                        let mod_info = get_column_info(&mod_data, 0);
1676                        assert_eq!(base_info.len(), 1);
1677                        assert_eq!(mod_info.len(), 1);
1678
1679                        assert_cdc_chunk_sizes(
1680                            &base.column(0).clone(),
1681                            &base_info[0],
1682                            nullable,
1683                            CDC_MIN_CHUNK_SIZE,
1684                            CDC_MAX_CHUNK_SIZE,
1685                            enable_dictionary,
1686                        );
1687                        assert_cdc_chunk_sizes(
1688                            &modified.column(0).clone(),
1689                            &mod_info[0],
1690                            nullable,
1691                            CDC_MIN_CHUNK_SIZE,
1692                            CDC_MAX_CHUNK_SIZE,
1693                            enable_dictionary,
1694                        );
1695
1696                        assert!(
1697                            mod_info[0].page_lengths.len() >= base_info[0].page_lengths.len(),
1698                            "Modified should have same or more pages"
1699                        );
1700
1701                        assert_page_length_differences(
1702                            &base_info[0],
1703                            &mod_info[0],
1704                            0,
1705                            1,
1706                            0,
1707                            edit_length as i64,
1708                        );
1709                    }
1710                }
1711
1712                #[test]
1713                fn append() {
1714                    let (dtype, nullable, part_length, edit_length) = config();
1715                    let schema = make_schema(&dtype, nullable);
1716
1717                    let part1 = generate_table(&schema, part_length, 0);
1718                    let part2 = generate_table(&schema, edit_length, 1);
1719                    let part3 = generate_table(&schema, part_length, part_length as u64);
1720                    let part4 = generate_table(&schema, edit_length, 2);
1721
1722                    let base = concat_batches([&part1, &part2, &part3]);
1723                    let modified = concat_batches([&part1, &part2, &part3, &part4]);
1724
1725                    for enable_dictionary in [false, true] {
1726                        let base_data = write_with_cdc_options(
1727                            &[&base],
1728                            CDC_MIN_CHUNK_SIZE,
1729                            CDC_MAX_CHUNK_SIZE,
1730                            Some(CDC_ROW_GROUP_LENGTH),
1731                            enable_dictionary,
1732                        );
1733                        let mod_data = write_with_cdc_options(
1734                            &[&modified],
1735                            CDC_MIN_CHUNK_SIZE,
1736                            CDC_MAX_CHUNK_SIZE,
1737                            Some(CDC_ROW_GROUP_LENGTH),
1738                            enable_dictionary,
1739                        );
1740
1741                        let base_info = get_column_info(&base_data, 0);
1742                        let mod_info = get_column_info(&mod_data, 0);
1743                        assert_eq!(base_info.len(), 1);
1744                        assert_eq!(mod_info.len(), 1);
1745
1746                        assert_cdc_chunk_sizes(
1747                            &base.column(0).clone(),
1748                            &base_info[0],
1749                            nullable,
1750                            CDC_MIN_CHUNK_SIZE,
1751                            CDC_MAX_CHUNK_SIZE,
1752                            enable_dictionary,
1753                        );
1754                        assert_cdc_chunk_sizes(
1755                            &modified.column(0).clone(),
1756                            &mod_info[0],
1757                            nullable,
1758                            CDC_MIN_CHUNK_SIZE,
1759                            CDC_MAX_CHUNK_SIZE,
1760                            enable_dictionary,
1761                        );
1762
1763                        let bp = &base_info[0].page_lengths;
1764                        let mp = &mod_info[0].page_lengths;
1765                        assert!(mp.len() >= bp.len());
1766                        for i in 0..bp.len() - 1 {
1767                            assert_eq!(bp[i], mp[i], "Page {i} should be identical");
1768                        }
1769                        assert!(mp[bp.len() - 1] >= bp[bp.len() - 1]);
1770                    }
1771                }
1772
1773                #[test]
1774                fn empty_table() {
1775                    let (dtype, nullable, _, _) = config();
1776                    let schema = make_schema(&dtype, nullable);
1777
1778                    let empty = RecordBatch::new_empty(schema);
1779                    for enable_dictionary in [false, true] {
1780                        let data = write_with_cdc_options(
1781                            &[&empty],
1782                            CDC_MIN_CHUNK_SIZE,
1783                            CDC_MAX_CHUNK_SIZE,
1784                            Some(CDC_ROW_GROUP_LENGTH),
1785                            enable_dictionary,
1786                        );
1787                        let info = get_column_info(&data, 0);
1788                        // Empty table: either no row groups or one with no data pages
1789                        if !info.is_empty() {
1790                            assert!(info[0].page_lengths.is_empty());
1791                        }
1792                    }
1793                }
1794
1795                #[test]
1796                fn array_offsets() {
1797                    let (dtype, nullable, part_length, edit_length) = config();
1798                    let schema = make_schema(&dtype, nullable);
1799
1800                    let table = concat_batches([
1801                        &generate_table(&schema, part_length, 0),
1802                        &generate_table(&schema, edit_length, 1),
1803                        &generate_table(&schema, part_length, part_length as u64),
1804                    ]);
1805
1806                    for offset in [0usize, 512, 1024] {
1807                        if offset >= table.num_rows() {
1808                            continue;
1809                        }
1810                        let sliced = table.slice(offset, table.num_rows() - offset);
1811                        let data = write_with_cdc_options(
1812                            &[&sliced],
1813                            CDC_MIN_CHUNK_SIZE,
1814                            CDC_MAX_CHUNK_SIZE,
1815                            Some(CDC_ROW_GROUP_LENGTH),
1816                            true,
1817                        );
1818                        let info = get_column_info(&data, 0);
1819                        assert_eq!(info.len(), 1);
1820
1821                        // Verify CDC actually produced content-defined chunks
1822                        assert_cdc_chunk_sizes(
1823                            &sliced.column(0).clone(),
1824                            &info[0],
1825                            nullable,
1826                            CDC_MIN_CHUNK_SIZE,
1827                            CDC_MAX_CHUNK_SIZE,
1828                            true,
1829                        );
1830                    }
1831                }
1832            }
1833        };
1834    }
1835
    // Instantiate for representative types matching C++ categories
    // Macro args: (module name, Arrow DataType, bool). The bool appears to
    // toggle nullability, matching the `*_non_null` / `*_nullable` module
    // names — NOTE(review): confirm against the macro definition above.
    cdc_single_rg_tests!(cdc_bool_non_null, DataType::Boolean, false);
    cdc_single_rg_tests!(cdc_i32_non_null, DataType::Int32, false);
    cdc_single_rg_tests!(cdc_i64_nullable, DataType::Int64, true);
    cdc_single_rg_tests!(cdc_f64_nullable, DataType::Float64, true);
    cdc_single_rg_tests!(cdc_utf8_non_null, DataType::Utf8, false);
    cdc_single_rg_tests!(cdc_binary_nullable, DataType::Binary, true);
    cdc_single_rg_tests!(cdc_fsb16_nullable, DataType::FixedSizeBinary(16), true);
    cdc_single_rg_tests!(cdc_date32_non_null, DataType::Date32, false);
    cdc_single_rg_tests!(
        cdc_timestamp_nullable,
        DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None),
        true
    );
1850
1851    // --- Multiple row group tests matching C++ TestCDCMultipleRowGroups ---
1852
    mod cdc_multiple_row_groups {
        use super::*;

        // Each stable part (128 Ki rows) spans two 64 Ki-row row groups, so
        // every test below exercises CDC page stability across row-group
        // boundaries rather than within a single row group.
        const PART_LENGTH: usize = 128 * 1024;
        const EDIT_LENGTH: usize = 128;
        const ROW_GROUP_LENGTH: usize = 64 * 1024;

        // Three-column schema mixing nullable (int32, float64) and
        // non-nullable (bool) fields; each test checks every column.
        fn schema() -> Arc<Schema> {
            Arc::new(Schema::new(vec![
                Field::new("int32", DataType::Int32, true),
                Field::new("float64", DataType::Float64, true),
                Field::new("bool", DataType::Boolean, false),
            ]))
        }

        // Insert one EDIT_LENGTH batch after the first part+edit prefix and
        // verify only row groups at/after the insertion point change.
        #[test]
        fn insert_once() {
            let s = schema();
            let part1 = generate_table(&s, PART_LENGTH, 0);
            let part2 = generate_table(&s, PART_LENGTH, 2);
            let part3 = generate_table(&s, PART_LENGTH, 4);
            let edit1 = generate_table(&s, EDIT_LENGTH, 1);
            let edit2 = generate_table(&s, EDIT_LENGTH, 3);

            let base = concat_batches([&part1, &edit1, &part2, &part3]);
            let modified = concat_batches([&part1, &edit1, &edit2, &part2, &part3]);
            assert_eq!(modified.num_rows(), base.num_rows() + EDIT_LENGTH);

            let base_data = write_with_cdc_options(
                &[&base],
                CDC_MIN_CHUNK_SIZE,
                CDC_MAX_CHUNK_SIZE,
                Some(ROW_GROUP_LENGTH),
                false,
            );
            let mod_data = write_with_cdc_options(
                &[&modified],
                CDC_MIN_CHUNK_SIZE,
                CDC_MAX_CHUNK_SIZE,
                Some(ROW_GROUP_LENGTH),
                false,
            );

            for col in 0..s.fields().len() {
                let base_info = get_column_info(&base_data, col);
                let mod_info = get_column_info(&mod_data, col);

                // ~3 * 128 Ki + 128 rows at 64 Ki rows per row group => 7.
                assert_eq!(base_info.len(), 7, "expected 7 row groups for col {col}");
                assert_eq!(mod_info.len(), 7);

                // First two row groups should be identical
                assert_eq!(base_info[0].page_lengths, mod_info[0].page_lengths);
                assert_eq!(base_info[1].page_lengths, mod_info[1].page_lengths);

                // Middle row groups: 1 larger + 1 smaller diff
                for i in 2..mod_info.len() - 1 {
                    assert_page_length_differences(
                        &base_info[i],
                        &mod_info[i],
                        0,
                        1,
                        1,
                        EDIT_LENGTH as i64,
                    );
                }
                // Last row group: just larger
                assert_page_length_differences(
                    base_info.last().unwrap(),
                    mod_info.last().unwrap(),
                    0,
                    1,
                    0,
                    EDIT_LENGTH as i64,
                );
            }
        }

        // Delete the first EDIT_LENGTH batch and verify the leading row
        // groups stay identical while later ones only shift/shrink.
        #[test]
        fn delete_once() {
            let s = schema();
            let part1 = generate_table(&s, PART_LENGTH, 0);
            let part2 = generate_table(&s, PART_LENGTH, 2);
            let part3 = generate_table(&s, PART_LENGTH, 4);
            let edit1 = generate_table(&s, EDIT_LENGTH, 1);
            let edit2 = generate_table(&s, EDIT_LENGTH, 3);

            let base = concat_batches([&part1, &edit1, &part2, &part3, &edit2]);
            let modified = concat_batches([&part1, &part2, &part3, &edit2]);

            let base_data = write_with_cdc_options(
                &[&base],
                CDC_MIN_CHUNK_SIZE,
                CDC_MAX_CHUNK_SIZE,
                Some(ROW_GROUP_LENGTH),
                false,
            );
            let mod_data = write_with_cdc_options(
                &[&modified],
                CDC_MIN_CHUNK_SIZE,
                CDC_MAX_CHUNK_SIZE,
                Some(ROW_GROUP_LENGTH),
                false,
            );

            for col in 0..s.fields().len() {
                let base_info = get_column_info(&base_data, col);
                let mod_info = get_column_info(&mod_data, col);

                assert_eq!(base_info.len(), 7);
                assert_eq!(mod_info.len(), 7);

                // Row groups before the deletion point are untouched.
                assert_eq!(base_info[0].page_lengths, mod_info[0].page_lengths);
                assert_eq!(base_info[1].page_lengths, mod_info[1].page_lengths);

                for i in 2..mod_info.len() - 1 {
                    assert_page_length_differences(
                        &base_info[i],
                        &mod_info[i],
                        0,
                        1,
                        1,
                        EDIT_LENGTH as i64,
                    );
                }
                // Last row group: only a smaller-page diff (rows were removed).
                assert_page_length_differences(
                    base_info.last().unwrap(),
                    mod_info.last().unwrap(),
                    0,
                    0,
                    1,
                    EDIT_LENGTH as i64,
                );
            }
        }

        // Replace the first edit batch in place (same row count) and verify
        // the perturbation is confined to the row group containing it.
        #[test]
        fn update_once() {
            let s = schema();
            let part1 = generate_table(&s, PART_LENGTH, 0);
            let part2 = generate_table(&s, PART_LENGTH, 2);
            let part3 = generate_table(&s, PART_LENGTH, 4);
            let edit1 = generate_table(&s, EDIT_LENGTH, 1);
            let edit2 = generate_table(&s, EDIT_LENGTH, 3);
            let edit3 = generate_table(&s, EDIT_LENGTH, 5);

            let base = concat_batches([&part1, &edit1, &part2, &part3, &edit2]);
            let modified = concat_batches([&part1, &edit3, &part2, &part3, &edit2]);

            let base_data = write_with_cdc_options(
                &[&base],
                CDC_MIN_CHUNK_SIZE,
                CDC_MAX_CHUNK_SIZE,
                Some(ROW_GROUP_LENGTH),
                false,
            );
            let mod_data = write_with_cdc_options(
                &[&modified],
                CDC_MIN_CHUNK_SIZE,
                CDC_MAX_CHUNK_SIZE,
                Some(ROW_GROUP_LENGTH),
                false,
            );

            for col in 0..s.fields().len() {
                let nullable = s.field(col).is_nullable();
                let base_info = get_column_info(&base_data, col);
                let mod_info = get_column_info(&mod_data, col);

                assert_eq!(base_info.len(), 7);
                assert_eq!(mod_info.len(), 7);

                // Validate CDC chunk sizes on at least the first row group
                assert_cdc_chunk_sizes(
                    &base.column(col).slice(0, ROW_GROUP_LENGTH),
                    &base_info[0],
                    nullable,
                    CDC_MIN_CHUNK_SIZE,
                    CDC_MAX_CHUNK_SIZE,
                    false,
                );

                assert_eq!(base_info[0].page_lengths, mod_info[0].page_lengths);
                assert_eq!(base_info[1].page_lengths, mod_info[1].page_lengths);

                // Row group containing the edit
                assert_page_length_differences_update(&base_info[2], &mod_info[2], 1);

                // Remaining row groups should be identical
                for i in 3..mod_info.len() {
                    assert_eq!(base_info[i].page_lengths, mod_info[i].page_lengths);
                }
            }
        }

        // Append an EDIT_LENGTH batch at the end: all row groups but the
        // last, and all pages but the last, must be identical.
        #[test]
        fn append() {
            let s = schema();
            let part1 = generate_table(&s, PART_LENGTH, 0);
            let part2 = generate_table(&s, PART_LENGTH, 2);
            let part3 = generate_table(&s, PART_LENGTH, 4);
            let edit1 = generate_table(&s, EDIT_LENGTH, 1);
            let edit2 = generate_table(&s, EDIT_LENGTH, 3);

            let base = concat_batches([&part1, &edit1, &part2, &part3]);
            let modified = concat_batches([&part1, &edit1, &part2, &part3, &edit2]);

            let base_data = write_with_cdc_options(
                &[&base],
                CDC_MIN_CHUNK_SIZE,
                CDC_MAX_CHUNK_SIZE,
                Some(ROW_GROUP_LENGTH),
                false,
            );
            let mod_data = write_with_cdc_options(
                &[&modified],
                CDC_MIN_CHUNK_SIZE,
                CDC_MAX_CHUNK_SIZE,
                Some(ROW_GROUP_LENGTH),
                false,
            );

            for col in 0..s.fields().len() {
                let nullable = s.field(col).is_nullable();
                let base_info = get_column_info(&base_data, col);
                let mod_info = get_column_info(&mod_data, col);

                assert_eq!(base_info.len(), 7);
                assert_eq!(mod_info.len(), 7);

                // Validate CDC chunk sizes on the first row group
                assert_cdc_chunk_sizes(
                    &base.column(col).slice(0, ROW_GROUP_LENGTH),
                    &base_info[0],
                    nullable,
                    CDC_MIN_CHUNK_SIZE,
                    CDC_MAX_CHUNK_SIZE,
                    false,
                );

                // All row groups except last should be identical
                for i in 0..base_info.len() - 1 {
                    assert_eq!(base_info[i].page_lengths, mod_info[i].page_lengths);
                }

                // Last row group: pages should be identical except last
                let bp = &base_info.last().unwrap().page_lengths;
                let mp = &mod_info.last().unwrap().page_lengths;
                assert!(mp.len() >= bp.len());
                for i in 0..bp.len() - 1 {
                    assert_eq!(bp[i], mp[i]);
                }
            }
        }
    }
2107
2108    // --- Direct chunker test (kept from original) ---
2109
2110    #[test]
2111    fn test_cdc_array_offsets_direct() {
2112        use crate::basic::Type as PhysicalType;
2113        use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
2114
2115        let options = CdcOptions {
2116            min_chunk_size: CDC_MIN_CHUNK_SIZE,
2117            max_chunk_size: CDC_MAX_CHUNK_SIZE,
2118            norm_level: 0,
2119        };
2120        let desc = {
2121            let tp = Type::primitive_type_builder("col", PhysicalType::INT32)
2122                .build()
2123                .unwrap();
2124            ColumnDescriptor::new(Arc::new(tp), 0, 0, ColumnPath::new(vec![]))
2125        };
2126
2127        let bpr = bytes_per_record(&DataType::Int32, false);
2128        let n = CDC_PART_SIZE / bpr;
2129        let offset = 10usize;
2130
2131        let array: Int32Array = (0..n).map(|i| test_hash(0, i as u64) as i32).collect();
2132        let mut chunker = super::ContentDefinedChunker::new(&desc, &options).unwrap();
2133        let chunks = chunker.get_arrow_chunks(None, None, &array).unwrap();
2134
2135        let sliced = array.slice(offset, n - offset);
2136        let mut chunker2 = super::ContentDefinedChunker::new(&desc, &options).unwrap();
2137        let chunks2 = chunker2.get_arrow_chunks(None, None, &sliced).unwrap();
2138
2139        let values: Vec<usize> = chunks.iter().map(|c| c.num_values).collect();
2140        let values2: Vec<usize> = chunks2.iter().map(|c| c.num_values).collect();
2141
2142        assert!(values.len() > 1, "expected multiple chunks, got {values:?}");
2143        assert_eq!(values.len(), values2.len(), "chunk count must match");
2144
2145        assert_eq!(
2146            values[0] - values2[0],
2147            offset,
2148            "offsetted first chunk should be {offset} values shorter"
2149        );
2150        assert_eq!(
2151            &values[1..],
2152            &values2[1..],
2153            "all chunks after the first must be identical"
2154        );
2155    }
2156}