Skip to main content

parquet/column/chunker/
cdc.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::errors::{ParquetError, Result};
19use crate::file::properties::CdcOptions;
20use crate::schema::types::ColumnDescriptor;
21
22use super::CdcChunk;
23use super::cdc_generated::{GEARHASH_TABLE, NUM_GEARHASH_TABLES};
24
25/// CDC (Content-Defined Chunking) divides data into variable-sized chunks based on
26/// content rather than fixed-size boundaries.
27///
28/// For example, given this sequence of values in a column:
29///
30/// ```text
31/// File1:    [1,2,3,   4,5,6,   7,8,9]
32///            chunk1   chunk2   chunk3
33/// ```
34///
35/// If a value is inserted between 3 and 4:
36///
37/// ```text
38/// File2:    [1,2,3,0,   4,5,6,   7,8,9]
39///            new-chunk  chunk2   chunk3
40/// ```
41///
42/// The chunking process adjusts to maintain stable boundaries across data modifications.
43/// Each chunk defines a new parquet data page which is contiguously written to the file.
44/// Since each page is compressed independently, the files' contents look like:
45///
46/// ```text
47/// File1:    [Page1][Page2][Page3]...
48/// File2:    [Page4][Page2][Page3]...
49/// ```
50///
51/// When uploaded to a content-addressable storage (CAS) system, the CAS splits the byte
52/// stream into content-defined blobs with unique identifiers. Identical blobs are stored
53/// only once, so Page2 and Page3 are deduplicated across File1 and File2.
54///
55/// ## Implementation
56///
57/// Only the parquet writer needs to be aware of content-defined chunking; the reader is
58/// unaffected. Each parquet column writer holds a `ContentDefinedChunker` instance
59/// depending on the writer's properties. The chunker's state is maintained across the
60/// entire column without being reset between pages and row groups.
61///
62/// This implements a [FastCDC]-inspired algorithm using gear hashing. The input data is
63/// fed byte-by-byte into a rolling hash; when the hash matches a predefined mask, a new
64/// chunk boundary candidate is recorded. To reduce the exponential variance of chunk
65/// sizes inherent in a single gear hash, the algorithm requires **8 consecutive mask
66/// matches** — each against a different pre-computed gear hash table — before committing
67/// to a boundary. This [central-limit-theorem normalization] makes the chunk size
68/// distribution approximately normal between `min_chunk_size` and `max_chunk_size`.
69///
70/// The chunker receives the record-shredded column data (def_levels, rep_levels, values)
71/// and iterates over the (def_level, rep_level, value) triplets while adjusting the
72/// column-global rolling hash. Whenever the rolling hash matches, the chunker creates a
73/// new chunk. For nested data (lists, maps, structs) chunk boundaries are restricted to
74/// top-level record boundaries (`rep_level == 0`) so that a nested row is never split
75/// across chunks.
76///
77/// Note that boundaries are deterministically calculated exclusively based on the data
78/// itself, so the same data always produces the same chunks given the same configuration.
79///
80/// Ported from the C++ implementation in apache/arrow#45360
81/// (`cpp/src/parquet/chunker_internal.cc`).
82///
83/// [FastCDC]: https://www.usenix.org/conference/atc16/technical-sessions/presentation/xia
84/// [central-limit-theorem normalization]: https://www.cidrdb.org/cidr2023/papers/p43-low.pdf
85#[derive(Debug)]
86pub(crate) struct ContentDefinedChunker {
87    /// Maximum definition level for this column.
88    max_def_level: i16,
89    /// Maximum repetition level for this column.
90    max_rep_level: i16,
91    /// Definition level at the nearest REPEATED ancestor.
92    repeated_ancestor_def_level: i16,
93
94    /// Minimum chunk size in bytes.
95    /// The rolling hash will not be updated until this size is reached for each chunk.
96    /// All data sent through the hash function counts towards the chunk size, including
97    /// definition and repetition levels if present.
98    min_chunk_size: i64,
99    /// Maximum chunk size in bytes.
100    /// A new chunk is created whenever the chunk size exceeds this value. The chunk size
101    /// distribution approximates a normal distribution between `min_chunk_size` and
102    /// `max_chunk_size`. Note that the parquet writer has a related `data_pagesize`
103    /// property that controls the maximum size of a parquet data page after encoding.
104    /// While setting `data_pagesize` smaller than `max_chunk_size` doesn't affect
105    /// chunking effectiveness, it results in more small parquet data pages.
106    max_chunk_size: i64,
107    /// Mask for matching against the rolling hash.
108    rolling_hash_mask: u64,
109
110    /// Rolling hash state, never reset — initialized once for the entire column.
111    rolling_hash: u64,
112    /// Whether the rolling hash has matched the mask since the last chunk boundary.
113    has_matched: bool,
114    /// Current run count for the central-limit-theorem normalization.
115    nth_run: usize,
116    /// Current chunk size in bytes.
117    chunk_size: i64,
118}
119
120impl ContentDefinedChunker {
121    pub fn new(desc: &ColumnDescriptor, options: &CdcOptions) -> Result<Self> {
122        let rolling_hash_mask = Self::calculate_mask(
123            options.min_chunk_size as i64,
124            options.max_chunk_size as i64,
125            options.norm_level,
126        )?;
127        Ok(Self {
128            max_def_level: desc.max_def_level(),
129            max_rep_level: desc.max_rep_level(),
130            repeated_ancestor_def_level: desc.repeated_ancestor_def_level(),
131            min_chunk_size: options.min_chunk_size as i64,
132            max_chunk_size: options.max_chunk_size as i64,
133            rolling_hash_mask,
134            rolling_hash: 0,
135            has_matched: false,
136            nth_run: 0,
137            chunk_size: 0,
138        })
139    }
140
141    /// Calculate the mask used to determine chunk boundaries from the rolling hash.
142    ///
143    /// The mask is calculated so that the expected chunk size distribution approximates
144    /// a normal distribution between min and max chunk sizes.
145    fn calculate_mask(min_chunk_size: i64, max_chunk_size: i64, norm_level: i32) -> Result<u64> {
146        if min_chunk_size < 0 {
147            return Err(ParquetError::General(
148                "min_chunk_size must be non-negative".to_string(),
149            ));
150        }
151        if max_chunk_size <= min_chunk_size {
152            return Err(ParquetError::General(
153                "max_chunk_size must be greater than min_chunk_size".to_string(),
154            ));
155        }
156
157        let avg_chunk_size = (min_chunk_size + max_chunk_size) / 2;
158        // Target size after subtracting the min-size skip window and dividing by the
159        // number of hash tables (for central-limit-theorem normalization).
160        let target_size = (avg_chunk_size - min_chunk_size) / NUM_GEARHASH_TABLES as i64;
161
162        // floor(log2(target_size)) — equivalent to C++ NumRequiredBits(target_size) - 1
163        let mask_bits = if target_size > 0 {
164            63 - target_size.leading_zeros() as i32
165        } else {
166            0
167        };
168
169        let effective_bits = mask_bits - norm_level;
170
171        if !(1..=63).contains(&effective_bits) {
172            return Err(ParquetError::General(format!(
173                "The number of bits in the CDC mask must be between 1 and 63, got {effective_bits}"
174            )));
175        }
176
177        // Create the mask by setting the top `effective_bits` bits.
178        Ok(u64::MAX << (64 - effective_bits))
179    }
180
181    /// Feed raw bytes into the rolling hash.
182    ///
183    /// The byte count always accumulates toward `chunk_size`, but the actual hash
184    /// update is skipped until `min_chunk_size` has been reached. This "skip window"
185    /// is the FastCDC optimization that prevents boundaries from appearing too early
186    /// in a chunk.
187    #[inline]
188    fn roll(&mut self, bytes: &[u8]) {
189        self.chunk_size += bytes.len() as i64;
190        if self.chunk_size < self.min_chunk_size {
191            return;
192        }
193        for &b in bytes {
194            self.rolling_hash = self
195                .rolling_hash
196                .wrapping_shl(1)
197                .wrapping_add(GEARHASH_TABLE[self.nth_run][b as usize]);
198            self.has_matched =
199                self.has_matched || ((self.rolling_hash & self.rolling_hash_mask) == 0);
200        }
201    }
202
203    /// Feed exactly `N` bytes into the rolling hash (compile-time width).
204    ///
205    /// Like [`roll`](Self::roll), but the byte count is known at compile time,
206    /// allowing the compiler to unroll the inner loop.
207    #[inline(always)]
208    fn roll_fixed<const N: usize>(&mut self, bytes: &[u8; N]) {
209        self.chunk_size += N as i64;
210        if self.chunk_size < self.min_chunk_size {
211            return;
212        }
213        for j in 0..N {
214            self.rolling_hash = self
215                .rolling_hash
216                .wrapping_shl(1)
217                .wrapping_add(GEARHASH_TABLE[self.nth_run][bytes[j] as usize]);
218            self.has_matched =
219                self.has_matched || ((self.rolling_hash & self.rolling_hash_mask) == 0);
220        }
221    }
222
223    /// Feed a definition or repetition level (i16) into the rolling hash.
224    #[inline]
225    fn roll_level(&mut self, level: i16) {
226        self.roll_fixed(&level.to_le_bytes());
227    }
228
229    /// Check whether a new chunk boundary should be created.
230    ///
231    /// A boundary is created when **either** of two conditions holds:
232    ///
233    /// 1. **CLT normalization**: The rolling hash has matched the mask (`has_matched`)
234    ///    *and* this is the 8th consecutive such match (`nth_run` reaches
235    ///    `NUM_GEARHASH_TABLES`). Each match advances to the next gear hash table, so
236    ///    8 independent matches are required. A single hash table would yield
237    ///    exponentially distributed chunk sizes; requiring 8 independent matches
238    ///    approximates a normal (Gaussian) distribution by the central limit theorem.
239    ///
240    /// 2. **Hard size limit**: `chunk_size` has reached `max_chunk_size`. This caps
241    ///    chunk size even if the CLT normalization sequence has not completed.
242    ///
243    /// Note: when `max_chunk_size` forces a boundary, `nth_run` is **not** reset, so
244    /// the CLT sequence continues from where it left off in the next chunk. This
245    /// matches the C++ behavior.
246    #[inline]
247    fn need_new_chunk(&mut self) -> bool {
248        if self.has_matched {
249            self.has_matched = false;
250            self.nth_run += 1;
251            if self.nth_run >= NUM_GEARHASH_TABLES {
252                self.nth_run = 0;
253                self.chunk_size = 0;
254                return true;
255            }
256        }
257        if self.chunk_size >= self.max_chunk_size {
258            self.chunk_size = 0;
259            return true;
260        }
261        false
262    }
263
264    /// Compute chunk boundaries for the given column data.
265    ///
266    /// The chunking state is maintained across the entire column without being
267    /// reset between pages and row groups. This enables the chunking process to
268    /// be continued between different write calls.
269    ///
270    /// We go over the (def_level, rep_level, value) triplets one by one while
271    /// adjusting the column-global rolling hash based on the triplet. Whenever
272    /// the rolling hash matches a predefined mask it sets `has_matched` to true.
273    ///
274    /// After each triplet [`need_new_chunk`](Self::need_new_chunk) is called to
275    /// evaluate if we need to create a new chunk.
276    fn calculate<F>(
277        &mut self,
278        def_levels: Option<&[i16]>,
279        rep_levels: Option<&[i16]>,
280        num_levels: usize,
281        mut roll_value: F,
282    ) -> Vec<CdcChunk>
283    where
284        F: FnMut(&mut Self, usize),
285    {
286        let has_def_levels = self.max_def_level > 0;
287        let has_rep_levels = self.max_rep_level > 0;
288
289        let mut chunks = Vec::new();
290        let mut prev_offset: usize = 0;
291        let mut prev_value_offset: usize = 0;
292        let mut value_offset: usize = 0;
293
294        if !has_rep_levels && !has_def_levels {
295            // Fastest path: non-nested, non-null data.
296            // Every level corresponds to exactly one non-null value, so
297            // value_offset == level_offset and num_values == num_levels.
298            //
299            // Example: required Int32, array = [10, 20, 30]
300            //   level:         0   1   2
301            //   value_offset:  0   1   2
302            for offset in 0..num_levels {
303                roll_value(self, offset);
304                if self.need_new_chunk() {
305                    chunks.push(CdcChunk {
306                        level_offset: prev_offset,
307                        num_levels: offset - prev_offset,
308                        value_offset: prev_offset,
309                        num_values: offset - prev_offset,
310                    });
311                    prev_offset = offset;
312                }
313            }
314            prev_value_offset = prev_offset;
315            value_offset = num_levels;
316        } else if !has_rep_levels {
317            // Non-nested data with nulls. value_offset only increments for
318            // non-null values (def == max_def), so it diverges from the
319            // level offset when nulls are present.
320            //
321            // Example: optional Int32, array = [1, null, 2, null, 3]
322            //   def_levels:    [1, 0, 1, 0, 1]
323            //   level:          0  1  2  3  4
324            //   value_offset:   0     1     2  (only increments on def==1)
325            let def_levels = def_levels.expect("def_levels required when max_def_level > 0");
326            #[allow(clippy::needless_range_loop)]
327            for offset in 0..num_levels {
328                let def_level = def_levels[offset];
329                self.roll_level(def_level);
330                if def_level == self.max_def_level {
331                    // For non-nested data, the leaf array has one slot per
332                    // level (nulls are array elements), so `offset` (the
333                    // level index) is the correct array index for hashing.
334                    roll_value(self, offset);
335                }
336                // Check boundary before incrementing value_offset so that
337                // num_values reflects only entries in the completed chunk.
338                if self.need_new_chunk() {
339                    chunks.push(CdcChunk {
340                        level_offset: prev_offset,
341                        num_levels: offset - prev_offset,
342                        value_offset: prev_value_offset,
343                        num_values: value_offset - prev_value_offset,
344                    });
345                    prev_offset = offset;
346                    prev_value_offset = value_offset;
347                }
348                if def_level == self.max_def_level {
349                    value_offset += 1;
350                }
351            }
352        } else {
353            // Nested data with nulls. Two counters are needed:
354            //
355            //   leaf_offset: index into the leaf values array for hashing,
356            //     incremented for all leaf slots (def >= repeated_ancestor_def_level),
357            //     including null elements.
358            //
359            //   value_offset: index into non_null_indices for chunk boundaries,
360            //     incremented only for non-null leaf values (def == max_def_level).
361            //
362            // These diverge when nullable elements exist inside lists.
363            //
364            // Example: List<Int32?> with repeated_ancestor_def_level=2, max_def=3
365            //   row 0: [1, null, 2]   (3 leaf slots, 2 non-null)
366            //   row 1: [3]            (1 leaf slot, 1 non-null)
367            //
368            //   leaf array:    [1, null, 2, 3]
369            //   def_levels:    [3,  2,   3, 3]
370            //   rep_levels:    [0,  1,   1, 0]
371            //
372            //   level  def  leaf_offset  value_offset  action
373            //   ─────  ───  ───────────  ────────────  ──────────────────────────
374            //     0     3       0             0        roll_value(0), value++, leaf++
375            //     1     2       1             1        leaf++ only (null element)
376            //     2     3       2             1        roll_value(2), value++, leaf++
377            //     3     3       3             2        roll_value(3), value++, leaf++
378            //
379            // roll_value(2) correctly indexes leaf array position 2 (value "2").
380            // Using value_offset=1 would index position 1 (the null slot).
381            //
382            // Using value_offset for roll_value would hash the wrong array slot.
383            let def_levels = def_levels.expect("def_levels required for nested data");
384            let rep_levels = rep_levels.expect("rep_levels required for nested data");
385            let mut leaf_offset: usize = 0;
386
387            for offset in 0..num_levels {
388                let def_level = def_levels[offset];
389                let rep_level = rep_levels[offset];
390
391                self.roll_level(def_level);
392                self.roll_level(rep_level);
393                if def_level == self.max_def_level {
394                    roll_value(self, leaf_offset);
395                }
396
397                // Check boundary before incrementing value_offset so that
398                // num_values reflects only entries in the completed chunk.
399                if rep_level == 0 && self.need_new_chunk() {
400                    let levels_to_write = offset - prev_offset;
401                    if levels_to_write > 0 {
402                        chunks.push(CdcChunk {
403                            level_offset: prev_offset,
404                            num_levels: levels_to_write,
405                            value_offset: prev_value_offset,
406                            num_values: value_offset - prev_value_offset,
407                        });
408                        prev_offset = offset;
409                        prev_value_offset = value_offset;
410                    }
411                }
412                if def_level == self.max_def_level {
413                    value_offset += 1;
414                }
415                if def_level >= self.repeated_ancestor_def_level {
416                    leaf_offset += 1;
417                }
418            }
419        }
420
421        // Add the last chunk if we have any levels left.
422        if prev_offset < num_levels {
423            chunks.push(CdcChunk {
424                level_offset: prev_offset,
425                num_levels: num_levels - prev_offset,
426                value_offset: prev_value_offset,
427                num_values: value_offset - prev_value_offset,
428            });
429        }
430
431        #[cfg(debug_assertions)]
432        self.validate_chunks(&chunks, num_levels, value_offset);
433
434        chunks
435    }
436
437    /// Compute CDC chunk boundaries by dispatching on the Arrow array's data type
438    /// to feed value bytes into the rolling hash.
439    #[cfg(feature = "arrow")]
440    pub(crate) fn get_arrow_chunks(
441        &mut self,
442        def_levels: Option<&[i16]>,
443        rep_levels: Option<&[i16]>,
444        array: &dyn arrow_array::Array,
445    ) -> Result<Vec<CdcChunk>> {
446        use arrow_array::cast::AsArray;
447        use arrow_schema::DataType;
448
449        let num_levels = match def_levels {
450            Some(def_levels) => def_levels.len(),
451            None => array.len(),
452        };
453
454        macro_rules! fixed_width {
455            ($N:literal) => {{
456                let data = array.to_data();
457                let buffer = data.buffers()[0].as_slice();
458                let values = &buffer[data.offset() * $N..];
459                self.calculate(def_levels, rep_levels, num_levels, |c, i| {
460                    let offset = i * $N;
461                    let slice = &values[offset..offset + $N];
462                    c.roll_fixed::<$N>(slice.try_into().unwrap());
463                })
464            }};
465        }
466
467        macro_rules! binary_like {
468            ($a:expr) => {{
469                let a = $a;
470                self.calculate(def_levels, rep_levels, num_levels, |c, i| {
471                    c.roll(a.value(i).as_ref());
472                })
473            }};
474        }
475
476        let dtype = array.data_type();
477        let chunks = match dtype {
478            DataType::Null => self.calculate(def_levels, rep_levels, num_levels, |_, _| {}),
479            DataType::Boolean => {
480                let a = array.as_boolean();
481                self.calculate(def_levels, rep_levels, num_levels, |c, i| {
482                    c.roll_fixed(&[a.value(i) as u8]);
483                })
484            }
485            DataType::Int8 | DataType::UInt8 => fixed_width!(1),
486            DataType::Int16 | DataType::UInt16 | DataType::Float16 => fixed_width!(2),
487            DataType::Int32
488            | DataType::UInt32
489            | DataType::Float32
490            | DataType::Date32
491            | DataType::Time32(_)
492            | DataType::Interval(arrow_schema::IntervalUnit::YearMonth)
493            | DataType::Decimal32(_, _) => fixed_width!(4),
494            DataType::Int64
495            | DataType::UInt64
496            | DataType::Float64
497            | DataType::Date64
498            | DataType::Time64(_)
499            | DataType::Timestamp(_, _)
500            | DataType::Duration(_)
501            | DataType::Interval(arrow_schema::IntervalUnit::DayTime)
502            | DataType::Decimal64(_, _) => fixed_width!(8),
503            DataType::Interval(arrow_schema::IntervalUnit::MonthDayNano)
504            | DataType::Decimal128(_, _) => fixed_width!(16),
505            DataType::Decimal256(_, _) => fixed_width!(32),
506            DataType::FixedSizeBinary(_) => binary_like!(array.as_fixed_size_binary()),
507            DataType::Binary => binary_like!(array.as_binary::<i32>()),
508            DataType::LargeBinary => binary_like!(array.as_binary::<i64>()),
509            DataType::Utf8 => binary_like!(array.as_string::<i32>()),
510            DataType::LargeUtf8 => binary_like!(array.as_string::<i64>()),
511            DataType::BinaryView => binary_like!(array.as_binary_view()),
512            DataType::Utf8View => binary_like!(array.as_string_view()),
513            DataType::Dictionary(_, _) => {
514                let dict = array.as_any_dictionary();
515                self.get_arrow_chunks(def_levels, rep_levels, dict.keys())?
516            }
517            _ => {
518                return Err(ParquetError::General(format!(
519                    "content-defined chunking is not supported for data type {dtype:?}",
520                )));
521            }
522        };
523        Ok(chunks)
524    }
525
526    #[cfg(debug_assertions)]
527    fn validate_chunks(&self, chunks: &[CdcChunk], num_levels: usize, total_values: usize) {
528        assert!(!chunks.is_empty(), "chunks must be non-empty");
529
530        let first = &chunks[0];
531        assert_eq!(first.level_offset, 0, "first chunk must start at level 0");
532        assert_eq!(first.value_offset, 0, "first chunk must start at value 0");
533
534        let mut sum_levels = first.num_levels;
535        let mut sum_values = first.num_values;
536        for i in 1..chunks.len() {
537            let chunk = &chunks[i];
538            let prev = &chunks[i - 1];
539            assert!(chunk.num_levels > 0, "chunk must have levels");
540            assert_eq!(
541                chunk.level_offset,
542                prev.level_offset + prev.num_levels,
543                "level offsets must be contiguous"
544            );
545            assert_eq!(
546                chunk.value_offset,
547                prev.value_offset + prev.num_values,
548                "value offsets must be contiguous"
549            );
550            sum_levels += chunk.num_levels;
551            sum_values += chunk.num_values;
552        }
553        assert_eq!(sum_levels, num_levels, "chunks must cover all levels");
554        assert_eq!(sum_values, total_values, "chunks must cover all values");
555
556        let last = chunks.last().unwrap();
557        assert_eq!(
558            last.level_offset + last.num_levels,
559            num_levels,
560            "last chunk must end at num_levels"
561        );
562    }
563}
564
565#[cfg(test)]
566mod tests {
567    use super::*;
568    use crate::basic::Type as PhysicalType;
569    use crate::schema::types::{ColumnPath, Type};
570    use std::sync::Arc;
571
572    fn make_desc(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
573        let tp = Type::primitive_type_builder("col", PhysicalType::INT32)
574            .build()
575            .unwrap();
576        ColumnDescriptor::new(
577            Arc::new(tp),
578            max_def_level,
579            max_rep_level,
580            ColumnPath::new(vec![]),
581        )
582    }
583
584    #[test]
585    fn test_calculate_mask_defaults() {
586        let mask = ContentDefinedChunker::calculate_mask(256 * 1024, 1024 * 1024, 0).unwrap();
587        // avg = 640 KiB, target = (640-256)*1024/8 = 49152, log2(49152) = 15
588        // mask = u64::MAX << (64 - 15) = top 15 bits set
589        let expected = u64::MAX << (64 - 15);
590        assert_eq!(mask, expected);
591    }
592
593    #[test]
594    fn test_calculate_mask_with_norm_level() {
595        let mask = ContentDefinedChunker::calculate_mask(256 * 1024, 1024 * 1024, 1).unwrap();
596        let expected = u64::MAX << (64 - 14);
597        assert_eq!(mask, expected);
598    }
599
600    #[test]
601    fn test_calculate_mask_invalid() {
602        assert!(ContentDefinedChunker::calculate_mask(-1, 100, 0).is_err());
603        assert!(ContentDefinedChunker::calculate_mask(100, 50, 0).is_err());
604        assert!(ContentDefinedChunker::calculate_mask(100, 100, 0).is_err());
605    }
606
607    #[test]
608    fn test_non_nested_non_null_single_chunk() {
609        let options = CdcOptions {
610            min_chunk_size: 8,
611            max_chunk_size: 1024,
612            norm_level: 0,
613        };
614        let mut chunker = ContentDefinedChunker::new(&make_desc(0, 0), &options).unwrap();
615
616        // Write a small amount of data — should produce exactly 1 chunk.
617        let num_values = 4;
618        let chunks = chunker.calculate(None, None, num_values, |c, i| {
619            c.roll_fixed::<4>(&(i as i32).to_le_bytes());
620        });
621        assert_eq!(chunks.len(), 1);
622        assert_eq!(chunks[0].level_offset, 0);
623        assert_eq!(chunks[0].value_offset, 0);
624        assert_eq!(chunks[0].num_levels, 4);
625    }
626
627    #[test]
628    fn test_max_chunk_size_forces_boundary() {
629        let options = CdcOptions {
630            min_chunk_size: 256,
631            max_chunk_size: 1024,
632            norm_level: 0,
633        };
634        let mut chunker = ContentDefinedChunker::new(&make_desc(0, 0), &options).unwrap();
635
636        // Write enough data to exceed max_chunk_size multiple times.
637        // Each i32 = 4 bytes, max_chunk_size=1024, so ~256 values per chunk max.
638        let num_values = 2000;
639        let chunks = chunker.calculate(None, None, num_values, |c, i| {
640            c.roll_fixed::<4>(&(i as i32).to_le_bytes());
641        });
642
643        // Should have multiple chunks
644        assert!(chunks.len() > 1);
645
646        // Verify contiguity
647        let mut total_levels = 0;
648        for (i, chunk) in chunks.iter().enumerate() {
649            assert_eq!(chunk.level_offset, total_levels);
650            if i < chunks.len() - 1 {
651                assert!(chunk.num_levels > 0);
652            }
653            total_levels += chunk.num_levels;
654        }
655        assert_eq!(total_levels, num_values);
656    }
657
658    #[test]
659    fn test_deterministic_chunks() {
660        let options = CdcOptions {
661            min_chunk_size: 4,
662            max_chunk_size: 64,
663            norm_level: 0,
664        };
665
666        let roll = |c: &mut ContentDefinedChunker, i: usize| {
667            c.roll_fixed::<8>(&(i as i64).to_le_bytes());
668        };
669
670        let mut chunker1 = ContentDefinedChunker::new(&make_desc(0, 0), &options).unwrap();
671        let chunks1 = chunker1.calculate(None, None, 200, roll);
672
673        let mut chunker2 = ContentDefinedChunker::new(&make_desc(0, 0), &options).unwrap();
674        let chunks2 = chunker2.calculate(None, None, 200, roll);
675
676        assert_eq!(chunks1.len(), chunks2.len());
677        for (a, b) in chunks1.iter().zip(chunks2.iter()) {
678            assert_eq!(a.level_offset, b.level_offset);
679            assert_eq!(a.num_levels, b.num_levels);
680            assert_eq!(a.value_offset, b.value_offset);
681            assert_eq!(a.num_values, b.num_values);
682        }
683    }
684
685    #[test]
686    fn test_nullable_non_nested() {
687        let options = CdcOptions {
688            min_chunk_size: 4,
689            max_chunk_size: 64,
690            norm_level: 0,
691        };
692        let mut chunker = ContentDefinedChunker::new(&make_desc(1, 0), &options).unwrap();
693
694        let num_levels = 20;
695        // def_level=1 means non-null, def_level=0 means null
696        // Pattern: null at indices 0, 3, 6, 9, 12, 15, 18 → 7 nulls, 13 non-null
697        let def_levels: Vec<i16> = (0..num_levels)
698            .map(|i| if i % 3 == 0 { 0 } else { 1 })
699            .collect();
700        let expected_non_null: usize = def_levels.iter().filter(|&&d| d == 1).count();
701
702        let chunks = chunker.calculate(Some(&def_levels), None, num_levels, |c, i| {
703            c.roll_fixed::<4>(&(i as i32).to_le_bytes());
704        });
705
706        assert!(!chunks.is_empty());
707        let total_levels: usize = chunks.iter().map(|c| c.num_levels).sum();
708        let total_values: usize = chunks.iter().map(|c| c.num_values).sum();
709        assert_eq!(total_levels, num_levels);
710        assert_eq!(total_values, expected_non_null);
711        // With nulls present, total_values < total_levels
712        assert!(total_values < total_levels);
713    }
714}
715
716/// Integration tests that exercise CDC through the Arrow writer/reader roundtrip.
717/// Ported from the C++ test suite in `chunker_internal_test.cc`.
718#[cfg(all(test, feature = "arrow"))]
719mod arrow_tests {
720    use std::borrow::Borrow;
721    use std::sync::Arc;
722
723    use arrow::util::data_gen::create_random_batch;
724    use arrow_array::cast::AsArray;
725    use arrow_array::{Array, ArrayRef, BooleanArray, Int32Array, RecordBatch};
726    use arrow_buffer::Buffer;
727    use arrow_data::ArrayData;
728    use arrow_schema::{DataType, Field, Fields, Schema};
729
730    use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
731    use crate::arrow::arrow_writer::ArrowWriter;
732    use crate::file::properties::{CdcOptions, WriterProperties};
733    use crate::file::reader::{FileReader, SerializedFileReader};
734
735    // --- Constants matching C++ TestCDCSingleRowGroup ---
736
737    const CDC_MIN_CHUNK_SIZE: usize = 4 * 1024;
738    const CDC_MAX_CHUNK_SIZE: usize = 16 * 1024;
739    const CDC_PART_SIZE: usize = 128 * 1024;
740    const CDC_EDIT_SIZE: usize = 128;
741    const CDC_ROW_GROUP_LENGTH: usize = 1024 * 1024;
742
743    // --- Helpers ---
744
745    /// Deterministic hash function matching the C++ test generator.
746    fn test_hash(seed: u64, index: u64) -> u64 {
747        let mut h = (index.wrapping_add(seed)).wrapping_mul(0xc4ceb9fe1a85ec53u64);
748        h ^= h >> 33;
749        h = h.wrapping_mul(0xff51afd7ed558ccdu64);
750        h ^= h >> 33;
751        h = h.wrapping_mul(0xc4ceb9fe1a85ec53u64);
752        h ^= h >> 33;
753        h
754    }
755
756    /// Generate a deterministic array for any supported data type, matching C++ `GenerateArray`.
757    fn generate_array(dtype: &DataType, nullable: bool, length: usize, seed: u64) -> ArrayRef {
758        macro_rules! gen_primitive {
759            ($array_type:ty, $cast:expr) => {{
760                if nullable {
761                    let arr: $array_type = (0..length)
762                        .map(|i| {
763                            let val = test_hash(seed, i as u64);
764                            if val % 10 == 0 {
765                                None
766                            } else {
767                                Some($cast(val))
768                            }
769                        })
770                        .collect();
771                    Arc::new(arr) as ArrayRef
772                } else {
773                    let arr: $array_type = (0..length)
774                        .map(|i| Some($cast(test_hash(seed, i as u64))))
775                        .collect();
776                    Arc::new(arr) as ArrayRef
777                }
778            }};
779        }
780
781        match dtype {
782            DataType::Boolean => {
783                if nullable {
784                    let arr: BooleanArray = (0..length)
785                        .map(|i| {
786                            let val = test_hash(seed, i as u64);
787                            if val % 10 == 0 {
788                                None
789                            } else {
790                                Some(val % 2 == 0)
791                            }
792                        })
793                        .collect();
794                    Arc::new(arr)
795                } else {
796                    let arr: BooleanArray = (0..length)
797                        .map(|i| Some(test_hash(seed, i as u64) % 2 == 0))
798                        .collect();
799                    Arc::new(arr)
800                }
801            }
802            DataType::Int32 => gen_primitive!(Int32Array, |v: u64| v as i32),
803            DataType::Int64 => {
804                gen_primitive!(arrow_array::Int64Array, |v: u64| v as i64)
805            }
806            DataType::Float64 => {
807                gen_primitive!(arrow_array::Float64Array, |v: u64| (v % 100000) as f64
808                    / 1000.0)
809            }
810            DataType::Utf8 => {
811                let arr: arrow_array::StringArray = if nullable {
812                    (0..length)
813                        .map(|i| {
814                            let val = test_hash(seed, i as u64);
815                            if val % 10 == 0 {
816                                None
817                            } else {
818                                Some(format!("str_{val}"))
819                            }
820                        })
821                        .collect()
822                } else {
823                    (0..length)
824                        .map(|i| Some(format!("str_{}", test_hash(seed, i as u64))))
825                        .collect()
826                };
827                Arc::new(arr)
828            }
829            DataType::Binary => {
830                let arr: arrow_array::BinaryArray = if nullable {
831                    (0..length)
832                        .map(|i| {
833                            let val = test_hash(seed, i as u64);
834                            if val % 10 == 0 {
835                                None
836                            } else {
837                                Some(format!("bin_{val}").into_bytes())
838                            }
839                        })
840                        .collect()
841                } else {
842                    (0..length)
843                        .map(|i| Some(format!("bin_{}", test_hash(seed, i as u64)).into_bytes()))
844                        .collect()
845                };
846                Arc::new(arr)
847            }
848            DataType::FixedSizeBinary(size) => {
849                let size = *size;
850                let mut builder = arrow_array::builder::FixedSizeBinaryBuilder::new(size);
851                for i in 0..length {
852                    let val = test_hash(seed, i as u64);
853                    if nullable && val % 10 == 0 {
854                        builder.append_null();
855                    } else {
856                        let s = format!("bin_{val}");
857                        let bytes = s.as_bytes();
858                        let mut buf = vec![0u8; size as usize];
859                        let copy_len = bytes.len().min(size as usize);
860                        buf[..copy_len].copy_from_slice(&bytes[..copy_len]);
861                        builder.append_value(&buf).unwrap();
862                    }
863                }
864                Arc::new(builder.finish())
865            }
866            DataType::Date32 => {
867                gen_primitive!(arrow_array::Date32Array, |v: u64| v as i32)
868            }
869            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
870                gen_primitive!(arrow_array::TimestampNanosecondArray, |v: u64| v as i64)
871            }
872            _ => panic!("Unsupported test data type: {dtype:?}"),
873        }
874    }
875
876    /// Generate a RecordBatch with the given schema, matching C++ `GenerateTable`.
877    fn generate_table(schema: &Arc<Schema>, length: usize, seed: u64) -> RecordBatch {
878        let arrays: Vec<ArrayRef> = schema
879            .fields()
880            .iter()
881            .enumerate()
882            .map(|(i, field)| {
883                generate_array(
884                    field.data_type(),
885                    field.is_nullable(),
886                    length,
887                    seed + i as u64 * 10,
888                )
889            })
890            .collect();
891        RecordBatch::try_new(schema.clone(), arrays).unwrap()
892    }
893
894    /// Compute the CDC byte width for a data type, matching C++ `bytes_per_record`.
895    /// Returns 0 for variable-length types.
896    fn cdc_byte_width(dtype: &DataType) -> usize {
897        match dtype {
898            DataType::Boolean => 1,
899            DataType::Int8 | DataType::UInt8 => 1,
900            DataType::Int16 | DataType::UInt16 | DataType::Float16 => 2,
901            DataType::Int32
902            | DataType::UInt32
903            | DataType::Float32
904            | DataType::Date32
905            | DataType::Time32(_) => 4,
906            DataType::Int64
907            | DataType::UInt64
908            | DataType::Float64
909            | DataType::Date64
910            | DataType::Time64(_)
911            | DataType::Timestamp(_, _)
912            | DataType::Duration(_) => 8,
913            DataType::Decimal128(_, _) => 16,
914            DataType::Decimal256(_, _) => 32,
915            DataType::FixedSizeBinary(n) => *n as usize,
916            _ => 0, // variable-length
917        }
918    }
919
920    /// Compute bytes_per_record for determining part/edit lengths, matching C++.
921    fn bytes_per_record(dtype: &DataType, nullable: bool) -> usize {
922        let bw = cdc_byte_width(dtype);
923        if bw > 0 {
924            if nullable { bw + 2 } else { bw }
925        } else {
926            16 // variable-length fallback, matching C++
927        }
928    }
929
930    /// Compute the CDC chunk size for an array slice, matching C++ `CalculateCdcSize`.
931    fn calculate_cdc_size(array: &dyn Array, nullable: bool) -> i64 {
932        let dtype = array.data_type();
933        let bw = cdc_byte_width(dtype);
934        let result = if bw > 0 {
935            // Fixed-width: count only non-null values
936            let valid_count = array.len() - array.null_count();
937            (valid_count * bw) as i64
938        } else {
939            // Variable-length: sum of actual byte lengths
940            match dtype {
941                DataType::Utf8 => {
942                    let a = array.as_string::<i32>();
943                    (0..a.len())
944                        .filter(|&i| a.is_valid(i))
945                        .map(|i| a.value(i).len() as i64)
946                        .sum()
947                }
948                DataType::Binary => {
949                    let a = array.as_binary::<i32>();
950                    (0..a.len())
951                        .filter(|&i| a.is_valid(i))
952                        .map(|i| a.value(i).len() as i64)
953                        .sum()
954                }
955                DataType::LargeBinary => {
956                    let a = array.as_binary::<i64>();
957                    (0..a.len())
958                        .filter(|&i| a.is_valid(i))
959                        .map(|i| a.value(i).len() as i64)
960                        .sum()
961                }
962                _ => panic!("CDC size calculation not implemented for {dtype:?}"),
963            }
964        };
965
966        if nullable {
967            // Add 2 bytes per element for definition levels
968            result + array.len() as i64 * 2
969        } else {
970            result
971        }
972    }
973
974    /// Page-level metadata for a single column within a row group.
975    struct ColumnInfo {
976        page_lengths: Vec<i64>,
977        has_dictionary_page: bool,
978    }
979
980    /// Extract per-row-group column info from Parquet data.
981    fn get_column_info(data: &[u8], column_index: usize) -> Vec<ColumnInfo> {
982        let reader = SerializedFileReader::new(bytes::Bytes::from(data.to_vec())).unwrap();
983        let metadata = reader.metadata();
984        let mut result = Vec::new();
985        for rg in 0..metadata.num_row_groups() {
986            let rg_reader = reader.get_row_group(rg).unwrap();
987            let col_reader = rg_reader.get_column_page_reader(column_index).unwrap();
988            let mut info = ColumnInfo {
989                page_lengths: Vec::new(),
990                has_dictionary_page: false,
991            };
992            for page in col_reader {
993                let page = page.unwrap();
994                match page.page_type() {
995                    crate::basic::PageType::DATA_PAGE | crate::basic::PageType::DATA_PAGE_V2 => {
996                        info.page_lengths.push(page.num_values() as i64);
997                    }
998                    crate::basic::PageType::DICTIONARY_PAGE => {
999                        info.has_dictionary_page = true;
1000                    }
1001                    _ => {}
1002                }
1003            }
1004            result.push(info);
1005        }
1006        result
1007    }
1008
1009    /// Assert that CDC chunk sizes are within the expected range.
1010    /// Equivalent to C++ `AssertContentDefinedChunkSizes`.
1011    fn assert_cdc_chunk_sizes(
1012        array: &ArrayRef,
1013        info: &ColumnInfo,
1014        nullable: bool,
1015        min_chunk_size: usize,
1016        max_chunk_size: usize,
1017        expect_dictionary_page: bool,
1018    ) {
1019        // Boolean and FixedSizeBinary never produce dictionary pages (matching C++)
1020        let expect_dict = match array.data_type() {
1021            DataType::Boolean | DataType::FixedSizeBinary(_) => false,
1022            _ => expect_dictionary_page,
1023        };
1024        assert_eq!(
1025            info.has_dictionary_page,
1026            expect_dict,
1027            "dictionary page mismatch for {:?}",
1028            array.data_type()
1029        );
1030
1031        let page_lengths = &info.page_lengths;
1032        assert!(
1033            page_lengths.len() > 1,
1034            "CDC should produce multiple pages, got {page_lengths:?}"
1035        );
1036
1037        let bw = cdc_byte_width(array.data_type());
1038        // Only do exact CDC size validation for fixed-width and base binary-like types
1039        if bw > 0
1040            || matches!(
1041                array.data_type(),
1042                DataType::Utf8 | DataType::Binary | DataType::LargeBinary
1043            )
1044        {
1045            let mut offset = 0i64;
1046            for (i, &page_len) in page_lengths.iter().enumerate() {
1047                let slice = array.slice(offset as usize, page_len as usize);
1048                let cdc_size = calculate_cdc_size(slice.as_ref(), nullable);
1049                if i < page_lengths.len() - 1 {
1050                    assert!(
1051                        cdc_size >= min_chunk_size as i64,
1052                        "Page {i}: CDC size {cdc_size} < min {min_chunk_size}, pages={page_lengths:?}"
1053                    );
1054                }
1055                assert!(
1056                    cdc_size <= max_chunk_size as i64,
1057                    "Page {i}: CDC size {cdc_size} > max {max_chunk_size}, pages={page_lengths:?}"
1058                );
1059                offset += page_len;
1060            }
1061            assert_eq!(
1062                offset,
1063                array.len() as i64,
1064                "page lengths must sum to array length"
1065            );
1066        }
1067    }
1068
1069    /// Write batches with CDC options and validate roundtrip.
1070    /// Matches C++ `WriteTableToBuffer`.
1071    fn write_with_cdc_options(
1072        batches: &[&RecordBatch],
1073        min_chunk_size: usize,
1074        max_chunk_size: usize,
1075        max_row_group_rows: Option<usize>,
1076        enable_dictionary: bool,
1077    ) -> Vec<u8> {
1078        assert!(!batches.is_empty());
1079        let schema = batches[0].schema();
1080        let mut builder = WriterProperties::builder()
1081            .set_dictionary_enabled(enable_dictionary)
1082            .set_content_defined_chunking(Some(CdcOptions {
1083                min_chunk_size,
1084                max_chunk_size,
1085                norm_level: 0,
1086            }));
1087        if let Some(max_rows) = max_row_group_rows {
1088            builder = builder.set_max_row_group_row_count(Some(max_rows));
1089        }
1090        let props = builder.build();
1091        let mut buf = Vec::new();
1092        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), Some(props)).unwrap();
1093        for batch in batches {
1094            writer.write(batch).unwrap();
1095        }
1096        writer.close().unwrap();
1097
1098        // Roundtrip validation (matching C++ WriteTableToBuffer)
1099        let readback = read_batches(&buf);
1100        let original_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1101        let readback_rows: usize = readback.iter().map(|b| b.num_rows()).sum();
1102        assert_eq!(original_rows, readback_rows, "Roundtrip row count mismatch");
1103        if original_rows > 0 {
1104            let original = concat_batches(batches.iter().copied());
1105            let roundtrip = concat_batches(&readback);
1106            assert_eq!(original, roundtrip, "Roundtrip validation failed");
1107        }
1108
1109        buf
1110    }
1111
1112    fn read_batches(data: &[u8]) -> Vec<RecordBatch> {
1113        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes::Bytes::from(data.to_vec()))
1114            .unwrap()
1115            .build()
1116            .unwrap();
1117        reader.collect::<std::result::Result<Vec<_>, _>>().unwrap()
1118    }
1119
1120    fn concat_batches(batches: impl IntoIterator<Item = impl Borrow<RecordBatch>>) -> RecordBatch {
1121        let batches: Vec<_> = batches.into_iter().collect();
1122        let schema = batches[0].borrow().schema();
1123        let batches = batches.iter().map(|b| b.borrow());
1124        arrow_select::concat::concat_batches(&schema, batches).unwrap()
1125    }
1126
1127    /// LCS-based diff between two sequences of page lengths (ported from C++).
1128    /// Includes the merge-adjacent-diffs post-processing from C++.
1129    fn find_differences(first: &[i64], second: &[i64]) -> Vec<(Vec<i64>, Vec<i64>)> {
1130        let n = first.len();
1131        let m = second.len();
1132        let mut dp = vec![vec![0usize; m + 1]; n + 1];
1133        for i in 0..n {
1134            for j in 0..m {
1135                if first[i] == second[j] {
1136                    dp[i + 1][j + 1] = dp[i][j] + 1;
1137                } else {
1138                    dp[i + 1][j + 1] = dp[i + 1][j].max(dp[i][j + 1]);
1139                }
1140            }
1141        }
1142        let mut common = Vec::new();
1143        let (mut i, mut j) = (n, m);
1144        while i > 0 && j > 0 {
1145            if first[i - 1] == second[j - 1] {
1146                common.push((i - 1, j - 1));
1147                i -= 1;
1148                j -= 1;
1149            } else if dp[i - 1][j] >= dp[i][j - 1] {
1150                i -= 1;
1151            } else {
1152                j -= 1;
1153            }
1154        }
1155        common.reverse();
1156
1157        let mut result = Vec::new();
1158        let (mut last_i, mut last_j) = (0usize, 0usize);
1159        for (ci, cj) in &common {
1160            if *ci > last_i || *cj > last_j {
1161                result.push((first[last_i..*ci].to_vec(), second[last_j..*cj].to_vec()));
1162            }
1163            last_i = ci + 1;
1164            last_j = cj + 1;
1165        }
1166        if last_i < n || last_j < m {
1167            result.push((first[last_i..].to_vec(), second[last_j..].to_vec()));
1168        }
1169
1170        // Merge adjacent diffs (matching C++ post-processing)
1171        let mut merged: Vec<(Vec<i64>, Vec<i64>)> = Vec::new();
1172        for diff in result {
1173            if let Some(prev) = merged.last_mut() {
1174                if prev.0.is_empty() && diff.1.is_empty() {
1175                    prev.0 = diff.0;
1176                    continue;
1177                } else if prev.1.is_empty() && diff.0.is_empty() {
1178                    prev.1 = diff.1;
1179                    continue;
1180                }
1181            }
1182            merged.push(diff);
1183        }
1184        merged
1185    }
1186
1187    /// Assert exact page length differences between original and modified files.
1188    /// Matches C++ `AssertPageLengthDifferences` (full version).
1189    fn assert_page_length_differences(
1190        original: &ColumnInfo,
1191        modified: &ColumnInfo,
1192        exact_equal_diffs: usize,
1193        exact_larger_diffs: usize,
1194        exact_smaller_diffs: usize,
1195        edit_length: i64,
1196    ) {
1197        let diffs = find_differences(&original.page_lengths, &modified.page_lengths);
1198        let expected = exact_equal_diffs + exact_larger_diffs + exact_smaller_diffs;
1199
1200        if diffs.len() != expected {
1201            eprintln!("Original: {:?}", original.page_lengths);
1202            eprintln!("Modified: {:?}", modified.page_lengths);
1203            for d in &diffs {
1204                eprintln!("  Diff: {:?} vs {:?}", d.0, d.1);
1205            }
1206        }
1207        assert_eq!(
1208            diffs.len(),
1209            expected,
1210            "Expected {expected} diffs, got {}",
1211            diffs.len()
1212        );
1213
1214        let (mut eq, mut larger, mut smaller) = (0usize, 0usize, 0usize);
1215        for (left, right) in &diffs {
1216            let left_sum: i64 = left.iter().sum();
1217            let right_sum: i64 = right.iter().sum();
1218            if left_sum == right_sum {
1219                eq += 1;
1220            } else if left_sum < right_sum {
1221                larger += 1;
1222                assert_eq!(
1223                    left_sum + edit_length,
1224                    right_sum,
1225                    "Larger diff mismatch: {left_sum} + {edit_length} != {right_sum}"
1226                );
1227            } else {
1228                smaller += 1;
1229                assert_eq!(
1230                    left_sum,
1231                    right_sum + edit_length,
1232                    "Smaller diff mismatch: {left_sum} != {right_sum} + {edit_length}"
1233                );
1234            }
1235        }
1236
1237        assert_eq!(eq, exact_equal_diffs, "equal diffs count");
1238        assert_eq!(larger, exact_larger_diffs, "larger diffs count");
1239        assert_eq!(smaller, exact_smaller_diffs, "smaller diffs count");
1240    }
1241
1242    /// Assert page length differences for update cases (simplified version).
1243    /// Matches C++ `AssertPageLengthDifferences` (max_equal_diffs overload).
1244    fn assert_page_length_differences_update(
1245        original: &ColumnInfo,
1246        modified: &ColumnInfo,
1247        max_equal_diffs: usize,
1248    ) {
1249        let diffs = find_differences(&original.page_lengths, &modified.page_lengths);
1250        assert!(
1251            diffs.len() <= max_equal_diffs,
1252            "Expected at most {max_equal_diffs} diffs, got {}",
1253            diffs.len()
1254        );
1255        for (left, right) in &diffs {
1256            let left_sum: i64 = left.iter().sum();
1257            let right_sum: i64 = right.iter().sum();
1258            assert_eq!(
1259                left_sum, right_sum,
1260                "Update diff should not change total row count"
1261            );
1262        }
1263    }
1264
1265    // --- FindDifferences tests (ported from C++) ---
1266
1267    #[test]
1268    fn test_find_differences_basic() {
1269        let diffs = find_differences(&[1, 2, 3, 4, 5], &[1, 7, 8, 4, 5]);
1270        assert_eq!(diffs.len(), 1);
1271        assert_eq!(diffs[0].0, vec![2, 3]);
1272        assert_eq!(diffs[0].1, vec![7, 8]);
1273    }
1274
1275    #[test]
1276    fn test_find_differences_multiple() {
1277        let diffs = find_differences(&[1, 2, 3, 4, 5, 6, 7], &[1, 8, 9, 4, 10, 6, 11]);
1278        assert_eq!(diffs.len(), 3);
1279        assert_eq!(diffs[0].0, vec![2, 3]);
1280        assert_eq!(diffs[0].1, vec![8, 9]);
1281        assert_eq!(diffs[1].0, vec![5]);
1282        assert_eq!(diffs[1].1, vec![10]);
1283        assert_eq!(diffs[2].0, vec![7]);
1284        assert_eq!(diffs[2].1, vec![11]);
1285    }
1286
1287    #[test]
1288    fn test_find_differences_different_lengths() {
1289        let diffs = find_differences(&[1, 2, 3], &[1, 2, 3, 4, 5]);
1290        assert_eq!(diffs.len(), 1);
1291        assert!(diffs[0].0.is_empty());
1292        assert_eq!(diffs[0].1, vec![4, 5]);
1293    }
1294
1295    #[test]
1296    fn test_find_differences_empty() {
1297        let diffs = find_differences(&[], &[]);
1298        assert!(diffs.is_empty());
1299    }
1300
1301    #[test]
1302    fn test_find_differences_changes_at_both_ends() {
1303        let diffs = find_differences(&[1, 2, 3, 4, 5, 6, 7, 8, 9], &[0, 0, 2, 3, 4, 5, 7, 7, 8]);
1304        assert_eq!(diffs.len(), 3);
1305        assert_eq!(diffs[0].0, vec![1]);
1306        assert_eq!(diffs[0].1, vec![0, 0]);
1307        assert_eq!(diffs[1].0, vec![6]);
1308        assert_eq!(diffs[1].1, vec![7]);
1309        assert_eq!(diffs[2].0, vec![9]);
1310        assert!(diffs[2].1.is_empty());
1311    }
1312
1313    #[test]
1314    fn test_find_differences_additional() {
1315        let diffs = find_differences(
1316            &[445, 312, 393, 401, 410, 138, 558, 457],
1317            &[445, 312, 393, 393, 410, 138, 558, 457],
1318        );
1319        assert_eq!(diffs.len(), 1);
1320        assert_eq!(diffs[0].0, vec![401]);
1321        assert_eq!(diffs[0].1, vec![393]);
1322    }
1323
1324    // --- Parameterized single-row-group tests via macro ---
1325
1326    macro_rules! cdc_single_rg_tests {
1327        ($mod_name:ident, $dtype:expr, $nullable:expr) => {
1328            mod $mod_name {
1329                use super::*;
1330
1331                fn config() -> (DataType, bool, usize, usize) {
1332                    let dtype: DataType = $dtype;
1333                    let nullable: bool = $nullable;
1334                    let bpr = bytes_per_record(&dtype, nullable);
1335                    let part_length = CDC_PART_SIZE / bpr;
1336                    let edit_length = CDC_EDIT_SIZE / bpr;
1337                    (dtype, nullable, part_length, edit_length)
1338                }
1339
1340                fn make_schema(dtype: &DataType, nullable: bool) -> Arc<Schema> {
1341                    Arc::new(Schema::new(vec![Field::new("f0", dtype.clone(), nullable)]))
1342                }
1343
1344                #[test]
1345                fn delete_once() {
1346                    let (dtype, nullable, part_length, edit_length) = config();
1347                    let schema = make_schema(&dtype, nullable);
1348
1349                    let part1 = generate_table(&schema, part_length, 0);
1350                    let part2 = generate_table(&schema, edit_length, 1);
1351                    let part3 = generate_table(&schema, part_length, part_length as u64);
1352
1353                    let base = concat_batches([&part1, &part2, &part3]);
1354                    let modified = concat_batches([&part1, &part3]);
1355
1356                    for enable_dictionary in [false, true] {
1357                        let base_data = write_with_cdc_options(
1358                            &[&base],
1359                            CDC_MIN_CHUNK_SIZE,
1360                            CDC_MAX_CHUNK_SIZE,
1361                            Some(CDC_ROW_GROUP_LENGTH),
1362                            enable_dictionary,
1363                        );
1364                        let mod_data = write_with_cdc_options(
1365                            &[&modified],
1366                            CDC_MIN_CHUNK_SIZE,
1367                            CDC_MAX_CHUNK_SIZE,
1368                            Some(CDC_ROW_GROUP_LENGTH),
1369                            enable_dictionary,
1370                        );
1371
1372                        let base_info = get_column_info(&base_data, 0);
1373                        let mod_info = get_column_info(&mod_data, 0);
1374                        assert_eq!(base_info.len(), 1);
1375                        assert_eq!(mod_info.len(), 1);
1376
1377                        assert_cdc_chunk_sizes(
1378                            &base.column(0).clone(),
1379                            &base_info[0],
1380                            nullable,
1381                            CDC_MIN_CHUNK_SIZE,
1382                            CDC_MAX_CHUNK_SIZE,
1383                            enable_dictionary,
1384                        );
1385                        assert_cdc_chunk_sizes(
1386                            &modified.column(0).clone(),
1387                            &mod_info[0],
1388                            nullable,
1389                            CDC_MIN_CHUNK_SIZE,
1390                            CDC_MAX_CHUNK_SIZE,
1391                            enable_dictionary,
1392                        );
1393
1394                        assert_page_length_differences(
1395                            &base_info[0],
1396                            &mod_info[0],
1397                            0,
1398                            0,
1399                            1,
1400                            edit_length as i64,
1401                        );
1402                    }
1403                }
1404
1405                #[test]
1406                fn delete_twice() {
1407                    let (dtype, nullable, part_length, edit_length) = config();
1408                    let schema = make_schema(&dtype, nullable);
1409
1410                    let part1 = generate_table(&schema, part_length, 0);
1411                    let part2 = generate_table(&schema, edit_length, 1);
1412                    let part3 = generate_table(&schema, part_length, part_length as u64);
1413                    let part4 = generate_table(&schema, edit_length, 2);
1414                    let part5 = generate_table(&schema, part_length, 2 * part_length as u64);
1415
1416                    let base = concat_batches([&part1, &part2, &part3, &part4, &part5]);
1417                    let modified = concat_batches([&part1, &part3, &part5]);
1418
1419                    for enable_dictionary in [false, true] {
1420                        let base_data = write_with_cdc_options(
1421                            &[&base],
1422                            CDC_MIN_CHUNK_SIZE,
1423                            CDC_MAX_CHUNK_SIZE,
1424                            Some(CDC_ROW_GROUP_LENGTH),
1425                            enable_dictionary,
1426                        );
1427                        let mod_data = write_with_cdc_options(
1428                            &[&modified],
1429                            CDC_MIN_CHUNK_SIZE,
1430                            CDC_MAX_CHUNK_SIZE,
1431                            Some(CDC_ROW_GROUP_LENGTH),
1432                            enable_dictionary,
1433                        );
1434
1435                        let base_info = get_column_info(&base_data, 0);
1436                        let mod_info = get_column_info(&mod_data, 0);
1437                        assert_eq!(base_info.len(), 1);
1438                        assert_eq!(mod_info.len(), 1);
1439
1440                        assert_cdc_chunk_sizes(
1441                            &base.column(0).clone(),
1442                            &base_info[0],
1443                            nullable,
1444                            CDC_MIN_CHUNK_SIZE,
1445                            CDC_MAX_CHUNK_SIZE,
1446                            enable_dictionary,
1447                        );
1448                        assert_cdc_chunk_sizes(
1449                            &modified.column(0).clone(),
1450                            &mod_info[0],
1451                            nullable,
1452                            CDC_MIN_CHUNK_SIZE,
1453                            CDC_MAX_CHUNK_SIZE,
1454                            enable_dictionary,
1455                        );
1456
1457                        assert_page_length_differences(
1458                            &base_info[0],
1459                            &mod_info[0],
1460                            0,
1461                            0,
1462                            2,
1463                            edit_length as i64,
1464                        );
1465                    }
1466                }
1467
1468                #[test]
1469                fn insert_once() {
1470                    let (dtype, nullable, part_length, edit_length) = config();
1471                    let schema = make_schema(&dtype, nullable);
1472
1473                    let part1 = generate_table(&schema, part_length, 0);
1474                    let part2 = generate_table(&schema, edit_length, 1);
1475                    let part3 = generate_table(&schema, part_length, part_length as u64);
1476
1477                    let base = concat_batches([&part1, &part3]);
1478                    let modified = concat_batches([&part1, &part2, &part3]);
1479
1480                    for enable_dictionary in [false, true] {
1481                        let base_data = write_with_cdc_options(
1482                            &[&base],
1483                            CDC_MIN_CHUNK_SIZE,
1484                            CDC_MAX_CHUNK_SIZE,
1485                            Some(CDC_ROW_GROUP_LENGTH),
1486                            enable_dictionary,
1487                        );
1488                        let mod_data = write_with_cdc_options(
1489                            &[&modified],
1490                            CDC_MIN_CHUNK_SIZE,
1491                            CDC_MAX_CHUNK_SIZE,
1492                            Some(CDC_ROW_GROUP_LENGTH),
1493                            enable_dictionary,
1494                        );
1495
1496                        let base_info = get_column_info(&base_data, 0);
1497                        let mod_info = get_column_info(&mod_data, 0);
1498                        assert_eq!(base_info.len(), 1);
1499                        assert_eq!(mod_info.len(), 1);
1500
1501                        assert_cdc_chunk_sizes(
1502                            &base.column(0).clone(),
1503                            &base_info[0],
1504                            nullable,
1505                            CDC_MIN_CHUNK_SIZE,
1506                            CDC_MAX_CHUNK_SIZE,
1507                            enable_dictionary,
1508                        );
1509                        assert_cdc_chunk_sizes(
1510                            &modified.column(0).clone(),
1511                            &mod_info[0],
1512                            nullable,
1513                            CDC_MIN_CHUNK_SIZE,
1514                            CDC_MAX_CHUNK_SIZE,
1515                            enable_dictionary,
1516                        );
1517
1518                        assert_page_length_differences(
1519                            &base_info[0],
1520                            &mod_info[0],
1521                            0,
1522                            1,
1523                            0,
1524                            edit_length as i64,
1525                        );
1526                    }
1527                }
1528
1529                #[test]
1530                fn insert_twice() {
1531                    let (dtype, nullable, part_length, edit_length) = config();
1532                    let schema = make_schema(&dtype, nullable);
1533
1534                    let part1 = generate_table(&schema, part_length, 0);
1535                    let part2 = generate_table(&schema, edit_length, 1);
1536                    let part3 = generate_table(&schema, part_length, part_length as u64);
1537                    let part4 = generate_table(&schema, edit_length, 2);
1538                    let part5 = generate_table(&schema, part_length, 2 * part_length as u64);
1539
1540                    let base = concat_batches([&part1, &part3, &part5]);
1541                    let modified = concat_batches([&part1, &part2, &part3, &part4, &part5]);
1542
1543                    for enable_dictionary in [false, true] {
1544                        let base_data = write_with_cdc_options(
1545                            &[&base],
1546                            CDC_MIN_CHUNK_SIZE,
1547                            CDC_MAX_CHUNK_SIZE,
1548                            Some(CDC_ROW_GROUP_LENGTH),
1549                            enable_dictionary,
1550                        );
1551                        let mod_data = write_with_cdc_options(
1552                            &[&modified],
1553                            CDC_MIN_CHUNK_SIZE,
1554                            CDC_MAX_CHUNK_SIZE,
1555                            Some(CDC_ROW_GROUP_LENGTH),
1556                            enable_dictionary,
1557                        );
1558
1559                        let base_info = get_column_info(&base_data, 0);
1560                        let mod_info = get_column_info(&mod_data, 0);
1561                        assert_eq!(base_info.len(), 1);
1562                        assert_eq!(mod_info.len(), 1);
1563
1564                        assert_cdc_chunk_sizes(
1565                            &base.column(0).clone(),
1566                            &base_info[0],
1567                            nullable,
1568                            CDC_MIN_CHUNK_SIZE,
1569                            CDC_MAX_CHUNK_SIZE,
1570                            enable_dictionary,
1571                        );
1572                        assert_cdc_chunk_sizes(
1573                            &modified.column(0).clone(),
1574                            &mod_info[0],
1575                            nullable,
1576                            CDC_MIN_CHUNK_SIZE,
1577                            CDC_MAX_CHUNK_SIZE,
1578                            enable_dictionary,
1579                        );
1580
1581                        assert_page_length_differences(
1582                            &base_info[0],
1583                            &mod_info[0],
1584                            0,
1585                            2,
1586                            0,
1587                            edit_length as i64,
1588                        );
1589                    }
1590                }
1591
1592                #[test]
1593                fn update_once() {
1594                    let (dtype, nullable, part_length, edit_length) = config();
1595                    let schema = make_schema(&dtype, nullable);
1596
1597                    let part1 = generate_table(&schema, part_length, 0);
1598                    let part2 = generate_table(&schema, edit_length, 1);
1599                    let part3 = generate_table(&schema, part_length, part_length as u64);
1600                    let part4 = generate_table(&schema, edit_length, 2);
1601
1602                    let base = concat_batches([&part1, &part2, &part3]);
1603                    let modified = concat_batches([&part1, &part4, &part3]);
1604
1605                    for enable_dictionary in [false, true] {
1606                        let base_data = write_with_cdc_options(
1607                            &[&base],
1608                            CDC_MIN_CHUNK_SIZE,
1609                            CDC_MAX_CHUNK_SIZE,
1610                            Some(CDC_ROW_GROUP_LENGTH),
1611                            enable_dictionary,
1612                        );
1613                        let mod_data = write_with_cdc_options(
1614                            &[&modified],
1615                            CDC_MIN_CHUNK_SIZE,
1616                            CDC_MAX_CHUNK_SIZE,
1617                            Some(CDC_ROW_GROUP_LENGTH),
1618                            enable_dictionary,
1619                        );
1620
1621                        let base_info = get_column_info(&base_data, 0);
1622                        let mod_info = get_column_info(&mod_data, 0);
1623                        assert_eq!(base_info.len(), 1);
1624                        assert_eq!(mod_info.len(), 1);
1625
1626                        assert_cdc_chunk_sizes(
1627                            &base.column(0).clone(),
1628                            &base_info[0],
1629                            nullable,
1630                            CDC_MIN_CHUNK_SIZE,
1631                            CDC_MAX_CHUNK_SIZE,
1632                            enable_dictionary,
1633                        );
1634                        assert_cdc_chunk_sizes(
1635                            &modified.column(0).clone(),
1636                            &mod_info[0],
1637                            nullable,
1638                            CDC_MIN_CHUNK_SIZE,
1639                            CDC_MAX_CHUNK_SIZE,
1640                            enable_dictionary,
1641                        );
1642
1643                        assert_page_length_differences_update(&base_info[0], &mod_info[0], 1);
1644                    }
1645                }
1646
1647                #[test]
1648                fn update_twice() {
1649                    let (dtype, nullable, part_length, edit_length) = config();
1650                    let schema = make_schema(&dtype, nullable);
1651
1652                    let part1 = generate_table(&schema, part_length, 0);
1653                    let part2 = generate_table(&schema, edit_length, 1);
1654                    let part3 = generate_table(&schema, part_length, part_length as u64);
1655                    let part4 = generate_table(&schema, edit_length, 2);
1656                    let part5 = generate_table(&schema, part_length, 2 * part_length as u64);
1657                    let part6 = generate_table(&schema, edit_length, 3);
1658                    let part7 = generate_table(&schema, edit_length, 4);
1659
1660                    let base = concat_batches([&part1, &part2, &part3, &part4, &part5]);
1661                    let modified = concat_batches([&part1, &part6, &part3, &part7, &part5]);
1662
1663                    for enable_dictionary in [false, true] {
1664                        let base_data = write_with_cdc_options(
1665                            &[&base],
1666                            CDC_MIN_CHUNK_SIZE,
1667                            CDC_MAX_CHUNK_SIZE,
1668                            Some(CDC_ROW_GROUP_LENGTH),
1669                            enable_dictionary,
1670                        );
1671                        let mod_data = write_with_cdc_options(
1672                            &[&modified],
1673                            CDC_MIN_CHUNK_SIZE,
1674                            CDC_MAX_CHUNK_SIZE,
1675                            Some(CDC_ROW_GROUP_LENGTH),
1676                            enable_dictionary,
1677                        );
1678
1679                        let base_info = get_column_info(&base_data, 0);
1680                        let mod_info = get_column_info(&mod_data, 0);
1681                        assert_eq!(base_info.len(), 1);
1682                        assert_eq!(mod_info.len(), 1);
1683
1684                        assert_cdc_chunk_sizes(
1685                            &base.column(0).clone(),
1686                            &base_info[0],
1687                            nullable,
1688                            CDC_MIN_CHUNK_SIZE,
1689                            CDC_MAX_CHUNK_SIZE,
1690                            enable_dictionary,
1691                        );
1692                        assert_cdc_chunk_sizes(
1693                            &modified.column(0).clone(),
1694                            &mod_info[0],
1695                            nullable,
1696                            CDC_MIN_CHUNK_SIZE,
1697                            CDC_MAX_CHUNK_SIZE,
1698                            enable_dictionary,
1699                        );
1700
1701                        assert_page_length_differences_update(&base_info[0], &mod_info[0], 2);
1702                    }
1703                }
1704
1705                #[test]
1706                fn prepend() {
1707                    let (dtype, nullable, part_length, edit_length) = config();
1708                    let schema = make_schema(&dtype, nullable);
1709
1710                    let part1 = generate_table(&schema, part_length, 0);
1711                    let part2 = generate_table(&schema, edit_length, 1);
1712                    let part3 = generate_table(&schema, part_length, part_length as u64);
1713                    let part4 = generate_table(&schema, edit_length, 2);
1714
1715                    let base = concat_batches([&part1, &part2, &part3]);
1716                    let modified = concat_batches([&part4, &part1, &part2, &part3]);
1717
1718                    for enable_dictionary in [false, true] {
1719                        let base_data = write_with_cdc_options(
1720                            &[&base],
1721                            CDC_MIN_CHUNK_SIZE,
1722                            CDC_MAX_CHUNK_SIZE,
1723                            Some(CDC_ROW_GROUP_LENGTH),
1724                            enable_dictionary,
1725                        );
1726                        let mod_data = write_with_cdc_options(
1727                            &[&modified],
1728                            CDC_MIN_CHUNK_SIZE,
1729                            CDC_MAX_CHUNK_SIZE,
1730                            Some(CDC_ROW_GROUP_LENGTH),
1731                            enable_dictionary,
1732                        );
1733
1734                        let base_info = get_column_info(&base_data, 0);
1735                        let mod_info = get_column_info(&mod_data, 0);
1736                        assert_eq!(base_info.len(), 1);
1737                        assert_eq!(mod_info.len(), 1);
1738
1739                        assert_cdc_chunk_sizes(
1740                            &base.column(0).clone(),
1741                            &base_info[0],
1742                            nullable,
1743                            CDC_MIN_CHUNK_SIZE,
1744                            CDC_MAX_CHUNK_SIZE,
1745                            enable_dictionary,
1746                        );
1747                        assert_cdc_chunk_sizes(
1748                            &modified.column(0).clone(),
1749                            &mod_info[0],
1750                            nullable,
1751                            CDC_MIN_CHUNK_SIZE,
1752                            CDC_MAX_CHUNK_SIZE,
1753                            enable_dictionary,
1754                        );
1755
1756                        assert!(
1757                            mod_info[0].page_lengths.len() >= base_info[0].page_lengths.len(),
1758                            "Modified should have same or more pages"
1759                        );
1760
1761                        assert_page_length_differences(
1762                            &base_info[0],
1763                            &mod_info[0],
1764                            0,
1765                            1,
1766                            0,
1767                            edit_length as i64,
1768                        );
1769                    }
1770                }
1771
1772                #[test]
1773                fn append() {
1774                    let (dtype, nullable, part_length, edit_length) = config();
1775                    let schema = make_schema(&dtype, nullable);
1776
1777                    let part1 = generate_table(&schema, part_length, 0);
1778                    let part2 = generate_table(&schema, edit_length, 1);
1779                    let part3 = generate_table(&schema, part_length, part_length as u64);
1780                    let part4 = generate_table(&schema, edit_length, 2);
1781
1782                    let base = concat_batches([&part1, &part2, &part3]);
1783                    let modified = concat_batches([&part1, &part2, &part3, &part4]);
1784
1785                    for enable_dictionary in [false, true] {
1786                        let base_data = write_with_cdc_options(
1787                            &[&base],
1788                            CDC_MIN_CHUNK_SIZE,
1789                            CDC_MAX_CHUNK_SIZE,
1790                            Some(CDC_ROW_GROUP_LENGTH),
1791                            enable_dictionary,
1792                        );
1793                        let mod_data = write_with_cdc_options(
1794                            &[&modified],
1795                            CDC_MIN_CHUNK_SIZE,
1796                            CDC_MAX_CHUNK_SIZE,
1797                            Some(CDC_ROW_GROUP_LENGTH),
1798                            enable_dictionary,
1799                        );
1800
1801                        let base_info = get_column_info(&base_data, 0);
1802                        let mod_info = get_column_info(&mod_data, 0);
1803                        assert_eq!(base_info.len(), 1);
1804                        assert_eq!(mod_info.len(), 1);
1805
1806                        assert_cdc_chunk_sizes(
1807                            &base.column(0).clone(),
1808                            &base_info[0],
1809                            nullable,
1810                            CDC_MIN_CHUNK_SIZE,
1811                            CDC_MAX_CHUNK_SIZE,
1812                            enable_dictionary,
1813                        );
1814                        assert_cdc_chunk_sizes(
1815                            &modified.column(0).clone(),
1816                            &mod_info[0],
1817                            nullable,
1818                            CDC_MIN_CHUNK_SIZE,
1819                            CDC_MAX_CHUNK_SIZE,
1820                            enable_dictionary,
1821                        );
1822
1823                        let bp = &base_info[0].page_lengths;
1824                        let mp = &mod_info[0].page_lengths;
1825                        assert!(mp.len() >= bp.len());
1826                        for i in 0..bp.len() - 1 {
1827                            assert_eq!(bp[i], mp[i], "Page {i} should be identical");
1828                        }
1829                        assert!(mp[bp.len() - 1] >= bp[bp.len() - 1]);
1830                    }
1831                }
1832
1833                #[test]
1834                fn empty_table() {
1835                    let (dtype, nullable, _, _) = config();
1836                    let schema = make_schema(&dtype, nullable);
1837
1838                    let empty = RecordBatch::new_empty(schema);
1839                    for enable_dictionary in [false, true] {
1840                        let data = write_with_cdc_options(
1841                            &[&empty],
1842                            CDC_MIN_CHUNK_SIZE,
1843                            CDC_MAX_CHUNK_SIZE,
1844                            Some(CDC_ROW_GROUP_LENGTH),
1845                            enable_dictionary,
1846                        );
1847                        let info = get_column_info(&data, 0);
1848                        // Empty table: either no row groups or one with no data pages
1849                        if !info.is_empty() {
1850                            assert!(info[0].page_lengths.is_empty());
1851                        }
1852                    }
1853                }
1854
1855                #[test]
1856                fn array_offsets() {
1857                    let (dtype, nullable, part_length, edit_length) = config();
1858                    let schema = make_schema(&dtype, nullable);
1859
1860                    let table = concat_batches([
1861                        &generate_table(&schema, part_length, 0),
1862                        &generate_table(&schema, edit_length, 1),
1863                        &generate_table(&schema, part_length, part_length as u64),
1864                    ]);
1865
1866                    for offset in [0usize, 512, 1024] {
1867                        if offset >= table.num_rows() {
1868                            continue;
1869                        }
1870                        let sliced = table.slice(offset, table.num_rows() - offset);
1871                        let data = write_with_cdc_options(
1872                            &[&sliced],
1873                            CDC_MIN_CHUNK_SIZE,
1874                            CDC_MAX_CHUNK_SIZE,
1875                            Some(CDC_ROW_GROUP_LENGTH),
1876                            true,
1877                        );
1878                        let info = get_column_info(&data, 0);
1879                        assert_eq!(info.len(), 1);
1880
1881                        // Verify CDC actually produced content-defined chunks
1882                        assert_cdc_chunk_sizes(
1883                            &sliced.column(0).clone(),
1884                            &info[0],
1885                            nullable,
1886                            CDC_MIN_CHUNK_SIZE,
1887                            CDC_MAX_CHUNK_SIZE,
1888                            true,
1889                        );
1890                    }
1891                }
1892            }
1893        };
1894    }
1895
1896    // Instantiate for representative types matching C++ categories
1897    cdc_single_rg_tests!(cdc_bool_non_null, DataType::Boolean, false);
1898    cdc_single_rg_tests!(cdc_i32_non_null, DataType::Int32, false);
1899    cdc_single_rg_tests!(cdc_i64_nullable, DataType::Int64, true);
1900    cdc_single_rg_tests!(cdc_f64_nullable, DataType::Float64, true);
1901    cdc_single_rg_tests!(cdc_utf8_non_null, DataType::Utf8, false);
1902    cdc_single_rg_tests!(cdc_binary_nullable, DataType::Binary, true);
1903    cdc_single_rg_tests!(cdc_fsb16_nullable, DataType::FixedSizeBinary(16), true);
1904    cdc_single_rg_tests!(cdc_date32_non_null, DataType::Date32, false);
1905    cdc_single_rg_tests!(
1906        cdc_timestamp_nullable,
1907        DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None),
1908        true
1909    );
1910
1911    // --- Multiple row group tests matching C++ TestCDCMultipleRowGroups ---
1912
1913    mod cdc_multiple_row_groups {
1914        use super::*;
1915
1916        const PART_LENGTH: usize = 128 * 1024;
1917        const EDIT_LENGTH: usize = 128;
1918        const ROW_GROUP_LENGTH: usize = 64 * 1024;
1919
1920        fn schema() -> Arc<Schema> {
1921            Arc::new(Schema::new(vec![
1922                Field::new("int32", DataType::Int32, true),
1923                Field::new("float64", DataType::Float64, true),
1924                Field::new("bool", DataType::Boolean, false),
1925            ]))
1926        }
1927
1928        #[test]
1929        fn insert_once() {
1930            let s = schema();
1931            let part1 = generate_table(&s, PART_LENGTH, 0);
1932            let part2 = generate_table(&s, PART_LENGTH, 2);
1933            let part3 = generate_table(&s, PART_LENGTH, 4);
1934            let edit1 = generate_table(&s, EDIT_LENGTH, 1);
1935            let edit2 = generate_table(&s, EDIT_LENGTH, 3);
1936
1937            let base = concat_batches([&part1, &edit1, &part2, &part3]);
1938            let modified = concat_batches([&part1, &edit1, &edit2, &part2, &part3]);
1939            assert_eq!(modified.num_rows(), base.num_rows() + EDIT_LENGTH);
1940
1941            let base_data = write_with_cdc_options(
1942                &[&base],
1943                CDC_MIN_CHUNK_SIZE,
1944                CDC_MAX_CHUNK_SIZE,
1945                Some(ROW_GROUP_LENGTH),
1946                false,
1947            );
1948            let mod_data = write_with_cdc_options(
1949                &[&modified],
1950                CDC_MIN_CHUNK_SIZE,
1951                CDC_MAX_CHUNK_SIZE,
1952                Some(ROW_GROUP_LENGTH),
1953                false,
1954            );
1955
1956            for col in 0..s.fields().len() {
1957                let base_info = get_column_info(&base_data, col);
1958                let mod_info = get_column_info(&mod_data, col);
1959
1960                assert_eq!(base_info.len(), 7, "expected 7 row groups for col {col}");
1961                assert_eq!(mod_info.len(), 7);
1962
1963                // First two row groups should be identical
1964                assert_eq!(base_info[0].page_lengths, mod_info[0].page_lengths);
1965                assert_eq!(base_info[1].page_lengths, mod_info[1].page_lengths);
1966
1967                // Middle row groups: 1 larger + 1 smaller diff
1968                for i in 2..mod_info.len() - 1 {
1969                    assert_page_length_differences(
1970                        &base_info[i],
1971                        &mod_info[i],
1972                        0,
1973                        1,
1974                        1,
1975                        EDIT_LENGTH as i64,
1976                    );
1977                }
1978                // Last row group: just larger
1979                assert_page_length_differences(
1980                    base_info.last().unwrap(),
1981                    mod_info.last().unwrap(),
1982                    0,
1983                    1,
1984                    0,
1985                    EDIT_LENGTH as i64,
1986                );
1987            }
1988        }
1989
1990        #[test]
1991        fn delete_once() {
1992            let s = schema();
1993            let part1 = generate_table(&s, PART_LENGTH, 0);
1994            let part2 = generate_table(&s, PART_LENGTH, 2);
1995            let part3 = generate_table(&s, PART_LENGTH, 4);
1996            let edit1 = generate_table(&s, EDIT_LENGTH, 1);
1997            let edit2 = generate_table(&s, EDIT_LENGTH, 3);
1998
1999            let base = concat_batches([&part1, &edit1, &part2, &part3, &edit2]);
2000            let modified = concat_batches([&part1, &part2, &part3, &edit2]);
2001
2002            let base_data = write_with_cdc_options(
2003                &[&base],
2004                CDC_MIN_CHUNK_SIZE,
2005                CDC_MAX_CHUNK_SIZE,
2006                Some(ROW_GROUP_LENGTH),
2007                false,
2008            );
2009            let mod_data = write_with_cdc_options(
2010                &[&modified],
2011                CDC_MIN_CHUNK_SIZE,
2012                CDC_MAX_CHUNK_SIZE,
2013                Some(ROW_GROUP_LENGTH),
2014                false,
2015            );
2016
2017            for col in 0..s.fields().len() {
2018                let base_info = get_column_info(&base_data, col);
2019                let mod_info = get_column_info(&mod_data, col);
2020
2021                assert_eq!(base_info.len(), 7);
2022                assert_eq!(mod_info.len(), 7);
2023
2024                assert_eq!(base_info[0].page_lengths, mod_info[0].page_lengths);
2025                assert_eq!(base_info[1].page_lengths, mod_info[1].page_lengths);
2026
2027                for i in 2..mod_info.len() - 1 {
2028                    assert_page_length_differences(
2029                        &base_info[i],
2030                        &mod_info[i],
2031                        0,
2032                        1,
2033                        1,
2034                        EDIT_LENGTH as i64,
2035                    );
2036                }
2037                assert_page_length_differences(
2038                    base_info.last().unwrap(),
2039                    mod_info.last().unwrap(),
2040                    0,
2041                    0,
2042                    1,
2043                    EDIT_LENGTH as i64,
2044                );
2045            }
2046        }
2047
2048        #[test]
2049        fn update_once() {
2050            let s = schema();
2051            let part1 = generate_table(&s, PART_LENGTH, 0);
2052            let part2 = generate_table(&s, PART_LENGTH, 2);
2053            let part3 = generate_table(&s, PART_LENGTH, 4);
2054            let edit1 = generate_table(&s, EDIT_LENGTH, 1);
2055            let edit2 = generate_table(&s, EDIT_LENGTH, 3);
2056            let edit3 = generate_table(&s, EDIT_LENGTH, 5);
2057
2058            let base = concat_batches([&part1, &edit1, &part2, &part3, &edit2]);
2059            let modified = concat_batches([&part1, &edit3, &part2, &part3, &edit2]);
2060
2061            let base_data = write_with_cdc_options(
2062                &[&base],
2063                CDC_MIN_CHUNK_SIZE,
2064                CDC_MAX_CHUNK_SIZE,
2065                Some(ROW_GROUP_LENGTH),
2066                false,
2067            );
2068            let mod_data = write_with_cdc_options(
2069                &[&modified],
2070                CDC_MIN_CHUNK_SIZE,
2071                CDC_MAX_CHUNK_SIZE,
2072                Some(ROW_GROUP_LENGTH),
2073                false,
2074            );
2075
2076            for col in 0..s.fields().len() {
2077                let nullable = s.field(col).is_nullable();
2078                let base_info = get_column_info(&base_data, col);
2079                let mod_info = get_column_info(&mod_data, col);
2080
2081                assert_eq!(base_info.len(), 7);
2082                assert_eq!(mod_info.len(), 7);
2083
2084                // Validate CDC chunk sizes on at least the first row group
2085                assert_cdc_chunk_sizes(
2086                    &base.column(col).slice(0, ROW_GROUP_LENGTH),
2087                    &base_info[0],
2088                    nullable,
2089                    CDC_MIN_CHUNK_SIZE,
2090                    CDC_MAX_CHUNK_SIZE,
2091                    false,
2092                );
2093
2094                assert_eq!(base_info[0].page_lengths, mod_info[0].page_lengths);
2095                assert_eq!(base_info[1].page_lengths, mod_info[1].page_lengths);
2096
2097                // Row group containing the edit
2098                assert_page_length_differences_update(&base_info[2], &mod_info[2], 1);
2099
2100                // Remaining row groups should be identical
2101                for i in 3..mod_info.len() {
2102                    assert_eq!(base_info[i].page_lengths, mod_info[i].page_lengths);
2103                }
2104            }
2105        }
2106
2107        #[test]
2108        fn append() {
2109            let s = schema();
2110            let part1 = generate_table(&s, PART_LENGTH, 0);
2111            let part2 = generate_table(&s, PART_LENGTH, 2);
2112            let part3 = generate_table(&s, PART_LENGTH, 4);
2113            let edit1 = generate_table(&s, EDIT_LENGTH, 1);
2114            let edit2 = generate_table(&s, EDIT_LENGTH, 3);
2115
2116            let base = concat_batches([&part1, &edit1, &part2, &part3]);
2117            let modified = concat_batches([&part1, &edit1, &part2, &part3, &edit2]);
2118
2119            let base_data = write_with_cdc_options(
2120                &[&base],
2121                CDC_MIN_CHUNK_SIZE,
2122                CDC_MAX_CHUNK_SIZE,
2123                Some(ROW_GROUP_LENGTH),
2124                false,
2125            );
2126            let mod_data = write_with_cdc_options(
2127                &[&modified],
2128                CDC_MIN_CHUNK_SIZE,
2129                CDC_MAX_CHUNK_SIZE,
2130                Some(ROW_GROUP_LENGTH),
2131                false,
2132            );
2133
2134            for col in 0..s.fields().len() {
2135                let nullable = s.field(col).is_nullable();
2136                let base_info = get_column_info(&base_data, col);
2137                let mod_info = get_column_info(&mod_data, col);
2138
2139                assert_eq!(base_info.len(), 7);
2140                assert_eq!(mod_info.len(), 7);
2141
2142                // Validate CDC chunk sizes on the first row group
2143                assert_cdc_chunk_sizes(
2144                    &base.column(col).slice(0, ROW_GROUP_LENGTH),
2145                    &base_info[0],
2146                    nullable,
2147                    CDC_MIN_CHUNK_SIZE,
2148                    CDC_MAX_CHUNK_SIZE,
2149                    false,
2150                );
2151
2152                // All row groups except last should be identical
2153                for i in 0..base_info.len() - 1 {
2154                    assert_eq!(base_info[i].page_lengths, mod_info[i].page_lengths);
2155                }
2156
2157                // Last row group: pages should be identical except last
2158                let bp = &base_info.last().unwrap().page_lengths;
2159                let mp = &mod_info.last().unwrap().page_lengths;
2160                assert!(mp.len() >= bp.len());
2161                for i in 0..bp.len() - 1 {
2162                    assert_eq!(bp[i], mp[i]);
2163                }
2164            }
2165        }
2166    }
2167
2168    // --- Direct chunker test (kept from original) ---
2169
2170    #[test]
2171    fn test_cdc_array_offsets_direct() {
2172        use crate::basic::Type as PhysicalType;
2173        use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
2174
2175        let options = CdcOptions {
2176            min_chunk_size: CDC_MIN_CHUNK_SIZE,
2177            max_chunk_size: CDC_MAX_CHUNK_SIZE,
2178            norm_level: 0,
2179        };
2180        let desc = {
2181            let tp = Type::primitive_type_builder("col", PhysicalType::INT32)
2182                .build()
2183                .unwrap();
2184            ColumnDescriptor::new(Arc::new(tp), 0, 0, ColumnPath::new(vec![]))
2185        };
2186
2187        let bpr = bytes_per_record(&DataType::Int32, false);
2188        let n = CDC_PART_SIZE / bpr;
2189        let offset = 10usize;
2190
2191        let array: Int32Array = (0..n).map(|i| test_hash(0, i as u64) as i32).collect();
2192        let mut chunker = super::ContentDefinedChunker::new(&desc, &options).unwrap();
2193        let chunks = chunker.get_arrow_chunks(None, None, &array).unwrap();
2194
2195        let sliced = array.slice(offset, n - offset);
2196        let mut chunker2 = super::ContentDefinedChunker::new(&desc, &options).unwrap();
2197        let chunks2 = chunker2.get_arrow_chunks(None, None, &sliced).unwrap();
2198
2199        let values: Vec<usize> = chunks.iter().map(|c| c.num_values).collect();
2200        let values2: Vec<usize> = chunks2.iter().map(|c| c.num_values).collect();
2201
2202        assert!(values.len() > 1, "expected multiple chunks, got {values:?}");
2203        assert_eq!(values.len(), values2.len(), "chunk count must match");
2204
2205        assert_eq!(
2206            values[0] - values2[0],
2207            offset,
2208            "offsetted first chunk should be {offset} values shorter"
2209        );
2210        assert_eq!(
2211            &values[1..],
2212            &values2[1..],
2213            "all chunks after the first must be identical"
2214        );
2215    }
2216
2217    /// Regression test for <https://github.com/apache/arrow-rs/issues/9637>
2218    ///
2219    /// Writing nested list data with CDC enabled panicked with an out-of-bounds
2220    /// slice access when null list entries had non-zero child ranges.
2221    #[test]
2222    fn test_cdc_list_roundtrip() {
2223        let schema = Arc::new(Schema::new(vec![
2224            Field::new(
2225                "_1",
2226                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
2227                true,
2228            ),
2229            Field::new(
2230                "_2",
2231                DataType::List(Arc::new(Field::new_list_field(DataType::Boolean, true))),
2232                true,
2233            ),
2234            Field::new(
2235                "_3",
2236                DataType::LargeList(Arc::new(Field::new_list_field(DataType::Utf8, true))),
2237                true,
2238            ),
2239        ]));
2240        let batch = create_random_batch(schema, 10_000, 0.25, 0.75).unwrap();
2241        write_with_cdc_options(
2242            &[&batch],
2243            CDC_MIN_CHUNK_SIZE,
2244            CDC_MAX_CHUNK_SIZE,
2245            None,
2246            true,
2247        );
2248    }
2249
2250    /// Test CDC with deeply nested types: List<List<Int32>>, List<Struct<List<Int32>>>
2251    #[test]
2252    fn test_cdc_deeply_nested_roundtrip() {
2253        let inner_field = Field::new_list_field(DataType::Int32, true);
2254        let inner_type = DataType::List(Arc::new(inner_field));
2255        let outer_field = Field::new_list_field(inner_type.clone(), true);
2256        let list_list_type = DataType::List(Arc::new(outer_field));
2257
2258        let struct_inner_field = Field::new_list_field(DataType::Int32, true);
2259        let struct_inner_type = DataType::List(Arc::new(struct_inner_field));
2260        let struct_fields = Fields::from(vec![Field::new("a", struct_inner_type, true)]);
2261        let struct_type = DataType::Struct(struct_fields);
2262        let struct_list_field = Field::new_list_field(struct_type, true);
2263        let list_struct_type = DataType::List(Arc::new(struct_list_field));
2264
2265        let schema = Arc::new(Schema::new(vec![
2266            Field::new("list_list", list_list_type, true),
2267            Field::new("list_struct_list", list_struct_type, true),
2268        ]));
2269        let batch = create_random_batch(schema, 10_000, 0.25, 0.75).unwrap();
2270        write_with_cdc_options(
2271            &[&batch],
2272            CDC_MIN_CHUNK_SIZE,
2273            CDC_MAX_CHUNK_SIZE,
2274            None,
2275            true,
2276        );
2277    }
2278
2279    /// Test CDC with list arrays that have non-empty null segments.
2280    ///
2281    /// Per the Arrow columnar format spec: "a null value may correspond to a
2282    /// non-empty segment in the child array". This test constructs such arrays
2283    /// manually and verifies the CDC writer handles them correctly.
2284    #[test]
2285    fn test_cdc_list_non_empty_null_segments() {
2286        // Build List<Int32> where null entries own non-zero child ranges:
2287        //   row 0: [1, 2]     offsets[0..2]  valid
2288        //   row 1: null        offsets[2..5]  null, but owns 3 child values
2289        //   row 2: [6, 7]     offsets[5..7]  valid
2290        //   row 3: null        offsets[7..9]  null, but owns 2 child values
2291        //   row 4: [10]        offsets[9..10] valid
2292        let values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2293        let offsets = Buffer::from_iter([0_i32, 2, 5, 7, 9, 10]);
2294        let null_bitmap = Buffer::from([0b00010101]); // rows 0, 2, 4 valid
2295
2296        let list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false)));
2297        let list_data = unsafe {
2298            ArrayData::new_unchecked(
2299                list_type.clone(),
2300                5,
2301                None,
2302                Some(null_bitmap),
2303                0,
2304                vec![offsets],
2305                vec![values.to_data()],
2306            )
2307        };
2308        let list_array = arrow_array::make_array(list_data);
2309
2310        let schema = Arc::new(Schema::new(vec![Field::new("col", list_type, true)]));
2311        let batch = RecordBatch::try_new(schema, vec![list_array]).unwrap();
2312
2313        let buf = write_with_cdc_options(
2314            &[&batch],
2315            CDC_MIN_CHUNK_SIZE,
2316            CDC_MAX_CHUNK_SIZE,
2317            None,
2318            true,
2319        );
2320        let read = concat_batches(read_batches(&buf));
2321        let read_list = read.column(0).as_list::<i32>();
2322        assert_eq!(read_list.len(), 5);
2323        assert!(read_list.is_valid(0));
2324        assert!(read_list.is_null(1));
2325        assert!(read_list.is_valid(2));
2326        assert!(read_list.is_null(3));
2327        assert!(read_list.is_valid(4));
2328
2329        let get_vals = |i: usize| -> Vec<i32> {
2330            read_list
2331                .value(i)
2332                .as_primitive::<arrow_array::types::Int32Type>()
2333                .values()
2334                .iter()
2335                .copied()
2336                .collect()
2337        };
2338        assert_eq!(get_vals(0), vec![1, 2]);
2339        assert_eq!(get_vals(2), vec![6, 7]);
2340        assert_eq!(get_vals(4), vec![10]);
2341    }
2342}