Struct ContentDefinedChunker

pub(crate) struct ContentDefinedChunker {
    max_def_level: i16,
    max_rep_level: i16,
    repeated_ancestor_def_level: i16,
    min_chunk_size: i64,
    max_chunk_size: i64,
    rolling_hash_mask: u64,
    rolling_hash: u64,
    has_matched: bool,
    nth_run: usize,
    chunk_size: i64,
}

CDC (Content-Defined Chunking) divides data into variable-sized chunks based on content rather than fixed-size boundaries.

For example, given this sequence of values in a column:

File1:    [1,2,3,   4,5,6,   7,8,9]
           chunk1   chunk2   chunk3

If a value is inserted between 3 and 4:

File2:    [1,2,3,0,   4,5,6,   7,8,9]
           new-chunk  chunk2   chunk3

The chunking process adjusts to maintain stable boundaries across data modifications. Each chunk defines a new parquet data page which is contiguously written to the file. Since each page is compressed independently, the files’ contents look like:

File1:    [Page1][Page2][Page3]...
File2:    [Page4][Page2][Page3]...

When uploaded to a content-addressable storage (CAS) system, the CAS splits the byte stream into content-defined blobs with unique identifiers. Identical blobs are stored only once, so Page2 and Page3 are deduplicated across File1 and File2.

Implementation

Only the parquet writer needs to be aware of content-defined chunking; the reader is unaffected. Each parquet column writer holds a ContentDefinedChunker instance when the writer's properties enable it. The chunker's state is maintained across the entire column and is never reset between pages or row groups.

This implements a FastCDC-inspired algorithm using gear hashing. The input data is fed byte-by-byte into a rolling hash; when the hash matches a predefined mask, a new chunk boundary candidate is recorded. To reduce the exponential variance of chunk sizes inherent in a single gear hash, the algorithm requires 8 consecutive mask matches — each against a different pre-computed gear hash table — before committing to a boundary. This central-limit-theorem normalization makes the chunk size distribution approximately normal between min_chunk_size and max_chunk_size.
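
The gear-hash step described above can be sketched roughly as follows. The table construction, seed, and mask here are illustrative stand-ins, not the chunker's actual pre-computed tables:

```rust
// Illustrative gear hashing; the real chunker ships 8 pre-computed
// tables and a mask derived from the chunk-size bounds.
const GEAR_TABLE_LEN: usize = 256;

/// Stand-in gear table filled by an xorshift64 PRNG (hypothetical;
/// the real tables are fixed constants).
fn make_gear_table(seed: u64) -> [u64; GEAR_TABLE_LEN] {
    let mut state = seed;
    let mut table = [0u64; GEAR_TABLE_LEN];
    for slot in table.iter_mut() {
        state ^= state << 13;
        state ^= state >> 7;
        state ^= state << 17;
        *slot = state;
    }
    table
}

/// One rolling-hash step: shift the state and mix in the table entry
/// for the incoming byte.
fn gear_step(hash: u64, byte: u8, table: &[u64; GEAR_TABLE_LEN]) -> u64 {
    (hash << 1).wrapping_add(table[byte as usize])
}

/// A boundary candidate is recorded when the masked hash bits are zero.
fn is_boundary_candidate(hash: u64, mask: u64) -> bool {
    hash & mask == 0
}
```

Because `gear_step` shifts the state left each time, a byte's influence on the hash fades after 64 steps, which keeps the boundary decision local to the most recent bytes.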

The chunker receives the record-shredded column data (def_levels, rep_levels, values) and iterates over the (def_level, rep_level, value) triplets while adjusting the column-global rolling hash. Whenever the rolling hash matches, the chunker creates a new chunk. For nested data (lists, maps, structs) chunk boundaries are restricted to top-level record boundaries (rep_level == 0) so that a nested row is never split across chunks.

Note that boundaries are calculated deterministically from the data alone, so the same data always produces the same chunks under the same configuration.

Ported from the C++ implementation in apache/arrow#45360 (cpp/src/parquet/chunker_internal.cc).

Fields

max_def_level: i16

Maximum definition level for this column.

max_rep_level: i16

Maximum repetition level for this column.

repeated_ancestor_def_level: i16

Definition level at the nearest REPEATED ancestor.

min_chunk_size: i64

Minimum chunk size in bytes. The rolling hash will not be updated until this size is reached for each chunk. All data sent through the hash function counts towards the chunk size, including definition and repetition levels if present.

max_chunk_size: i64

Maximum chunk size in bytes. A new chunk is created whenever the chunk size exceeds this value. The chunk size distribution approximates a normal distribution between min_chunk_size and max_chunk_size. Note that the parquet writer has a related data_pagesize property that controls the maximum size of a parquet data page after encoding. Setting data_pagesize smaller than max_chunk_size doesn't affect chunking effectiveness, but it does produce a larger number of small parquet data pages.

rolling_hash_mask: u64

Mask for matching against the rolling hash.

rolling_hash: u64

Rolling hash state, never reset — initialized once for the entire column.

has_matched: bool

Whether the rolling hash has matched the mask since the last chunk boundary.

nth_run: usize

Current run count for the central-limit-theorem normalization.

chunk_size: i64

Current chunk size in bytes.

Implementations

impl ContentDefinedChunker

pub fn new(desc: &ColumnDescriptor, options: &CdcOptions) -> Result<Self>

fn calculate_mask(min_chunk_size: i64, max_chunk_size: i64, norm_level: i32) -> Result<u64>

Calculate the mask used to determine chunk boundaries from the rolling hash.

The mask is calculated so that the expected chunk size distribution approximates a normal distribution between min and max chunk sizes.
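
One plausible shape for that calculation (purely illustrative; the actual formula is in calculate_mask and may weight things differently): if a k-bit mask matches with probability 2^-k per byte and a boundary needs 8 matches, each gear table should fire roughly once per avg/8 bytes, so k ≈ log2(avg/8):

```rust
/// Hypothetical mask derivation: pick the number of low hash bits so
/// that each of `num_tables` required matches fires about once per
/// avg / num_tables bytes. The real formula may differ.
fn calculate_mask_sketch(min_chunk_size: i64, max_chunk_size: i64, num_tables: u32) -> u64 {
    let avg = ((min_chunk_size + max_chunk_size) / 2) as u64;
    // Expected bytes between matches for a single gear table.
    let per_table = (avg / num_tables as u64).max(1);
    // floor(log2(per_table)) low bits must all be zero for a match.
    let bits = 63 - per_table.leading_zeros();
    (1u64 << bits) - 1
}
```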

fn roll(&mut self, bytes: &[u8])

Feed raw bytes into the rolling hash.

The byte count always accumulates toward chunk_size, but the actual hash update is skipped until min_chunk_size has been reached. This “skip window” is the FastCDC optimization that prevents boundaries from appearing too early in a chunk.
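
A minimal sketch of this skip window, with a simplified per-call check and a stand-in hash update (the real chunker hashes through gear tables):

```rust
/// Simplified mirror of the chunker's rolling state.
struct RollSketch {
    chunk_size: i64,
    min_chunk_size: i64,
    rolling_hash: u64,
}

impl RollSketch {
    fn roll(&mut self, bytes: &[u8]) {
        // Bytes always count toward the chunk size...
        self.chunk_size += bytes.len() as i64;
        // ...but hashing is skipped inside the FastCDC skip window,
        // so no boundary candidate can appear before min_chunk_size.
        if self.chunk_size < self.min_chunk_size {
            return;
        }
        for &b in bytes {
            // Stand-in update; the real hash mixes in gear tables.
            self.rolling_hash = (self.rolling_hash << 1).wrapping_add(b as u64);
        }
    }
}
```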

fn roll_fixed<const N: usize>(&mut self, bytes: &[u8; N])

Feed exactly N bytes into the rolling hash (compile-time width).

Like roll, but the byte count is known at compile time, allowing the compiler to unroll the inner loop.

fn roll_level(&mut self, level: i16)

Feed a definition or repetition level (i16) into the rolling hash.

fn need_new_chunk(&mut self) -> bool

Check whether a new chunk boundary should be created.

A boundary is created when either of two conditions holds:

  1. CLT normalization: The rolling hash has matched the mask (has_matched) and this is the 8th consecutive such match (nth_run reaches NUM_GEARHASH_TABLES). Each match advances to the next gear hash table, so 8 independent matches are required. A single hash table would yield exponentially distributed chunk sizes; requiring 8 independent matches approximates a normal (Gaussian) distribution by the central limit theorem.

  2. Hard size limit: chunk_size has reached max_chunk_size. This caps chunk size even if the CLT normalization sequence has not completed.

Note: when max_chunk_size forces a boundary, nth_run is not reset, so the CLT sequence continues from where it left off in the next chunk. This matches the C++ behavior.
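
The two conditions might look like the following sketch (same field names as the struct, but a simplified illustration; for example, the selection of the next gear table is elided):

```rust
const NUM_GEARHASH_TABLES: usize = 8;

/// Minimal state for the boundary decision.
struct BoundarySketch {
    has_matched: bool,
    nth_run: usize,
    chunk_size: i64,
    max_chunk_size: i64,
}

impl BoundarySketch {
    fn need_new_chunk(&mut self) -> bool {
        // Condition 1: CLT normalization — count consecutive mask
        // matches; the 8th commits a boundary.
        if self.has_matched {
            self.has_matched = false;
            self.nth_run += 1;
            if self.nth_run >= NUM_GEARHASH_TABLES {
                self.nth_run = 0;
                self.chunk_size = 0;
                return true;
            }
        }
        // Condition 2: hard cap. nth_run is deliberately left alone
        // here, so the CLT sequence continues into the next chunk.
        if self.chunk_size >= self.max_chunk_size {
            self.chunk_size = 0;
            return true;
        }
        false
    }
}
```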

fn calculate<F>(&mut self, def_levels: Option<&[i16]>, rep_levels: Option<&[i16]>, num_levels: usize, roll_value: F) -> Vec<CdcChunk>
where F: FnMut(&mut Self, usize)

Compute chunk boundaries for the given column data.

The chunking state is maintained across the entire column without being reset between pages and row groups. This enables the chunking process to be continued between different write calls.

We go over the (def_level, rep_level, value) triplets one by one while adjusting the column-global rolling hash based on the triplet. Whenever the rolling hash matches a predefined mask it sets has_matched to true.

After each triplet need_new_chunk is called to evaluate if we need to create a new chunk.
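
The loop might be sketched as follows (names and types are illustrative — the real method builds CdcChunk values and takes a roll_value closure over &mut Self):

```rust
/// Walk the triplets, feed each into the hash via the caller's
/// closure, and commit boundaries only at top-level record starts
/// (rep_level == 0) so nested rows are never split across chunks.
fn calculate_sketch(
    rep_levels: &[i16],
    mut roll_triplet: impl FnMut(usize),
    mut need_new_chunk: impl FnMut() -> bool,
) -> Vec<usize> {
    let mut boundaries = Vec::new();
    for (i, &rep) in rep_levels.iter().enumerate() {
        roll_triplet(i);
        if rep == 0 && need_new_chunk() {
            boundaries.push(i);
        }
    }
    boundaries
}
```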

pub(crate) fn get_arrow_chunks(&mut self, def_levels: Option<&[i16]>, rep_levels: Option<&[i16]>, array: &dyn Array) -> Result<Vec<CdcChunk>>

Compute CDC chunk boundaries by dispatching on the Arrow array’s data type to feed value bytes into the rolling hash.

fn validate_chunks(&self, chunks: &[CdcChunk], num_levels: usize, total_values: usize)

Trait Implementations

impl Debug for ContentDefinedChunker

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.
