Skip to main content

parquet/column/chunker/
cdc.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#[cfg(feature = "arrow")]
19use crate::column::writer::LevelDataRef;
20use crate::errors::{ParquetError, Result};
21use crate::file::properties::CdcOptions;
22use crate::schema::types::ColumnDescriptor;
23
24use super::CdcChunk;
25use super::cdc_generated::{GEARHASH_TABLE, NUM_GEARHASH_TABLES};
26
27/// CDC (Content-Defined Chunking) divides data into variable-sized chunks based on
28/// content rather than fixed-size boundaries.
29///
30/// For example, given this sequence of values in a column:
31///
32/// ```text
33/// File1:    [1,2,3,   4,5,6,   7,8,9]
34///            chunk1   chunk2   chunk3
35/// ```
36///
37/// If a value is inserted between 3 and 4:
38///
39/// ```text
40/// File2:    [1,2,3,0,   4,5,6,   7,8,9]
41///            new-chunk  chunk2   chunk3
42/// ```
43///
44/// The chunking process adjusts to maintain stable boundaries across data modifications.
45/// Each chunk defines a new parquet data page which is contiguously written to the file.
46/// Since each page is compressed independently, the files' contents look like:
47///
48/// ```text
49/// File1:    [Page1][Page2][Page3]...
50/// File2:    [Page4][Page2][Page3]...
51/// ```
52///
53/// When uploaded to a content-addressable storage (CAS) system, the CAS splits the byte
54/// stream into content-defined blobs with unique identifiers. Identical blobs are stored
55/// only once, so Page2 and Page3 are deduplicated across File1 and File2.
56///
57/// ## Implementation
58///
59/// Only the parquet writer needs to be aware of content-defined chunking; the reader is
60/// unaffected. Each parquet column writer holds a `ContentDefinedChunker` instance
61/// depending on the writer's properties. The chunker's state is maintained across the
62/// entire column without being reset between pages and row groups.
63///
64/// This implements a [FastCDC]-inspired algorithm using gear hashing. The input data is
65/// fed byte-by-byte into a rolling hash; when the hash matches a predefined mask, a new
66/// chunk boundary candidate is recorded. To reduce the exponential variance of chunk
67/// sizes inherent in a single gear hash, the algorithm requires **8 consecutive mask
68/// matches** — each against a different pre-computed gear hash table — before committing
69/// to a boundary. This [central-limit-theorem normalization] makes the chunk size
70/// distribution approximately normal between `min_chunk_size` and `max_chunk_size`.
71///
72/// The chunker receives the record-shredded column data (def_levels, rep_levels, values)
73/// and iterates over the (def_level, rep_level, value) triplets while adjusting the
74/// column-global rolling hash. Whenever the rolling hash matches, the chunker creates a
75/// new chunk. For nested data (lists, maps, structs) chunk boundaries are restricted to
76/// top-level record boundaries (`rep_level == 0`) so that a nested row is never split
77/// across chunks.
78///
79/// Note that boundaries are deterministically calculated exclusively based on the data
80/// itself, so the same data always produces the same chunks given the same configuration.
81///
82/// Ported from the C++ implementation in apache/arrow#45360
83/// (`cpp/src/parquet/chunker_internal.cc`).
84///
85/// [FastCDC]: https://www.usenix.org/conference/atc16/technical-sessions/presentation/xia
86/// [central-limit-theorem normalization]: https://www.cidrdb.org/cidr2023/papers/p43-low.pdf
87#[derive(Debug)]
88pub(crate) struct ContentDefinedChunker {
89    /// Maximum definition level for this column.
90    max_def_level: i16,
91    /// Maximum repetition level for this column.
92    max_rep_level: i16,
93    /// Definition level at the nearest REPEATED ancestor.
94    repeated_ancestor_def_level: i16,
95
96    /// Minimum chunk size in bytes.
97    /// The rolling hash will not be updated until this size is reached for each chunk.
98    /// All data sent through the hash function counts towards the chunk size, including
99    /// definition and repetition levels if present.
100    min_chunk_size: i64,
101    /// Maximum chunk size in bytes.
102    /// A new chunk is created whenever the chunk size exceeds this value. The chunk size
103    /// distribution approximates a normal distribution between `min_chunk_size` and
104    /// `max_chunk_size`. Note that the parquet writer has a related `data_pagesize`
105    /// property that controls the maximum size of a parquet data page after encoding.
106    /// While setting `data_pagesize` smaller than `max_chunk_size` doesn't affect
107    /// chunking effectiveness, it results in more small parquet data pages.
108    max_chunk_size: i64,
109    /// Mask for matching against the rolling hash.
110    rolling_hash_mask: u64,
111
112    /// Rolling hash state, never reset — initialized once for the entire column.
113    rolling_hash: u64,
114    /// Whether the rolling hash has matched the mask since the last chunk boundary.
115    has_matched: bool,
116    /// Current run count for the central-limit-theorem normalization.
117    nth_run: usize,
118    /// Current chunk size in bytes.
119    chunk_size: i64,
120}
121
122impl ContentDefinedChunker {
123    pub fn new(desc: &ColumnDescriptor, options: &CdcOptions) -> Result<Self> {
124        let rolling_hash_mask = Self::calculate_mask(
125            options.min_chunk_size as i64,
126            options.max_chunk_size as i64,
127            options.norm_level,
128        )?;
129        Ok(Self {
130            max_def_level: desc.max_def_level(),
131            max_rep_level: desc.max_rep_level(),
132            repeated_ancestor_def_level: desc.repeated_ancestor_def_level(),
133            min_chunk_size: options.min_chunk_size as i64,
134            max_chunk_size: options.max_chunk_size as i64,
135            rolling_hash_mask,
136            rolling_hash: 0,
137            has_matched: false,
138            nth_run: 0,
139            chunk_size: 0,
140        })
141    }
142
143    /// Calculate the mask used to determine chunk boundaries from the rolling hash.
144    ///
145    /// The mask is calculated so that the expected chunk size distribution approximates
146    /// a normal distribution between min and max chunk sizes.
147    fn calculate_mask(min_chunk_size: i64, max_chunk_size: i64, norm_level: i32) -> Result<u64> {
148        if min_chunk_size < 0 {
149            return Err(ParquetError::General(
150                "min_chunk_size must be non-negative".to_string(),
151            ));
152        }
153        if max_chunk_size <= min_chunk_size {
154            return Err(ParquetError::General(
155                "max_chunk_size must be greater than min_chunk_size".to_string(),
156            ));
157        }
158
159        let avg_chunk_size = (min_chunk_size + max_chunk_size) / 2;
160        // Target size after subtracting the min-size skip window and dividing by the
161        // number of hash tables (for central-limit-theorem normalization).
162        let target_size = (avg_chunk_size - min_chunk_size) / NUM_GEARHASH_TABLES as i64;
163
164        // floor(log2(target_size)) — equivalent to C++ NumRequiredBits(target_size) - 1
165        let mask_bits = if target_size > 0 {
166            63 - target_size.leading_zeros() as i32
167        } else {
168            0
169        };
170
171        let effective_bits = mask_bits - norm_level;
172
173        if !(1..=63).contains(&effective_bits) {
174            return Err(ParquetError::General(format!(
175                "The number of bits in the CDC mask must be between 1 and 63, got {effective_bits}"
176            )));
177        }
178
179        // Create the mask by setting the top `effective_bits` bits.
180        Ok(u64::MAX << (64 - effective_bits))
181    }
182
183    /// Feed raw bytes into the rolling hash.
184    ///
185    /// The byte count always accumulates toward `chunk_size`, but the actual hash
186    /// update is skipped until `min_chunk_size` has been reached. This "skip window"
187    /// is the FastCDC optimization that prevents boundaries from appearing too early
188    /// in a chunk.
189    #[inline]
190    fn roll(&mut self, bytes: &[u8]) {
191        self.chunk_size += bytes.len() as i64;
192        if self.chunk_size < self.min_chunk_size {
193            return;
194        }
195        for &b in bytes {
196            self.rolling_hash = self
197                .rolling_hash
198                .wrapping_shl(1)
199                .wrapping_add(GEARHASH_TABLE[self.nth_run][b as usize]);
200            self.has_matched =
201                self.has_matched || ((self.rolling_hash & self.rolling_hash_mask) == 0);
202        }
203    }
204
205    /// Feed exactly `N` bytes into the rolling hash (compile-time width).
206    ///
207    /// Like [`roll`](Self::roll), but the byte count is known at compile time,
208    /// allowing the compiler to unroll the inner loop.
209    #[inline(always)]
210    fn roll_fixed<const N: usize>(&mut self, bytes: &[u8; N]) {
211        self.chunk_size += N as i64;
212        if self.chunk_size < self.min_chunk_size {
213            return;
214        }
215        for j in 0..N {
216            self.rolling_hash = self
217                .rolling_hash
218                .wrapping_shl(1)
219                .wrapping_add(GEARHASH_TABLE[self.nth_run][bytes[j] as usize]);
220            self.has_matched =
221                self.has_matched || ((self.rolling_hash & self.rolling_hash_mask) == 0);
222        }
223    }
224
225    /// Feed a definition or repetition level (i16) into the rolling hash.
226    #[inline]
227    fn roll_level(&mut self, level: i16) {
228        self.roll_fixed(&level.to_le_bytes());
229    }
230
231    /// Check whether a new chunk boundary should be created.
232    ///
233    /// A boundary is created when **either** of two conditions holds:
234    ///
235    /// 1. **CLT normalization**: The rolling hash has matched the mask (`has_matched`)
236    ///    *and* this is the 8th consecutive such match (`nth_run` reaches
237    ///    `NUM_GEARHASH_TABLES`). Each match advances to the next gear hash table, so
238    ///    8 independent matches are required. A single hash table would yield
239    ///    exponentially distributed chunk sizes; requiring 8 independent matches
240    ///    approximates a normal (Gaussian) distribution by the central limit theorem.
241    ///
242    /// 2. **Hard size limit**: `chunk_size` has reached `max_chunk_size`. This caps
243    ///    chunk size even if the CLT normalization sequence has not completed.
244    ///
245    /// Note: when `max_chunk_size` forces a boundary, `nth_run` is **not** reset, so
246    /// the CLT sequence continues from where it left off in the next chunk. This
247    /// matches the C++ behavior.
248    #[inline]
249    fn need_new_chunk(&mut self) -> bool {
250        if self.has_matched {
251            self.has_matched = false;
252            self.nth_run += 1;
253            if self.nth_run >= NUM_GEARHASH_TABLES {
254                self.nth_run = 0;
255                self.chunk_size = 0;
256                return true;
257            }
258        }
259        if self.chunk_size >= self.max_chunk_size {
260            self.chunk_size = 0;
261            return true;
262        }
263        false
264    }
265
266    /// Compute chunk boundaries for the given column data.
267    ///
268    /// The chunking state is maintained across the entire column without being
269    /// reset between pages and row groups. This enables the chunking process to
270    /// be continued between different write calls.
271    ///
272    /// We go over the (def_level, rep_level, value) triplets one by one while
273    /// adjusting the column-global rolling hash based on the triplet. Whenever
274    /// the rolling hash matches a predefined mask it sets `has_matched` to true.
275    ///
276    /// After each triplet [`need_new_chunk`](Self::need_new_chunk) is called to
277    /// evaluate if we need to create a new chunk.
278    fn calculate<F>(
279        &mut self,
280        def_levels: LevelDataRef<'_>,
281        rep_levels: LevelDataRef<'_>,
282        num_levels: usize,
283        mut roll_value: F,
284    ) -> Vec<CdcChunk>
285    where
286        F: FnMut(&mut Self, usize),
287    {
288        let has_def_levels = self.max_def_level > 0;
289        let has_rep_levels = self.max_rep_level > 0;
290
291        let mut chunks = Vec::new();
292        let mut prev_offset: usize = 0;
293        let mut prev_value_offset: usize = 0;
294        let mut value_offset: usize = 0;
295
296        if !has_rep_levels && !has_def_levels {
297            // Fastest path: non-nested, non-null data.
298            // Every level corresponds to exactly one non-null value, so
299            // value_offset == level_offset and num_values == num_levels.
300            //
301            // Example: required Int32, array = [10, 20, 30]
302            //   level:         0   1   2
303            //   value_offset:  0   1   2
304            for offset in 0..num_levels {
305                roll_value(self, offset);
306                if self.need_new_chunk() {
307                    chunks.push(CdcChunk {
308                        level_offset: prev_offset,
309                        num_levels: offset - prev_offset,
310                        value_offset: prev_offset,
311                        num_values: offset - prev_offset,
312                    });
313                    prev_offset = offset;
314                }
315            }
316            prev_value_offset = prev_offset;
317            value_offset = num_levels;
318        } else if !has_rep_levels {
319            // Non-nested data with nulls. value_offset only increments for
320            // non-null values (def == max_def), so it diverges from the
321            // level offset when nulls are present.
322            //
323            // Example: optional Int32, array = [1, null, 2, null, 3]
324            //   def_levels:    [1, 0, 1, 0, 1]
325            //   level:          0  1  2  3  4
326            //   value_offset:   0     1     2  (only increments on def==1)
327            #[allow(clippy::needless_range_loop)]
328            for offset in 0..num_levels {
329                let def_level = def_levels
330                    .value_at(offset)
331                    .expect("def_levels required when max_def_level > 0");
332                self.roll_level(def_level);
333                if def_level == self.max_def_level {
334                    // For non-nested data, the leaf array has one slot per
335                    // level (nulls are array elements), so `offset` (the
336                    // level index) is the correct array index for hashing.
337                    roll_value(self, offset);
338                }
339                // Check boundary before incrementing value_offset so that
340                // num_values reflects only entries in the completed chunk.
341                if self.need_new_chunk() {
342                    chunks.push(CdcChunk {
343                        level_offset: prev_offset,
344                        num_levels: offset - prev_offset,
345                        value_offset: prev_value_offset,
346                        num_values: value_offset - prev_value_offset,
347                    });
348                    prev_offset = offset;
349                    prev_value_offset = value_offset;
350                }
351                if def_level == self.max_def_level {
352                    value_offset += 1;
353                }
354            }
355        } else {
356            // Nested data with nulls. Two counters are needed:
357            //
358            //   leaf_offset: index into the leaf values array for hashing,
359            //     incremented for all leaf slots (def >= repeated_ancestor_def_level),
360            //     including null elements.
361            //
362            //   value_offset: index into non_null_indices for chunk boundaries,
363            //     incremented only for non-null leaf values (def == max_def_level).
364            //
365            // These diverge when nullable elements exist inside lists.
366            //
367            // Example: List<Int32?> with repeated_ancestor_def_level=2, max_def=3
368            //   row 0: [1, null, 2]   (3 leaf slots, 2 non-null)
369            //   row 1: [3]            (1 leaf slot, 1 non-null)
370            //
371            //   leaf array:    [1, null, 2, 3]
372            //   def_levels:    [3,  2,   3, 3]
373            //   rep_levels:    [0,  1,   1, 0]
374            //
375            //   level  def  leaf_offset  value_offset  action
376            //   ─────  ───  ───────────  ────────────  ──────────────────────────
377            //     0     3       0             0        roll_value(0), value++, leaf++
378            //     1     2       1             1        leaf++ only (null element)
379            //     2     3       2             1        roll_value(2), value++, leaf++
380            //     3     3       3             2        roll_value(3), value++, leaf++
381            //
382            // roll_value(2) correctly indexes leaf array position 2 (value "2").
383            // Using value_offset=1 would index position 1 (the null slot).
384            //
385            // Using value_offset for roll_value would hash the wrong array slot.
386            let mut leaf_offset: usize = 0;
387
388            for offset in 0..num_levels {
389                let def_level = def_levels
390                    .value_at(offset)
391                    .expect("def_levels required for nested data");
392                let rep_level = rep_levels
393                    .value_at(offset)
394                    .expect("rep_levels required for nested data");
395
396                self.roll_level(def_level);
397                self.roll_level(rep_level);
398                if def_level == self.max_def_level {
399                    roll_value(self, leaf_offset);
400                }
401
402                // Check boundary before incrementing value_offset so that
403                // num_values reflects only entries in the completed chunk.
404                if rep_level == 0 && self.need_new_chunk() {
405                    let levels_to_write = offset - prev_offset;
406                    if levels_to_write > 0 {
407                        chunks.push(CdcChunk {
408                            level_offset: prev_offset,
409                            num_levels: levels_to_write,
410                            value_offset: prev_value_offset,
411                            num_values: value_offset - prev_value_offset,
412                        });
413                        prev_offset = offset;
414                        prev_value_offset = value_offset;
415                    }
416                }
417                if def_level == self.max_def_level {
418                    value_offset += 1;
419                }
420                if def_level >= self.repeated_ancestor_def_level {
421                    leaf_offset += 1;
422                }
423            }
424        }
425
426        // Add the last chunk if we have any levels left.
427        if prev_offset < num_levels {
428            chunks.push(CdcChunk {
429                level_offset: prev_offset,
430                num_levels: num_levels - prev_offset,
431                value_offset: prev_value_offset,
432                num_values: value_offset - prev_value_offset,
433            });
434        }
435
436        #[cfg(debug_assertions)]
437        self.validate_chunks(&chunks, num_levels, value_offset);
438
439        chunks
440    }
441
442    /// Compute CDC chunk boundaries by dispatching on the Arrow array's data type
443    /// to feed value bytes into the rolling hash.
444    #[cfg(feature = "arrow")]
445    pub(crate) fn get_arrow_chunks(
446        &mut self,
447        def_levels: LevelDataRef<'_>,
448        rep_levels: LevelDataRef<'_>,
449        array: &dyn arrow_array::Array,
450    ) -> Result<Vec<CdcChunk>> {
451        use arrow_array::cast::AsArray;
452        use arrow_schema::DataType;
453
454        // For nested (list) data, null list entries can own non-zero child
455        // ranges in the leaf array, so `array.len()` may exceed the number of
456        // levels.  Always drive the loop by the level count; fall back to the
457        // array length only when there are no levels at all.
458        let num_levels = match (def_levels.len(), rep_levels.len()) {
459            (0, 0) => array.len(),
460            (d, r) => d.max(r),
461        };
462
463        macro_rules! fixed_width {
464            ($N:literal) => {{
465                let data = array.to_data();
466                let buffer = data.buffers()[0].as_slice();
467                let values = &buffer[data.offset() * $N..];
468                self.calculate(def_levels, rep_levels, num_levels, |c, i| {
469                    let offset = i * $N;
470                    let slice = &values[offset..offset + $N];
471                    c.roll_fixed::<$N>(slice.try_into().unwrap());
472                })
473            }};
474        }
475
476        macro_rules! binary_like {
477            ($a:expr) => {{
478                let a = $a;
479                self.calculate(def_levels, rep_levels, num_levels, |c, i| {
480                    c.roll(a.value(i).as_ref());
481                })
482            }};
483        }
484
485        let dtype = array.data_type();
486        let chunks = match dtype {
487            DataType::Null => self.calculate(def_levels, rep_levels, num_levels, |_, _| {}),
488            DataType::Boolean => {
489                let a = array.as_boolean();
490                self.calculate(def_levels, rep_levels, num_levels, |c, i| {
491                    c.roll_fixed(&[a.value(i) as u8]);
492                })
493            }
494            DataType::Int8 | DataType::UInt8 => fixed_width!(1),
495            DataType::Int16 | DataType::UInt16 | DataType::Float16 => fixed_width!(2),
496            DataType::Int32
497            | DataType::UInt32
498            | DataType::Float32
499            | DataType::Date32
500            | DataType::Time32(_)
501            | DataType::Interval(arrow_schema::IntervalUnit::YearMonth)
502            | DataType::Decimal32(_, _) => fixed_width!(4),
503            DataType::Int64
504            | DataType::UInt64
505            | DataType::Float64
506            | DataType::Date64
507            | DataType::Time64(_)
508            | DataType::Timestamp(_, _)
509            | DataType::Duration(_)
510            | DataType::Interval(arrow_schema::IntervalUnit::DayTime)
511            | DataType::Decimal64(_, _) => fixed_width!(8),
512            DataType::Interval(arrow_schema::IntervalUnit::MonthDayNano)
513            | DataType::Decimal128(_, _) => fixed_width!(16),
514            DataType::Decimal256(_, _) => fixed_width!(32),
515            DataType::FixedSizeBinary(_) => binary_like!(array.as_fixed_size_binary()),
516            DataType::Binary => binary_like!(array.as_binary::<i32>()),
517            DataType::LargeBinary => binary_like!(array.as_binary::<i64>()),
518            DataType::Utf8 => binary_like!(array.as_string::<i32>()),
519            DataType::LargeUtf8 => binary_like!(array.as_string::<i64>()),
520            DataType::BinaryView => binary_like!(array.as_binary_view()),
521            DataType::Utf8View => binary_like!(array.as_string_view()),
522            DataType::Dictionary(_, _) => {
523                let dict = array.as_any_dictionary();
524                self.get_arrow_chunks(def_levels, rep_levels, dict.keys())?
525            }
526            _ => {
527                return Err(ParquetError::General(format!(
528                    "content-defined chunking is not supported for data type {dtype:?}",
529                )));
530            }
531        };
532        Ok(chunks)
533    }
534
535    #[cfg(debug_assertions)]
536    fn validate_chunks(&self, chunks: &[CdcChunk], num_levels: usize, total_values: usize) {
537        assert!(!chunks.is_empty(), "chunks must be non-empty");
538
539        let first = &chunks[0];
540        assert_eq!(first.level_offset, 0, "first chunk must start at level 0");
541        assert_eq!(first.value_offset, 0, "first chunk must start at value 0");
542
543        let mut sum_levels = first.num_levels;
544        let mut sum_values = first.num_values;
545        for i in 1..chunks.len() {
546            let chunk = &chunks[i];
547            let prev = &chunks[i - 1];
548            assert!(chunk.num_levels > 0, "chunk must have levels");
549            assert_eq!(
550                chunk.level_offset,
551                prev.level_offset + prev.num_levels,
552                "level offsets must be contiguous"
553            );
554            assert_eq!(
555                chunk.value_offset,
556                prev.value_offset + prev.num_values,
557                "value offsets must be contiguous"
558            );
559            sum_levels += chunk.num_levels;
560            sum_values += chunk.num_values;
561        }
562        assert_eq!(sum_levels, num_levels, "chunks must cover all levels");
563        assert_eq!(sum_values, total_values, "chunks must cover all values");
564
565        let last = chunks.last().unwrap();
566        assert_eq!(
567            last.level_offset + last.num_levels,
568            num_levels,
569            "last chunk must end at num_levels"
570        );
571    }
572}
573
574#[cfg(test)]
575mod tests {
576    use super::*;
577    use crate::basic::Type as PhysicalType;
578    #[cfg(feature = "arrow")]
579    use crate::column::writer::LevelDataRef;
580    use crate::schema::types::{ColumnPath, Type};
581    use std::sync::Arc;
582
583    fn make_desc(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
584        let tp = Type::primitive_type_builder("col", PhysicalType::INT32)
585            .build()
586            .unwrap();
587        ColumnDescriptor::new(
588            Arc::new(tp),
589            max_def_level,
590            max_rep_level,
591            ColumnPath::new(vec![]),
592        )
593    }
594
595    #[test]
596    fn test_calculate_mask_defaults() {
597        let mask = ContentDefinedChunker::calculate_mask(256 * 1024, 1024 * 1024, 0).unwrap();
598        // avg = 640 KiB, target = (640-256)*1024/8 = 49152, log2(49152) = 15
599        // mask = u64::MAX << (64 - 15) = top 15 bits set
600        let expected = u64::MAX << (64 - 15);
601        assert_eq!(mask, expected);
602    }
603
604    #[test]
605    fn test_calculate_mask_with_norm_level() {
606        let mask = ContentDefinedChunker::calculate_mask(256 * 1024, 1024 * 1024, 1).unwrap();
607        let expected = u64::MAX << (64 - 14);
608        assert_eq!(mask, expected);
609    }
610
611    #[test]
612    fn test_calculate_mask_invalid() {
613        assert!(ContentDefinedChunker::calculate_mask(-1, 100, 0).is_err());
614        assert!(ContentDefinedChunker::calculate_mask(100, 50, 0).is_err());
615        assert!(ContentDefinedChunker::calculate_mask(100, 100, 0).is_err());
616    }
617
618    #[test]
619    fn test_non_nested_non_null_single_chunk() {
620        let options = CdcOptions {
621            min_chunk_size: 8,
622            max_chunk_size: 1024,
623            norm_level: 0,
624        };
625        let mut chunker = ContentDefinedChunker::new(&make_desc(0, 0), &options).unwrap();
626
627        // Write a small amount of data — should produce exactly 1 chunk.
628        let num_values = 4;
629        let chunks = chunker.calculate(
630            LevelDataRef::Absent,
631            LevelDataRef::Absent,
632            num_values,
633            |c, i| {
634                c.roll_fixed::<4>(&(i as i32).to_le_bytes());
635            },
636        );
637        assert_eq!(chunks.len(), 1);
638        assert_eq!(chunks[0].level_offset, 0);
639        assert_eq!(chunks[0].value_offset, 0);
640        assert_eq!(chunks[0].num_levels, 4);
641    }
642
643    #[test]
644    fn test_max_chunk_size_forces_boundary() {
645        let options = CdcOptions {
646            min_chunk_size: 256,
647            max_chunk_size: 1024,
648            norm_level: 0,
649        };
650        let mut chunker = ContentDefinedChunker::new(&make_desc(0, 0), &options).unwrap();
651
652        // Write enough data to exceed max_chunk_size multiple times.
653        // Each i32 = 4 bytes, max_chunk_size=1024, so ~256 values per chunk max.
654        let num_values = 2000;
655        let chunks = chunker.calculate(
656            LevelDataRef::Absent,
657            LevelDataRef::Absent,
658            num_values,
659            |c, i| {
660                c.roll_fixed::<4>(&(i as i32).to_le_bytes());
661            },
662        );
663
664        // Should have multiple chunks
665        assert!(chunks.len() > 1);
666
667        // Verify contiguity
668        let mut total_levels = 0;
669        for (i, chunk) in chunks.iter().enumerate() {
670            assert_eq!(chunk.level_offset, total_levels);
671            if i < chunks.len() - 1 {
672                assert!(chunk.num_levels > 0);
673            }
674            total_levels += chunk.num_levels;
675        }
676        assert_eq!(total_levels, num_values);
677    }
678
679    #[test]
680    fn test_deterministic_chunks() {
681        let options = CdcOptions {
682            min_chunk_size: 4,
683            max_chunk_size: 64,
684            norm_level: 0,
685        };
686
687        let roll = |c: &mut ContentDefinedChunker, i: usize| {
688            c.roll_fixed::<8>(&(i as i64).to_le_bytes());
689        };
690
691        let mut chunker1 = ContentDefinedChunker::new(&make_desc(0, 0), &options).unwrap();
692        let chunks1 = chunker1.calculate(LevelDataRef::Absent, LevelDataRef::Absent, 200, roll);
693
694        let mut chunker2 = ContentDefinedChunker::new(&make_desc(0, 0), &options).unwrap();
695        let chunks2 = chunker2.calculate(LevelDataRef::Absent, LevelDataRef::Absent, 200, roll);
696
697        assert_eq!(chunks1.len(), chunks2.len());
698        for (a, b) in chunks1.iter().zip(chunks2.iter()) {
699            assert_eq!(a.level_offset, b.level_offset);
700            assert_eq!(a.num_levels, b.num_levels);
701            assert_eq!(a.value_offset, b.value_offset);
702            assert_eq!(a.num_values, b.num_values);
703        }
704    }
705
706    #[test]
707    fn test_nullable_non_nested() {
708        let options = CdcOptions {
709            min_chunk_size: 4,
710            max_chunk_size: 64,
711            norm_level: 0,
712        };
713        let mut chunker = ContentDefinedChunker::new(&make_desc(1, 0), &options).unwrap();
714
715        let num_levels = 20;
716        // def_level=1 means non-null, def_level=0 means null
717        // Pattern: null at indices 0, 3, 6, 9, 12, 15, 18 → 7 nulls, 13 non-null
718        let def_levels: Vec<i16> = (0..num_levels)
719            .map(|i| if i % 3 == 0 { 0 } else { 1 })
720            .collect();
721        let expected_non_null: usize = def_levels.iter().filter(|&&d| d == 1).count();
722
723        let chunks = chunker.calculate(
724            LevelDataRef::Materialized(&def_levels),
725            LevelDataRef::Absent,
726            num_levels,
727            |c, i| {
728                c.roll_fixed::<4>(&(i as i32).to_le_bytes());
729            },
730        );
731
732        assert!(!chunks.is_empty());
733        let total_levels: usize = chunks.iter().map(|c| c.num_levels).sum();
734        let total_values: usize = chunks.iter().map(|c| c.num_values).sum();
735        assert_eq!(total_levels, num_levels);
736        assert_eq!(total_values, expected_non_null);
737        // With nulls present, total_values < total_levels
738        assert!(total_values < total_levels);
739    }
740}
741
742/// Integration tests that exercise CDC through the Arrow writer/reader roundtrip.
743/// Ported from the C++ test suite in `chunker_internal_test.cc`.
744#[cfg(all(test, feature = "arrow"))]
745mod arrow_tests {
746    use std::borrow::Borrow;
747    use std::sync::Arc;
748
749    use arrow::util::data_gen::create_random_batch;
750    use arrow_array::cast::AsArray;
751    use arrow_array::{Array, ArrayRef, BooleanArray, Int32Array, RecordBatch};
752    use arrow_buffer::Buffer;
753    use arrow_data::ArrayData;
754    use arrow_schema::{DataType, Field, Fields, Schema};
755
756    use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
757    use crate::arrow::arrow_writer::ArrowWriter;
758    use crate::column::writer::LevelDataRef;
759    use crate::file::properties::{CdcOptions, WriterProperties};
760    use crate::file::reader::{FileReader, SerializedFileReader};
761
762    // --- Constants matching C++ TestCDCSingleRowGroup ---
763
764    const CDC_MIN_CHUNK_SIZE: usize = 4 * 1024;
765    const CDC_MAX_CHUNK_SIZE: usize = 16 * 1024;
766    const CDC_PART_SIZE: usize = 128 * 1024;
767    const CDC_EDIT_SIZE: usize = 128;
768    const CDC_ROW_GROUP_LENGTH: usize = 1024 * 1024;
769
770    // --- Helpers ---
771
772    /// Deterministic hash function matching the C++ test generator.
773    fn test_hash(seed: u64, index: u64) -> u64 {
774        let mut h = (index.wrapping_add(seed)).wrapping_mul(0xc4ceb9fe1a85ec53u64);
775        h ^= h >> 33;
776        h = h.wrapping_mul(0xff51afd7ed558ccdu64);
777        h ^= h >> 33;
778        h = h.wrapping_mul(0xc4ceb9fe1a85ec53u64);
779        h ^= h >> 33;
780        h
781    }
782
783    /// Generate a deterministic array for any supported data type, matching C++ `GenerateArray`.
784    fn generate_array(dtype: &DataType, nullable: bool, length: usize, seed: u64) -> ArrayRef {
785        macro_rules! gen_primitive {
786            ($array_type:ty, $cast:expr) => {{
787                if nullable {
788                    let arr: $array_type = (0..length)
789                        .map(|i| {
790                            let val = test_hash(seed, i as u64);
791                            if val % 10 == 0 {
792                                None
793                            } else {
794                                Some($cast(val))
795                            }
796                        })
797                        .collect();
798                    Arc::new(arr) as ArrayRef
799                } else {
800                    let arr: $array_type = (0..length)
801                        .map(|i| Some($cast(test_hash(seed, i as u64))))
802                        .collect();
803                    Arc::new(arr) as ArrayRef
804                }
805            }};
806        }
807
808        match dtype {
809            DataType::Boolean => {
810                if nullable {
811                    let arr: BooleanArray = (0..length)
812                        .map(|i| {
813                            let val = test_hash(seed, i as u64);
814                            if val % 10 == 0 {
815                                None
816                            } else {
817                                Some(val % 2 == 0)
818                            }
819                        })
820                        .collect();
821                    Arc::new(arr)
822                } else {
823                    let arr: BooleanArray = (0..length)
824                        .map(|i| Some(test_hash(seed, i as u64) % 2 == 0))
825                        .collect();
826                    Arc::new(arr)
827                }
828            }
829            DataType::Int32 => gen_primitive!(Int32Array, |v: u64| v as i32),
830            DataType::Int64 => {
831                gen_primitive!(arrow_array::Int64Array, |v: u64| v as i64)
832            }
833            DataType::Float64 => {
834                gen_primitive!(arrow_array::Float64Array, |v: u64| (v % 100000) as f64
835                    / 1000.0)
836            }
837            DataType::Utf8 => {
838                let arr: arrow_array::StringArray = if nullable {
839                    (0..length)
840                        .map(|i| {
841                            let val = test_hash(seed, i as u64);
842                            if val % 10 == 0 {
843                                None
844                            } else {
845                                Some(format!("str_{val}"))
846                            }
847                        })
848                        .collect()
849                } else {
850                    (0..length)
851                        .map(|i| Some(format!("str_{}", test_hash(seed, i as u64))))
852                        .collect()
853                };
854                Arc::new(arr)
855            }
856            DataType::Binary => {
857                let arr: arrow_array::BinaryArray = if nullable {
858                    (0..length)
859                        .map(|i| {
860                            let val = test_hash(seed, i as u64);
861                            if val % 10 == 0 {
862                                None
863                            } else {
864                                Some(format!("bin_{val}").into_bytes())
865                            }
866                        })
867                        .collect()
868                } else {
869                    (0..length)
870                        .map(|i| Some(format!("bin_{}", test_hash(seed, i as u64)).into_bytes()))
871                        .collect()
872                };
873                Arc::new(arr)
874            }
875            DataType::FixedSizeBinary(size) => {
876                let size = *size;
877                let mut builder = arrow_array::builder::FixedSizeBinaryBuilder::new(size);
878                for i in 0..length {
879                    let val = test_hash(seed, i as u64);
880                    if nullable && val % 10 == 0 {
881                        builder.append_null();
882                    } else {
883                        let s = format!("bin_{val}");
884                        let bytes = s.as_bytes();
885                        let mut buf = vec![0u8; size as usize];
886                        let copy_len = bytes.len().min(size as usize);
887                        buf[..copy_len].copy_from_slice(&bytes[..copy_len]);
888                        builder.append_value(&buf).unwrap();
889                    }
890                }
891                Arc::new(builder.finish())
892            }
893            DataType::Date32 => {
894                gen_primitive!(arrow_array::Date32Array, |v: u64| v as i32)
895            }
896            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
897                gen_primitive!(arrow_array::TimestampNanosecondArray, |v: u64| v as i64)
898            }
899            _ => panic!("Unsupported test data type: {dtype:?}"),
900        }
901    }
902
903    /// Generate a RecordBatch with the given schema, matching C++ `GenerateTable`.
904    fn generate_table(schema: &Arc<Schema>, length: usize, seed: u64) -> RecordBatch {
905        let arrays: Vec<ArrayRef> = schema
906            .fields()
907            .iter()
908            .enumerate()
909            .map(|(i, field)| {
910                generate_array(
911                    field.data_type(),
912                    field.is_nullable(),
913                    length,
914                    seed + i as u64 * 10,
915                )
916            })
917            .collect();
918        RecordBatch::try_new(schema.clone(), arrays).unwrap()
919    }
920
921    /// Compute the CDC byte width for a data type, matching C++ `bytes_per_record`.
922    /// Returns 0 for variable-length types.
923    fn cdc_byte_width(dtype: &DataType) -> usize {
924        match dtype {
925            DataType::Boolean => 1,
926            DataType::Int8 | DataType::UInt8 => 1,
927            DataType::Int16 | DataType::UInt16 | DataType::Float16 => 2,
928            DataType::Int32
929            | DataType::UInt32
930            | DataType::Float32
931            | DataType::Date32
932            | DataType::Time32(_) => 4,
933            DataType::Int64
934            | DataType::UInt64
935            | DataType::Float64
936            | DataType::Date64
937            | DataType::Time64(_)
938            | DataType::Timestamp(_, _)
939            | DataType::Duration(_) => 8,
940            DataType::Decimal128(_, _) => 16,
941            DataType::Decimal256(_, _) => 32,
942            DataType::FixedSizeBinary(n) => *n as usize,
943            _ => 0, // variable-length
944        }
945    }
946
947    /// Compute bytes_per_record for determining part/edit lengths, matching C++.
948    fn bytes_per_record(dtype: &DataType, nullable: bool) -> usize {
949        let bw = cdc_byte_width(dtype);
950        if bw > 0 {
951            if nullable { bw + 2 } else { bw }
952        } else {
953            16 // variable-length fallback, matching C++
954        }
955    }
956
957    /// Compute the CDC chunk size for an array slice, matching C++ `CalculateCdcSize`.
958    fn calculate_cdc_size(array: &dyn Array, nullable: bool) -> i64 {
959        let dtype = array.data_type();
960        let bw = cdc_byte_width(dtype);
961        let result = if bw > 0 {
962            // Fixed-width: count only non-null values
963            let valid_count = array.len() - array.null_count();
964            (valid_count * bw) as i64
965        } else {
966            // Variable-length: sum of actual byte lengths
967            match dtype {
968                DataType::Utf8 => {
969                    let a = array.as_string::<i32>();
970                    (0..a.len())
971                        .filter(|&i| a.is_valid(i))
972                        .map(|i| a.value(i).len() as i64)
973                        .sum()
974                }
975                DataType::Binary => {
976                    let a = array.as_binary::<i32>();
977                    (0..a.len())
978                        .filter(|&i| a.is_valid(i))
979                        .map(|i| a.value(i).len() as i64)
980                        .sum()
981                }
982                DataType::LargeBinary => {
983                    let a = array.as_binary::<i64>();
984                    (0..a.len())
985                        .filter(|&i| a.is_valid(i))
986                        .map(|i| a.value(i).len() as i64)
987                        .sum()
988                }
989                _ => panic!("CDC size calculation not implemented for {dtype:?}"),
990            }
991        };
992
993        if nullable {
994            // Add 2 bytes per element for definition levels
995            result + array.len() as i64 * 2
996        } else {
997            result
998        }
999    }
1000
1001    /// Page-level metadata for a single column within a row group.
1002    struct ColumnInfo {
1003        page_lengths: Vec<i64>,
1004        has_dictionary_page: bool,
1005    }
1006
1007    /// Extract per-row-group column info from Parquet data.
1008    fn get_column_info(data: &[u8], column_index: usize) -> Vec<ColumnInfo> {
1009        let reader = SerializedFileReader::new(bytes::Bytes::from(data.to_vec())).unwrap();
1010        let metadata = reader.metadata();
1011        let mut result = Vec::new();
1012        for rg in 0..metadata.num_row_groups() {
1013            let rg_reader = reader.get_row_group(rg).unwrap();
1014            let col_reader = rg_reader.get_column_page_reader(column_index).unwrap();
1015            let mut info = ColumnInfo {
1016                page_lengths: Vec::new(),
1017                has_dictionary_page: false,
1018            };
1019            for page in col_reader {
1020                let page = page.unwrap();
1021                match page.page_type() {
1022                    crate::basic::PageType::DATA_PAGE | crate::basic::PageType::DATA_PAGE_V2 => {
1023                        info.page_lengths.push(page.num_values() as i64);
1024                    }
1025                    crate::basic::PageType::DICTIONARY_PAGE => {
1026                        info.has_dictionary_page = true;
1027                    }
1028                    _ => {}
1029                }
1030            }
1031            result.push(info);
1032        }
1033        result
1034    }
1035
1036    /// Assert that CDC chunk sizes are within the expected range.
1037    /// Equivalent to C++ `AssertContentDefinedChunkSizes`.
1038    fn assert_cdc_chunk_sizes(
1039        array: &ArrayRef,
1040        info: &ColumnInfo,
1041        nullable: bool,
1042        min_chunk_size: usize,
1043        max_chunk_size: usize,
1044        expect_dictionary_page: bool,
1045    ) {
1046        // Boolean and FixedSizeBinary never produce dictionary pages (matching C++)
1047        let expect_dict = match array.data_type() {
1048            DataType::Boolean | DataType::FixedSizeBinary(_) => false,
1049            _ => expect_dictionary_page,
1050        };
1051        assert_eq!(
1052            info.has_dictionary_page,
1053            expect_dict,
1054            "dictionary page mismatch for {:?}",
1055            array.data_type()
1056        );
1057
1058        let page_lengths = &info.page_lengths;
1059        assert!(
1060            page_lengths.len() > 1,
1061            "CDC should produce multiple pages, got {page_lengths:?}"
1062        );
1063
1064        let bw = cdc_byte_width(array.data_type());
1065        // Only do exact CDC size validation for fixed-width and base binary-like types
1066        if bw > 0
1067            || matches!(
1068                array.data_type(),
1069                DataType::Utf8 | DataType::Binary | DataType::LargeBinary
1070            )
1071        {
1072            let mut offset = 0i64;
1073            for (i, &page_len) in page_lengths.iter().enumerate() {
1074                let slice = array.slice(offset as usize, page_len as usize);
1075                let cdc_size = calculate_cdc_size(slice.as_ref(), nullable);
1076                if i < page_lengths.len() - 1 {
1077                    assert!(
1078                        cdc_size >= min_chunk_size as i64,
1079                        "Page {i}: CDC size {cdc_size} < min {min_chunk_size}, pages={page_lengths:?}"
1080                    );
1081                }
1082                assert!(
1083                    cdc_size <= max_chunk_size as i64,
1084                    "Page {i}: CDC size {cdc_size} > max {max_chunk_size}, pages={page_lengths:?}"
1085                );
1086                offset += page_len;
1087            }
1088            assert_eq!(
1089                offset,
1090                array.len() as i64,
1091                "page lengths must sum to array length"
1092            );
1093        }
1094    }
1095
1096    /// Write batches with CDC options and validate roundtrip.
1097    /// Matches C++ `WriteTableToBuffer`.
1098    fn write_with_cdc_options(
1099        batches: &[&RecordBatch],
1100        min_chunk_size: usize,
1101        max_chunk_size: usize,
1102        max_row_group_rows: Option<usize>,
1103        enable_dictionary: bool,
1104    ) -> Vec<u8> {
1105        assert!(!batches.is_empty());
1106        let schema = batches[0].schema();
1107        let mut builder = WriterProperties::builder()
1108            .set_dictionary_enabled(enable_dictionary)
1109            .set_content_defined_chunking(Some(CdcOptions {
1110                min_chunk_size,
1111                max_chunk_size,
1112                norm_level: 0,
1113            }));
1114        if let Some(max_rows) = max_row_group_rows {
1115            builder = builder.set_max_row_group_row_count(Some(max_rows));
1116        }
1117        let props = builder.build();
1118        let mut buf = Vec::new();
1119        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), Some(props)).unwrap();
1120        for batch in batches {
1121            writer.write(batch).unwrap();
1122        }
1123        writer.close().unwrap();
1124
1125        // Roundtrip validation (matching C++ WriteTableToBuffer)
1126        let readback = read_batches(&buf);
1127        let original_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1128        let readback_rows: usize = readback.iter().map(|b| b.num_rows()).sum();
1129        assert_eq!(original_rows, readback_rows, "Roundtrip row count mismatch");
1130        if original_rows > 0 {
1131            let original = concat_batches(batches.iter().copied());
1132            let roundtrip = concat_batches(&readback);
1133            assert_eq!(original, roundtrip, "Roundtrip validation failed");
1134        }
1135
1136        buf
1137    }
1138
1139    #[test]
1140    fn cdc_all_null_arrow_column_writes_data_pages() {
1141        let array = Arc::new(Int32Array::from(vec![None::<i32>; 4096])) as ArrayRef;
1142        let schema = Arc::new(Schema::new(vec![Field::new("f0", DataType::Int32, true)]));
1143        let batch = RecordBatch::try_new(schema, vec![array.clone()]).unwrap();
1144
1145        let data = write_with_cdc_options(&[&batch], 64, 256, Some(4096), false);
1146        let info = get_column_info(&data, 0);
1147
1148        assert_eq!(info.len(), 1);
1149        assert!(
1150            !info[0].page_lengths.is_empty(),
1151            "all-null CDC write should still emit data pages"
1152        );
1153        assert_eq!(
1154            info[0].page_lengths.iter().sum::<i64>(),
1155            array.len() as i64,
1156            "all-null CDC pages should account for every input row"
1157        );
1158    }
1159
1160    fn read_batches(data: &[u8]) -> Vec<RecordBatch> {
1161        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes::Bytes::from(data.to_vec()))
1162            .unwrap()
1163            .build()
1164            .unwrap();
1165        reader.collect::<std::result::Result<Vec<_>, _>>().unwrap()
1166    }
1167
1168    fn concat_batches(batches: impl IntoIterator<Item = impl Borrow<RecordBatch>>) -> RecordBatch {
1169        let batches: Vec<_> = batches.into_iter().collect();
1170        let schema = batches[0].borrow().schema();
1171        let batches = batches.iter().map(|b| b.borrow());
1172        arrow_select::concat::concat_batches(&schema, batches).unwrap()
1173    }
1174
1175    /// LCS-based diff between two sequences of page lengths (ported from C++).
1176    /// Includes the merge-adjacent-diffs post-processing from C++.
1177    fn find_differences(first: &[i64], second: &[i64]) -> Vec<(Vec<i64>, Vec<i64>)> {
1178        let n = first.len();
1179        let m = second.len();
1180        let mut dp = vec![vec![0usize; m + 1]; n + 1];
1181        for i in 0..n {
1182            for j in 0..m {
1183                if first[i] == second[j] {
1184                    dp[i + 1][j + 1] = dp[i][j] + 1;
1185                } else {
1186                    dp[i + 1][j + 1] = dp[i + 1][j].max(dp[i][j + 1]);
1187                }
1188            }
1189        }
1190        let mut common = Vec::new();
1191        let (mut i, mut j) = (n, m);
1192        while i > 0 && j > 0 {
1193            if first[i - 1] == second[j - 1] {
1194                common.push((i - 1, j - 1));
1195                i -= 1;
1196                j -= 1;
1197            } else if dp[i - 1][j] >= dp[i][j - 1] {
1198                i -= 1;
1199            } else {
1200                j -= 1;
1201            }
1202        }
1203        common.reverse();
1204
1205        let mut result = Vec::new();
1206        let (mut last_i, mut last_j) = (0usize, 0usize);
1207        for (ci, cj) in &common {
1208            if *ci > last_i || *cj > last_j {
1209                result.push((first[last_i..*ci].to_vec(), second[last_j..*cj].to_vec()));
1210            }
1211            last_i = ci + 1;
1212            last_j = cj + 1;
1213        }
1214        if last_i < n || last_j < m {
1215            result.push((first[last_i..].to_vec(), second[last_j..].to_vec()));
1216        }
1217
1218        // Merge adjacent diffs (matching C++ post-processing)
1219        let mut merged: Vec<(Vec<i64>, Vec<i64>)> = Vec::new();
1220        for diff in result {
1221            if let Some(prev) = merged.last_mut() {
1222                if prev.0.is_empty() && diff.1.is_empty() {
1223                    prev.0 = diff.0;
1224                    continue;
1225                } else if prev.1.is_empty() && diff.0.is_empty() {
1226                    prev.1 = diff.1;
1227                    continue;
1228                }
1229            }
1230            merged.push(diff);
1231        }
1232        merged
1233    }
1234
1235    /// Assert exact page length differences between original and modified files.
1236    /// Matches C++ `AssertPageLengthDifferences` (full version).
1237    fn assert_page_length_differences(
1238        original: &ColumnInfo,
1239        modified: &ColumnInfo,
1240        exact_equal_diffs: usize,
1241        exact_larger_diffs: usize,
1242        exact_smaller_diffs: usize,
1243        edit_length: i64,
1244    ) {
1245        let diffs = find_differences(&original.page_lengths, &modified.page_lengths);
1246        let expected = exact_equal_diffs + exact_larger_diffs + exact_smaller_diffs;
1247
1248        if diffs.len() != expected {
1249            eprintln!("Original: {:?}", original.page_lengths);
1250            eprintln!("Modified: {:?}", modified.page_lengths);
1251            for d in &diffs {
1252                eprintln!("  Diff: {:?} vs {:?}", d.0, d.1);
1253            }
1254        }
1255        assert_eq!(
1256            diffs.len(),
1257            expected,
1258            "Expected {expected} diffs, got {}",
1259            diffs.len()
1260        );
1261
1262        let (mut eq, mut larger, mut smaller) = (0usize, 0usize, 0usize);
1263        for (left, right) in &diffs {
1264            let left_sum: i64 = left.iter().sum();
1265            let right_sum: i64 = right.iter().sum();
1266            if left_sum == right_sum {
1267                eq += 1;
1268            } else if left_sum < right_sum {
1269                larger += 1;
1270                assert_eq!(
1271                    left_sum + edit_length,
1272                    right_sum,
1273                    "Larger diff mismatch: {left_sum} + {edit_length} != {right_sum}"
1274                );
1275            } else {
1276                smaller += 1;
1277                assert_eq!(
1278                    left_sum,
1279                    right_sum + edit_length,
1280                    "Smaller diff mismatch: {left_sum} != {right_sum} + {edit_length}"
1281                );
1282            }
1283        }
1284
1285        assert_eq!(eq, exact_equal_diffs, "equal diffs count");
1286        assert_eq!(larger, exact_larger_diffs, "larger diffs count");
1287        assert_eq!(smaller, exact_smaller_diffs, "smaller diffs count");
1288    }
1289
1290    /// Assert page length differences for update cases (simplified version).
1291    /// Matches C++ `AssertPageLengthDifferences` (max_equal_diffs overload).
1292    fn assert_page_length_differences_update(
1293        original: &ColumnInfo,
1294        modified: &ColumnInfo,
1295        max_equal_diffs: usize,
1296    ) {
1297        let diffs = find_differences(&original.page_lengths, &modified.page_lengths);
1298        assert!(
1299            diffs.len() <= max_equal_diffs,
1300            "Expected at most {max_equal_diffs} diffs, got {}",
1301            diffs.len()
1302        );
1303        for (left, right) in &diffs {
1304            let left_sum: i64 = left.iter().sum();
1305            let right_sum: i64 = right.iter().sum();
1306            assert_eq!(
1307                left_sum, right_sum,
1308                "Update diff should not change total row count"
1309            );
1310        }
1311    }
1312
1313    // --- FindDifferences tests (ported from C++) ---
1314
1315    #[test]
1316    fn test_find_differences_basic() {
1317        let diffs = find_differences(&[1, 2, 3, 4, 5], &[1, 7, 8, 4, 5]);
1318        assert_eq!(diffs.len(), 1);
1319        assert_eq!(diffs[0].0, vec![2, 3]);
1320        assert_eq!(diffs[0].1, vec![7, 8]);
1321    }
1322
1323    #[test]
1324    fn test_find_differences_multiple() {
1325        let diffs = find_differences(&[1, 2, 3, 4, 5, 6, 7], &[1, 8, 9, 4, 10, 6, 11]);
1326        assert_eq!(diffs.len(), 3);
1327        assert_eq!(diffs[0].0, vec![2, 3]);
1328        assert_eq!(diffs[0].1, vec![8, 9]);
1329        assert_eq!(diffs[1].0, vec![5]);
1330        assert_eq!(diffs[1].1, vec![10]);
1331        assert_eq!(diffs[2].0, vec![7]);
1332        assert_eq!(diffs[2].1, vec![11]);
1333    }
1334
1335    #[test]
1336    fn test_find_differences_different_lengths() {
1337        let diffs = find_differences(&[1, 2, 3], &[1, 2, 3, 4, 5]);
1338        assert_eq!(diffs.len(), 1);
1339        assert!(diffs[0].0.is_empty());
1340        assert_eq!(diffs[0].1, vec![4, 5]);
1341    }
1342
1343    #[test]
1344    fn test_find_differences_empty() {
1345        let diffs = find_differences(&[], &[]);
1346        assert!(diffs.is_empty());
1347    }
1348
1349    #[test]
1350    fn test_find_differences_changes_at_both_ends() {
1351        let diffs = find_differences(&[1, 2, 3, 4, 5, 6, 7, 8, 9], &[0, 0, 2, 3, 4, 5, 7, 7, 8]);
1352        assert_eq!(diffs.len(), 3);
1353        assert_eq!(diffs[0].0, vec![1]);
1354        assert_eq!(diffs[0].1, vec![0, 0]);
1355        assert_eq!(diffs[1].0, vec![6]);
1356        assert_eq!(diffs[1].1, vec![7]);
1357        assert_eq!(diffs[2].0, vec![9]);
1358        assert!(diffs[2].1.is_empty());
1359    }
1360
1361    #[test]
1362    fn test_find_differences_additional() {
1363        let diffs = find_differences(
1364            &[445, 312, 393, 401, 410, 138, 558, 457],
1365            &[445, 312, 393, 393, 410, 138, 558, 457],
1366        );
1367        assert_eq!(diffs.len(), 1);
1368        assert_eq!(diffs[0].0, vec![401]);
1369        assert_eq!(diffs[0].1, vec![393]);
1370    }
1371
1372    // --- Parameterized single-row-group tests via macro ---
1373
1374    macro_rules! cdc_single_rg_tests {
1375        ($mod_name:ident, $dtype:expr, $nullable:expr) => {
1376            mod $mod_name {
1377                use super::*;
1378
1379                fn config() -> (DataType, bool, usize, usize) {
1380                    let dtype: DataType = $dtype;
1381                    let nullable: bool = $nullable;
1382                    let bpr = bytes_per_record(&dtype, nullable);
1383                    let part_length = CDC_PART_SIZE / bpr;
1384                    let edit_length = CDC_EDIT_SIZE / bpr;
1385                    (dtype, nullable, part_length, edit_length)
1386                }
1387
1388                fn make_schema(dtype: &DataType, nullable: bool) -> Arc<Schema> {
1389                    Arc::new(Schema::new(vec![Field::new("f0", dtype.clone(), nullable)]))
1390                }
1391
1392                #[test]
1393                fn delete_once() {
1394                    let (dtype, nullable, part_length, edit_length) = config();
1395                    let schema = make_schema(&dtype, nullable);
1396
1397                    let part1 = generate_table(&schema, part_length, 0);
1398                    let part2 = generate_table(&schema, edit_length, 1);
1399                    let part3 = generate_table(&schema, part_length, part_length as u64);
1400
1401                    let base = concat_batches([&part1, &part2, &part3]);
1402                    let modified = concat_batches([&part1, &part3]);
1403
1404                    for enable_dictionary in [false, true] {
1405                        let base_data = write_with_cdc_options(
1406                            &[&base],
1407                            CDC_MIN_CHUNK_SIZE,
1408                            CDC_MAX_CHUNK_SIZE,
1409                            Some(CDC_ROW_GROUP_LENGTH),
1410                            enable_dictionary,
1411                        );
1412                        let mod_data = write_with_cdc_options(
1413                            &[&modified],
1414                            CDC_MIN_CHUNK_SIZE,
1415                            CDC_MAX_CHUNK_SIZE,
1416                            Some(CDC_ROW_GROUP_LENGTH),
1417                            enable_dictionary,
1418                        );
1419
1420                        let base_info = get_column_info(&base_data, 0);
1421                        let mod_info = get_column_info(&mod_data, 0);
1422                        assert_eq!(base_info.len(), 1);
1423                        assert_eq!(mod_info.len(), 1);
1424
1425                        assert_cdc_chunk_sizes(
1426                            &base.column(0).clone(),
1427                            &base_info[0],
1428                            nullable,
1429                            CDC_MIN_CHUNK_SIZE,
1430                            CDC_MAX_CHUNK_SIZE,
1431                            enable_dictionary,
1432                        );
1433                        assert_cdc_chunk_sizes(
1434                            &modified.column(0).clone(),
1435                            &mod_info[0],
1436                            nullable,
1437                            CDC_MIN_CHUNK_SIZE,
1438                            CDC_MAX_CHUNK_SIZE,
1439                            enable_dictionary,
1440                        );
1441
1442                        assert_page_length_differences(
1443                            &base_info[0],
1444                            &mod_info[0],
1445                            0,
1446                            0,
1447                            1,
1448                            edit_length as i64,
1449                        );
1450                    }
1451                }
1452
1453                #[test]
1454                fn delete_twice() {
1455                    let (dtype, nullable, part_length, edit_length) = config();
1456                    let schema = make_schema(&dtype, nullable);
1457
1458                    let part1 = generate_table(&schema, part_length, 0);
1459                    let part2 = generate_table(&schema, edit_length, 1);
1460                    let part3 = generate_table(&schema, part_length, part_length as u64);
1461                    let part4 = generate_table(&schema, edit_length, 2);
1462                    let part5 = generate_table(&schema, part_length, 2 * part_length as u64);
1463
1464                    let base = concat_batches([&part1, &part2, &part3, &part4, &part5]);
1465                    let modified = concat_batches([&part1, &part3, &part5]);
1466
1467                    for enable_dictionary in [false, true] {
1468                        let base_data = write_with_cdc_options(
1469                            &[&base],
1470                            CDC_MIN_CHUNK_SIZE,
1471                            CDC_MAX_CHUNK_SIZE,
1472                            Some(CDC_ROW_GROUP_LENGTH),
1473                            enable_dictionary,
1474                        );
1475                        let mod_data = write_with_cdc_options(
1476                            &[&modified],
1477                            CDC_MIN_CHUNK_SIZE,
1478                            CDC_MAX_CHUNK_SIZE,
1479                            Some(CDC_ROW_GROUP_LENGTH),
1480                            enable_dictionary,
1481                        );
1482
1483                        let base_info = get_column_info(&base_data, 0);
1484                        let mod_info = get_column_info(&mod_data, 0);
1485                        assert_eq!(base_info.len(), 1);
1486                        assert_eq!(mod_info.len(), 1);
1487
1488                        assert_cdc_chunk_sizes(
1489                            &base.column(0).clone(),
1490                            &base_info[0],
1491                            nullable,
1492                            CDC_MIN_CHUNK_SIZE,
1493                            CDC_MAX_CHUNK_SIZE,
1494                            enable_dictionary,
1495                        );
1496                        assert_cdc_chunk_sizes(
1497                            &modified.column(0).clone(),
1498                            &mod_info[0],
1499                            nullable,
1500                            CDC_MIN_CHUNK_SIZE,
1501                            CDC_MAX_CHUNK_SIZE,
1502                            enable_dictionary,
1503                        );
1504
1505                        assert_page_length_differences(
1506                            &base_info[0],
1507                            &mod_info[0],
1508                            0,
1509                            0,
1510                            2,
1511                            edit_length as i64,
1512                        );
1513                    }
1514                }
1515
1516                #[test]
1517                fn insert_once() {
1518                    let (dtype, nullable, part_length, edit_length) = config();
1519                    let schema = make_schema(&dtype, nullable);
1520
1521                    let part1 = generate_table(&schema, part_length, 0);
1522                    let part2 = generate_table(&schema, edit_length, 1);
1523                    let part3 = generate_table(&schema, part_length, part_length as u64);
1524
1525                    let base = concat_batches([&part1, &part3]);
1526                    let modified = concat_batches([&part1, &part2, &part3]);
1527
1528                    for enable_dictionary in [false, true] {
1529                        let base_data = write_with_cdc_options(
1530                            &[&base],
1531                            CDC_MIN_CHUNK_SIZE,
1532                            CDC_MAX_CHUNK_SIZE,
1533                            Some(CDC_ROW_GROUP_LENGTH),
1534                            enable_dictionary,
1535                        );
1536                        let mod_data = write_with_cdc_options(
1537                            &[&modified],
1538                            CDC_MIN_CHUNK_SIZE,
1539                            CDC_MAX_CHUNK_SIZE,
1540                            Some(CDC_ROW_GROUP_LENGTH),
1541                            enable_dictionary,
1542                        );
1543
1544                        let base_info = get_column_info(&base_data, 0);
1545                        let mod_info = get_column_info(&mod_data, 0);
1546                        assert_eq!(base_info.len(), 1);
1547                        assert_eq!(mod_info.len(), 1);
1548
1549                        assert_cdc_chunk_sizes(
1550                            &base.column(0).clone(),
1551                            &base_info[0],
1552                            nullable,
1553                            CDC_MIN_CHUNK_SIZE,
1554                            CDC_MAX_CHUNK_SIZE,
1555                            enable_dictionary,
1556                        );
1557                        assert_cdc_chunk_sizes(
1558                            &modified.column(0).clone(),
1559                            &mod_info[0],
1560                            nullable,
1561                            CDC_MIN_CHUNK_SIZE,
1562                            CDC_MAX_CHUNK_SIZE,
1563                            enable_dictionary,
1564                        );
1565
1566                        assert_page_length_differences(
1567                            &base_info[0],
1568                            &mod_info[0],
1569                            0,
1570                            1,
1571                            0,
1572                            edit_length as i64,
1573                        );
1574                    }
1575                }
1576
1577                #[test]
1578                fn insert_twice() {
1579                    let (dtype, nullable, part_length, edit_length) = config();
1580                    let schema = make_schema(&dtype, nullable);
1581
1582                    let part1 = generate_table(&schema, part_length, 0);
1583                    let part2 = generate_table(&schema, edit_length, 1);
1584                    let part3 = generate_table(&schema, part_length, part_length as u64);
1585                    let part4 = generate_table(&schema, edit_length, 2);
1586                    let part5 = generate_table(&schema, part_length, 2 * part_length as u64);
1587
1588                    let base = concat_batches([&part1, &part3, &part5]);
1589                    let modified = concat_batches([&part1, &part2, &part3, &part4, &part5]);
1590
1591                    for enable_dictionary in [false, true] {
1592                        let base_data = write_with_cdc_options(
1593                            &[&base],
1594                            CDC_MIN_CHUNK_SIZE,
1595                            CDC_MAX_CHUNK_SIZE,
1596                            Some(CDC_ROW_GROUP_LENGTH),
1597                            enable_dictionary,
1598                        );
1599                        let mod_data = write_with_cdc_options(
1600                            &[&modified],
1601                            CDC_MIN_CHUNK_SIZE,
1602                            CDC_MAX_CHUNK_SIZE,
1603                            Some(CDC_ROW_GROUP_LENGTH),
1604                            enable_dictionary,
1605                        );
1606
1607                        let base_info = get_column_info(&base_data, 0);
1608                        let mod_info = get_column_info(&mod_data, 0);
1609                        assert_eq!(base_info.len(), 1);
1610                        assert_eq!(mod_info.len(), 1);
1611
1612                        assert_cdc_chunk_sizes(
1613                            &base.column(0).clone(),
1614                            &base_info[0],
1615                            nullable,
1616                            CDC_MIN_CHUNK_SIZE,
1617                            CDC_MAX_CHUNK_SIZE,
1618                            enable_dictionary,
1619                        );
1620                        assert_cdc_chunk_sizes(
1621                            &modified.column(0).clone(),
1622                            &mod_info[0],
1623                            nullable,
1624                            CDC_MIN_CHUNK_SIZE,
1625                            CDC_MAX_CHUNK_SIZE,
1626                            enable_dictionary,
1627                        );
1628
1629                        assert_page_length_differences(
1630                            &base_info[0],
1631                            &mod_info[0],
1632                            0,
1633                            2,
1634                            0,
1635                            edit_length as i64,
1636                        );
1637                    }
1638                }
1639
1640                #[test]
1641                fn update_once() {
1642                    let (dtype, nullable, part_length, edit_length) = config();
1643                    let schema = make_schema(&dtype, nullable);
1644
1645                    let part1 = generate_table(&schema, part_length, 0);
1646                    let part2 = generate_table(&schema, edit_length, 1);
1647                    let part3 = generate_table(&schema, part_length, part_length as u64);
1648                    let part4 = generate_table(&schema, edit_length, 2);
1649
1650                    let base = concat_batches([&part1, &part2, &part3]);
1651                    let modified = concat_batches([&part1, &part4, &part3]);
1652
1653                    for enable_dictionary in [false, true] {
1654                        let base_data = write_with_cdc_options(
1655                            &[&base],
1656                            CDC_MIN_CHUNK_SIZE,
1657                            CDC_MAX_CHUNK_SIZE,
1658                            Some(CDC_ROW_GROUP_LENGTH),
1659                            enable_dictionary,
1660                        );
1661                        let mod_data = write_with_cdc_options(
1662                            &[&modified],
1663                            CDC_MIN_CHUNK_SIZE,
1664                            CDC_MAX_CHUNK_SIZE,
1665                            Some(CDC_ROW_GROUP_LENGTH),
1666                            enable_dictionary,
1667                        );
1668
1669                        let base_info = get_column_info(&base_data, 0);
1670                        let mod_info = get_column_info(&mod_data, 0);
1671                        assert_eq!(base_info.len(), 1);
1672                        assert_eq!(mod_info.len(), 1);
1673
1674                        assert_cdc_chunk_sizes(
1675                            &base.column(0).clone(),
1676                            &base_info[0],
1677                            nullable,
1678                            CDC_MIN_CHUNK_SIZE,
1679                            CDC_MAX_CHUNK_SIZE,
1680                            enable_dictionary,
1681                        );
1682                        assert_cdc_chunk_sizes(
1683                            &modified.column(0).clone(),
1684                            &mod_info[0],
1685                            nullable,
1686                            CDC_MIN_CHUNK_SIZE,
1687                            CDC_MAX_CHUNK_SIZE,
1688                            enable_dictionary,
1689                        );
1690
1691                        assert_page_length_differences_update(&base_info[0], &mod_info[0], 1);
1692                    }
1693                }
1694
1695                #[test]
1696                fn update_twice() {
1697                    let (dtype, nullable, part_length, edit_length) = config();
1698                    let schema = make_schema(&dtype, nullable);
1699
1700                    let part1 = generate_table(&schema, part_length, 0);
1701                    let part2 = generate_table(&schema, edit_length, 1);
1702                    let part3 = generate_table(&schema, part_length, part_length as u64);
1703                    let part4 = generate_table(&schema, edit_length, 2);
1704                    let part5 = generate_table(&schema, part_length, 2 * part_length as u64);
1705                    let part6 = generate_table(&schema, edit_length, 3);
1706                    let part7 = generate_table(&schema, edit_length, 4);
1707
1708                    let base = concat_batches([&part1, &part2, &part3, &part4, &part5]);
1709                    let modified = concat_batches([&part1, &part6, &part3, &part7, &part5]);
1710
1711                    for enable_dictionary in [false, true] {
1712                        let base_data = write_with_cdc_options(
1713                            &[&base],
1714                            CDC_MIN_CHUNK_SIZE,
1715                            CDC_MAX_CHUNK_SIZE,
1716                            Some(CDC_ROW_GROUP_LENGTH),
1717                            enable_dictionary,
1718                        );
1719                        let mod_data = write_with_cdc_options(
1720                            &[&modified],
1721                            CDC_MIN_CHUNK_SIZE,
1722                            CDC_MAX_CHUNK_SIZE,
1723                            Some(CDC_ROW_GROUP_LENGTH),
1724                            enable_dictionary,
1725                        );
1726
1727                        let base_info = get_column_info(&base_data, 0);
1728                        let mod_info = get_column_info(&mod_data, 0);
1729                        assert_eq!(base_info.len(), 1);
1730                        assert_eq!(mod_info.len(), 1);
1731
1732                        assert_cdc_chunk_sizes(
1733                            &base.column(0).clone(),
1734                            &base_info[0],
1735                            nullable,
1736                            CDC_MIN_CHUNK_SIZE,
1737                            CDC_MAX_CHUNK_SIZE,
1738                            enable_dictionary,
1739                        );
1740                        assert_cdc_chunk_sizes(
1741                            &modified.column(0).clone(),
1742                            &mod_info[0],
1743                            nullable,
1744                            CDC_MIN_CHUNK_SIZE,
1745                            CDC_MAX_CHUNK_SIZE,
1746                            enable_dictionary,
1747                        );
1748
1749                        assert_page_length_differences_update(&base_info[0], &mod_info[0], 2);
1750                    }
1751                }
1752
1753                #[test]
1754                fn prepend() {
1755                    let (dtype, nullable, part_length, edit_length) = config();
1756                    let schema = make_schema(&dtype, nullable);
1757
1758                    let part1 = generate_table(&schema, part_length, 0);
1759                    let part2 = generate_table(&schema, edit_length, 1);
1760                    let part3 = generate_table(&schema, part_length, part_length as u64);
1761                    let part4 = generate_table(&schema, edit_length, 2);
1762
1763                    let base = concat_batches([&part1, &part2, &part3]);
1764                    let modified = concat_batches([&part4, &part1, &part2, &part3]);
1765
1766                    for enable_dictionary in [false, true] {
1767                        let base_data = write_with_cdc_options(
1768                            &[&base],
1769                            CDC_MIN_CHUNK_SIZE,
1770                            CDC_MAX_CHUNK_SIZE,
1771                            Some(CDC_ROW_GROUP_LENGTH),
1772                            enable_dictionary,
1773                        );
1774                        let mod_data = write_with_cdc_options(
1775                            &[&modified],
1776                            CDC_MIN_CHUNK_SIZE,
1777                            CDC_MAX_CHUNK_SIZE,
1778                            Some(CDC_ROW_GROUP_LENGTH),
1779                            enable_dictionary,
1780                        );
1781
1782                        let base_info = get_column_info(&base_data, 0);
1783                        let mod_info = get_column_info(&mod_data, 0);
1784                        assert_eq!(base_info.len(), 1);
1785                        assert_eq!(mod_info.len(), 1);
1786
1787                        assert_cdc_chunk_sizes(
1788                            &base.column(0).clone(),
1789                            &base_info[0],
1790                            nullable,
1791                            CDC_MIN_CHUNK_SIZE,
1792                            CDC_MAX_CHUNK_SIZE,
1793                            enable_dictionary,
1794                        );
1795                        assert_cdc_chunk_sizes(
1796                            &modified.column(0).clone(),
1797                            &mod_info[0],
1798                            nullable,
1799                            CDC_MIN_CHUNK_SIZE,
1800                            CDC_MAX_CHUNK_SIZE,
1801                            enable_dictionary,
1802                        );
1803
1804                        assert!(
1805                            mod_info[0].page_lengths.len() >= base_info[0].page_lengths.len(),
1806                            "Modified should have same or more pages"
1807                        );
1808
1809                        assert_page_length_differences(
1810                            &base_info[0],
1811                            &mod_info[0],
1812                            0,
1813                            1,
1814                            0,
1815                            edit_length as i64,
1816                        );
1817                    }
1818                }
1819
1820                #[test]
1821                fn append() {
1822                    let (dtype, nullable, part_length, edit_length) = config();
1823                    let schema = make_schema(&dtype, nullable);
1824
1825                    let part1 = generate_table(&schema, part_length, 0);
1826                    let part2 = generate_table(&schema, edit_length, 1);
1827                    let part3 = generate_table(&schema, part_length, part_length as u64);
1828                    let part4 = generate_table(&schema, edit_length, 2);
1829
1830                    let base = concat_batches([&part1, &part2, &part3]);
1831                    let modified = concat_batches([&part1, &part2, &part3, &part4]);
1832
1833                    for enable_dictionary in [false, true] {
1834                        let base_data = write_with_cdc_options(
1835                            &[&base],
1836                            CDC_MIN_CHUNK_SIZE,
1837                            CDC_MAX_CHUNK_SIZE,
1838                            Some(CDC_ROW_GROUP_LENGTH),
1839                            enable_dictionary,
1840                        );
1841                        let mod_data = write_with_cdc_options(
1842                            &[&modified],
1843                            CDC_MIN_CHUNK_SIZE,
1844                            CDC_MAX_CHUNK_SIZE,
1845                            Some(CDC_ROW_GROUP_LENGTH),
1846                            enable_dictionary,
1847                        );
1848
1849                        let base_info = get_column_info(&base_data, 0);
1850                        let mod_info = get_column_info(&mod_data, 0);
1851                        assert_eq!(base_info.len(), 1);
1852                        assert_eq!(mod_info.len(), 1);
1853
1854                        assert_cdc_chunk_sizes(
1855                            &base.column(0).clone(),
1856                            &base_info[0],
1857                            nullable,
1858                            CDC_MIN_CHUNK_SIZE,
1859                            CDC_MAX_CHUNK_SIZE,
1860                            enable_dictionary,
1861                        );
1862                        assert_cdc_chunk_sizes(
1863                            &modified.column(0).clone(),
1864                            &mod_info[0],
1865                            nullable,
1866                            CDC_MIN_CHUNK_SIZE,
1867                            CDC_MAX_CHUNK_SIZE,
1868                            enable_dictionary,
1869                        );
1870
1871                        let bp = &base_info[0].page_lengths;
1872                        let mp = &mod_info[0].page_lengths;
1873                        assert!(mp.len() >= bp.len());
1874                        for i in 0..bp.len() - 1 {
1875                            assert_eq!(bp[i], mp[i], "Page {i} should be identical");
1876                        }
1877                        assert!(mp[bp.len() - 1] >= bp[bp.len() - 1]);
1878                    }
1879                }
1880
1881                #[test]
1882                fn empty_table() {
1883                    let (dtype, nullable, _, _) = config();
1884                    let schema = make_schema(&dtype, nullable);
1885
1886                    let empty = RecordBatch::new_empty(schema);
1887                    for enable_dictionary in [false, true] {
1888                        let data = write_with_cdc_options(
1889                            &[&empty],
1890                            CDC_MIN_CHUNK_SIZE,
1891                            CDC_MAX_CHUNK_SIZE,
1892                            Some(CDC_ROW_GROUP_LENGTH),
1893                            enable_dictionary,
1894                        );
1895                        let info = get_column_info(&data, 0);
1896                        // Empty table: either no row groups or one with no data pages
1897                        if !info.is_empty() {
1898                            assert!(info[0].page_lengths.is_empty());
1899                        }
1900                    }
1901                }
1902
1903                #[test]
1904                fn array_offsets() {
1905                    let (dtype, nullable, part_length, edit_length) = config();
1906                    let schema = make_schema(&dtype, nullable);
1907
1908                    let table = concat_batches([
1909                        &generate_table(&schema, part_length, 0),
1910                        &generate_table(&schema, edit_length, 1),
1911                        &generate_table(&schema, part_length, part_length as u64),
1912                    ]);
1913
1914                    for offset in [0usize, 512, 1024] {
1915                        if offset >= table.num_rows() {
1916                            continue;
1917                        }
1918                        let sliced = table.slice(offset, table.num_rows() - offset);
1919                        let data = write_with_cdc_options(
1920                            &[&sliced],
1921                            CDC_MIN_CHUNK_SIZE,
1922                            CDC_MAX_CHUNK_SIZE,
1923                            Some(CDC_ROW_GROUP_LENGTH),
1924                            true,
1925                        );
1926                        let info = get_column_info(&data, 0);
1927                        assert_eq!(info.len(), 1);
1928
1929                        // Verify CDC actually produced content-defined chunks
1930                        assert_cdc_chunk_sizes(
1931                            &sliced.column(0).clone(),
1932                            &info[0],
1933                            nullable,
1934                            CDC_MIN_CHUNK_SIZE,
1935                            CDC_MAX_CHUNK_SIZE,
1936                            true,
1937                        );
1938                    }
1939                }
1940            }
1941        };
1942    }
1943
1944    // Instantiate for representative types matching C++ categories
1945    cdc_single_rg_tests!(cdc_bool_non_null, DataType::Boolean, false);
1946    cdc_single_rg_tests!(cdc_i32_non_null, DataType::Int32, false);
1947    cdc_single_rg_tests!(cdc_i64_nullable, DataType::Int64, true);
1948    cdc_single_rg_tests!(cdc_f64_nullable, DataType::Float64, true);
1949    cdc_single_rg_tests!(cdc_utf8_non_null, DataType::Utf8, false);
1950    cdc_single_rg_tests!(cdc_binary_nullable, DataType::Binary, true);
1951    cdc_single_rg_tests!(cdc_fsb16_nullable, DataType::FixedSizeBinary(16), true);
1952    cdc_single_rg_tests!(cdc_date32_non_null, DataType::Date32, false);
1953    cdc_single_rg_tests!(
1954        cdc_timestamp_nullable,
1955        DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None),
1956        true
1957    );
1958
1959    // --- Multiple row group tests matching C++ TestCDCMultipleRowGroups ---
1960
1961    mod cdc_multiple_row_groups {
1962        use super::*;
1963
1964        const PART_LENGTH: usize = 128 * 1024;
1965        const EDIT_LENGTH: usize = 128;
1966        const ROW_GROUP_LENGTH: usize = 64 * 1024;
1967
1968        fn schema() -> Arc<Schema> {
1969            Arc::new(Schema::new(vec![
1970                Field::new("int32", DataType::Int32, true),
1971                Field::new("float64", DataType::Float64, true),
1972                Field::new("bool", DataType::Boolean, false),
1973            ]))
1974        }
1975
1976        #[test]
1977        fn insert_once() {
1978            let s = schema();
1979            let part1 = generate_table(&s, PART_LENGTH, 0);
1980            let part2 = generate_table(&s, PART_LENGTH, 2);
1981            let part3 = generate_table(&s, PART_LENGTH, 4);
1982            let edit1 = generate_table(&s, EDIT_LENGTH, 1);
1983            let edit2 = generate_table(&s, EDIT_LENGTH, 3);
1984
1985            let base = concat_batches([&part1, &edit1, &part2, &part3]);
1986            let modified = concat_batches([&part1, &edit1, &edit2, &part2, &part3]);
1987            assert_eq!(modified.num_rows(), base.num_rows() + EDIT_LENGTH);
1988
1989            let base_data = write_with_cdc_options(
1990                &[&base],
1991                CDC_MIN_CHUNK_SIZE,
1992                CDC_MAX_CHUNK_SIZE,
1993                Some(ROW_GROUP_LENGTH),
1994                false,
1995            );
1996            let mod_data = write_with_cdc_options(
1997                &[&modified],
1998                CDC_MIN_CHUNK_SIZE,
1999                CDC_MAX_CHUNK_SIZE,
2000                Some(ROW_GROUP_LENGTH),
2001                false,
2002            );
2003
2004            for col in 0..s.fields().len() {
2005                let base_info = get_column_info(&base_data, col);
2006                let mod_info = get_column_info(&mod_data, col);
2007
2008                assert_eq!(base_info.len(), 7, "expected 7 row groups for col {col}");
2009                assert_eq!(mod_info.len(), 7);
2010
2011                // First two row groups should be identical
2012                assert_eq!(base_info[0].page_lengths, mod_info[0].page_lengths);
2013                assert_eq!(base_info[1].page_lengths, mod_info[1].page_lengths);
2014
2015                // Middle row groups: 1 larger + 1 smaller diff
2016                for i in 2..mod_info.len() - 1 {
2017                    assert_page_length_differences(
2018                        &base_info[i],
2019                        &mod_info[i],
2020                        0,
2021                        1,
2022                        1,
2023                        EDIT_LENGTH as i64,
2024                    );
2025                }
2026                // Last row group: just larger
2027                assert_page_length_differences(
2028                    base_info.last().unwrap(),
2029                    mod_info.last().unwrap(),
2030                    0,
2031                    1,
2032                    0,
2033                    EDIT_LENGTH as i64,
2034                );
2035            }
2036        }
2037
2038        #[test]
2039        fn delete_once() {
2040            let s = schema();
2041            let part1 = generate_table(&s, PART_LENGTH, 0);
2042            let part2 = generate_table(&s, PART_LENGTH, 2);
2043            let part3 = generate_table(&s, PART_LENGTH, 4);
2044            let edit1 = generate_table(&s, EDIT_LENGTH, 1);
2045            let edit2 = generate_table(&s, EDIT_LENGTH, 3);
2046
2047            let base = concat_batches([&part1, &edit1, &part2, &part3, &edit2]);
2048            let modified = concat_batches([&part1, &part2, &part3, &edit2]);
2049
2050            let base_data = write_with_cdc_options(
2051                &[&base],
2052                CDC_MIN_CHUNK_SIZE,
2053                CDC_MAX_CHUNK_SIZE,
2054                Some(ROW_GROUP_LENGTH),
2055                false,
2056            );
2057            let mod_data = write_with_cdc_options(
2058                &[&modified],
2059                CDC_MIN_CHUNK_SIZE,
2060                CDC_MAX_CHUNK_SIZE,
2061                Some(ROW_GROUP_LENGTH),
2062                false,
2063            );
2064
2065            for col in 0..s.fields().len() {
2066                let base_info = get_column_info(&base_data, col);
2067                let mod_info = get_column_info(&mod_data, col);
2068
2069                assert_eq!(base_info.len(), 7);
2070                assert_eq!(mod_info.len(), 7);
2071
2072                assert_eq!(base_info[0].page_lengths, mod_info[0].page_lengths);
2073                assert_eq!(base_info[1].page_lengths, mod_info[1].page_lengths);
2074
2075                for i in 2..mod_info.len() - 1 {
2076                    assert_page_length_differences(
2077                        &base_info[i],
2078                        &mod_info[i],
2079                        0,
2080                        1,
2081                        1,
2082                        EDIT_LENGTH as i64,
2083                    );
2084                }
2085                assert_page_length_differences(
2086                    base_info.last().unwrap(),
2087                    mod_info.last().unwrap(),
2088                    0,
2089                    0,
2090                    1,
2091                    EDIT_LENGTH as i64,
2092                );
2093            }
2094        }
2095
2096        #[test]
2097        fn update_once() {
2098            let s = schema();
2099            let part1 = generate_table(&s, PART_LENGTH, 0);
2100            let part2 = generate_table(&s, PART_LENGTH, 2);
2101            let part3 = generate_table(&s, PART_LENGTH, 4);
2102            let edit1 = generate_table(&s, EDIT_LENGTH, 1);
2103            let edit2 = generate_table(&s, EDIT_LENGTH, 3);
2104            let edit3 = generate_table(&s, EDIT_LENGTH, 5);
2105
2106            let base = concat_batches([&part1, &edit1, &part2, &part3, &edit2]);
2107            let modified = concat_batches([&part1, &edit3, &part2, &part3, &edit2]);
2108
2109            let base_data = write_with_cdc_options(
2110                &[&base],
2111                CDC_MIN_CHUNK_SIZE,
2112                CDC_MAX_CHUNK_SIZE,
2113                Some(ROW_GROUP_LENGTH),
2114                false,
2115            );
2116            let mod_data = write_with_cdc_options(
2117                &[&modified],
2118                CDC_MIN_CHUNK_SIZE,
2119                CDC_MAX_CHUNK_SIZE,
2120                Some(ROW_GROUP_LENGTH),
2121                false,
2122            );
2123
2124            for col in 0..s.fields().len() {
2125                let nullable = s.field(col).is_nullable();
2126                let base_info = get_column_info(&base_data, col);
2127                let mod_info = get_column_info(&mod_data, col);
2128
2129                assert_eq!(base_info.len(), 7);
2130                assert_eq!(mod_info.len(), 7);
2131
2132                // Validate CDC chunk sizes on at least the first row group
2133                assert_cdc_chunk_sizes(
2134                    &base.column(col).slice(0, ROW_GROUP_LENGTH),
2135                    &base_info[0],
2136                    nullable,
2137                    CDC_MIN_CHUNK_SIZE,
2138                    CDC_MAX_CHUNK_SIZE,
2139                    false,
2140                );
2141
2142                assert_eq!(base_info[0].page_lengths, mod_info[0].page_lengths);
2143                assert_eq!(base_info[1].page_lengths, mod_info[1].page_lengths);
2144
2145                // Row group containing the edit
2146                assert_page_length_differences_update(&base_info[2], &mod_info[2], 1);
2147
2148                // Remaining row groups should be identical
2149                for i in 3..mod_info.len() {
2150                    assert_eq!(base_info[i].page_lengths, mod_info[i].page_lengths);
2151                }
2152            }
2153        }
2154
2155        #[test]
2156        fn append() {
2157            let s = schema();
2158            let part1 = generate_table(&s, PART_LENGTH, 0);
2159            let part2 = generate_table(&s, PART_LENGTH, 2);
2160            let part3 = generate_table(&s, PART_LENGTH, 4);
2161            let edit1 = generate_table(&s, EDIT_LENGTH, 1);
2162            let edit2 = generate_table(&s, EDIT_LENGTH, 3);
2163
2164            let base = concat_batches([&part1, &edit1, &part2, &part3]);
2165            let modified = concat_batches([&part1, &edit1, &part2, &part3, &edit2]);
2166
2167            let base_data = write_with_cdc_options(
2168                &[&base],
2169                CDC_MIN_CHUNK_SIZE,
2170                CDC_MAX_CHUNK_SIZE,
2171                Some(ROW_GROUP_LENGTH),
2172                false,
2173            );
2174            let mod_data = write_with_cdc_options(
2175                &[&modified],
2176                CDC_MIN_CHUNK_SIZE,
2177                CDC_MAX_CHUNK_SIZE,
2178                Some(ROW_GROUP_LENGTH),
2179                false,
2180            );
2181
2182            for col in 0..s.fields().len() {
2183                let nullable = s.field(col).is_nullable();
2184                let base_info = get_column_info(&base_data, col);
2185                let mod_info = get_column_info(&mod_data, col);
2186
2187                assert_eq!(base_info.len(), 7);
2188                assert_eq!(mod_info.len(), 7);
2189
2190                // Validate CDC chunk sizes on the first row group
2191                assert_cdc_chunk_sizes(
2192                    &base.column(col).slice(0, ROW_GROUP_LENGTH),
2193                    &base_info[0],
2194                    nullable,
2195                    CDC_MIN_CHUNK_SIZE,
2196                    CDC_MAX_CHUNK_SIZE,
2197                    false,
2198                );
2199
2200                // All row groups except last should be identical
2201                for i in 0..base_info.len() - 1 {
2202                    assert_eq!(base_info[i].page_lengths, mod_info[i].page_lengths);
2203                }
2204
2205                // Last row group: pages should be identical except last
2206                let bp = &base_info.last().unwrap().page_lengths;
2207                let mp = &mod_info.last().unwrap().page_lengths;
2208                assert!(mp.len() >= bp.len());
2209                for i in 0..bp.len() - 1 {
2210                    assert_eq!(bp[i], mp[i]);
2211                }
2212            }
2213        }
2214    }
2215
2216    // --- Direct chunker test (kept from original) ---
2217
2218    #[test]
2219    fn test_cdc_array_offsets_direct() {
2220        use crate::basic::Type as PhysicalType;
2221        use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
2222
2223        let options = CdcOptions {
2224            min_chunk_size: CDC_MIN_CHUNK_SIZE,
2225            max_chunk_size: CDC_MAX_CHUNK_SIZE,
2226            norm_level: 0,
2227        };
2228        let desc = {
2229            let tp = Type::primitive_type_builder("col", PhysicalType::INT32)
2230                .build()
2231                .unwrap();
2232            ColumnDescriptor::new(Arc::new(tp), 0, 0, ColumnPath::new(vec![]))
2233        };
2234
2235        let bpr = bytes_per_record(&DataType::Int32, false);
2236        let n = CDC_PART_SIZE / bpr;
2237        let offset = 10usize;
2238
2239        let array: Int32Array = (0..n).map(|i| test_hash(0, i as u64) as i32).collect();
2240        let mut chunker = super::ContentDefinedChunker::new(&desc, &options).unwrap();
2241        let chunks = chunker
2242            .get_arrow_chunks(LevelDataRef::Absent, LevelDataRef::Absent, &array)
2243            .unwrap();
2244
2245        let sliced = array.slice(offset, n - offset);
2246        let mut chunker2 = super::ContentDefinedChunker::new(&desc, &options).unwrap();
2247        let chunks2 = chunker2
2248            .get_arrow_chunks(LevelDataRef::Absent, LevelDataRef::Absent, &sliced)
2249            .unwrap();
2250
2251        let values: Vec<usize> = chunks.iter().map(|c| c.num_values).collect();
2252        let values2: Vec<usize> = chunks2.iter().map(|c| c.num_values).collect();
2253
2254        assert!(values.len() > 1, "expected multiple chunks, got {values:?}");
2255        assert_eq!(values.len(), values2.len(), "chunk count must match");
2256
2257        assert_eq!(
2258            values[0] - values2[0],
2259            offset,
2260            "offsetted first chunk should be {offset} values shorter"
2261        );
2262        assert_eq!(
2263            &values[1..],
2264            &values2[1..],
2265            "all chunks after the first must be identical"
2266        );
2267    }
2268
2269    /// Regression test for <https://github.com/apache/arrow-rs/issues/9637>
2270    ///
2271    /// Writing nested list data with CDC enabled panicked with an out-of-bounds
2272    /// slice access when null list entries had non-zero child ranges.
2273    #[test]
2274    fn test_cdc_list_roundtrip() {
2275        let schema = Arc::new(Schema::new(vec![
2276            Field::new(
2277                "_1",
2278                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
2279                true,
2280            ),
2281            Field::new(
2282                "_2",
2283                DataType::List(Arc::new(Field::new_list_field(DataType::Boolean, true))),
2284                true,
2285            ),
2286            Field::new(
2287                "_3",
2288                DataType::LargeList(Arc::new(Field::new_list_field(DataType::Utf8, true))),
2289                true,
2290            ),
2291        ]));
2292        let batch = create_random_batch(schema, 10_000, 0.25, 0.75).unwrap();
2293        write_with_cdc_options(
2294            &[&batch],
2295            CDC_MIN_CHUNK_SIZE,
2296            CDC_MAX_CHUNK_SIZE,
2297            None,
2298            true,
2299        );
2300    }
2301
2302    /// Test CDC with deeply nested types: List<List<Int32>>, List<Struct<List<Int32>>>
2303    #[test]
2304    fn test_cdc_deeply_nested_roundtrip() {
2305        let inner_field = Field::new_list_field(DataType::Int32, true);
2306        let inner_type = DataType::List(Arc::new(inner_field));
2307        let outer_field = Field::new_list_field(inner_type.clone(), true);
2308        let list_list_type = DataType::List(Arc::new(outer_field));
2309
2310        let struct_inner_field = Field::new_list_field(DataType::Int32, true);
2311        let struct_inner_type = DataType::List(Arc::new(struct_inner_field));
2312        let struct_fields = Fields::from(vec![Field::new("a", struct_inner_type, true)]);
2313        let struct_type = DataType::Struct(struct_fields);
2314        let struct_list_field = Field::new_list_field(struct_type, true);
2315        let list_struct_type = DataType::List(Arc::new(struct_list_field));
2316
2317        let schema = Arc::new(Schema::new(vec![
2318            Field::new("list_list", list_list_type, true),
2319            Field::new("list_struct_list", list_struct_type, true),
2320        ]));
2321        let batch = create_random_batch(schema, 10_000, 0.25, 0.75).unwrap();
2322        write_with_cdc_options(
2323            &[&batch],
2324            CDC_MIN_CHUNK_SIZE,
2325            CDC_MAX_CHUNK_SIZE,
2326            None,
2327            true,
2328        );
2329    }
2330
2331    /// Test CDC with list arrays that have non-empty null segments.
2332    ///
2333    /// Per the Arrow columnar format spec: "a null value may correspond to a
2334    /// non-empty segment in the child array". This test constructs such arrays
2335    /// manually and verifies the CDC writer handles them correctly.
2336    #[test]
2337    fn test_cdc_list_non_empty_null_segments() {
2338        // Build List<Int32> where null entries own non-zero child ranges:
2339        //   row 0: [1, 2]     offsets[0..2]  valid
2340        //   row 1: null        offsets[2..5]  null, but owns 3 child values
2341        //   row 2: [6, 7]     offsets[5..7]  valid
2342        //   row 3: null        offsets[7..9]  null, but owns 2 child values
2343        //   row 4: [10]        offsets[9..10] valid
2344        let values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2345        let offsets = Buffer::from_iter([0_i32, 2, 5, 7, 9, 10]);
2346        let null_bitmap = Buffer::from([0b00010101]); // rows 0, 2, 4 valid
2347
2348        let list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false)));
2349        let list_data = unsafe {
2350            ArrayData::new_unchecked(
2351                list_type.clone(),
2352                5,
2353                None,
2354                Some(null_bitmap),
2355                0,
2356                vec![offsets],
2357                vec![values.to_data()],
2358            )
2359        };
2360        let list_array = arrow_array::make_array(list_data);
2361
2362        let schema = Arc::new(Schema::new(vec![Field::new("col", list_type, true)]));
2363        let batch = RecordBatch::try_new(schema, vec![list_array]).unwrap();
2364
2365        let buf = write_with_cdc_options(
2366            &[&batch],
2367            CDC_MIN_CHUNK_SIZE,
2368            CDC_MAX_CHUNK_SIZE,
2369            None,
2370            true,
2371        );
2372        let read = concat_batches(read_batches(&buf));
2373        let read_list = read.column(0).as_list::<i32>();
2374        assert_eq!(read_list.len(), 5);
2375        assert!(read_list.is_valid(0));
2376        assert!(read_list.is_null(1));
2377        assert!(read_list.is_valid(2));
2378        assert!(read_list.is_null(3));
2379        assert!(read_list.is_valid(4));
2380
2381        let get_vals = |i: usize| -> Vec<i32> {
2382            read_list
2383                .value(i)
2384                .as_primitive::<arrow_array::types::Int32Type>()
2385                .values()
2386                .iter()
2387                .copied()
2388                .collect()
2389        };
2390        assert_eq!(get_vals(0), vec![1, 2]);
2391        assert_eq!(get_vals(2), vec![6, 7]);
2392        assert_eq!(get_vals(4), vec![10]);
2393    }
2394}