pub(crate) struct ContentDefinedChunker {
max_def_level: i16,
max_rep_level: i16,
repeated_ancestor_def_level: i16,
min_chunk_size: i64,
max_chunk_size: i64,
rolling_hash_mask: u64,
rolling_hash: u64,
has_matched: bool,
nth_run: usize,
chunk_size: i64,
}
CDC (Content-Defined Chunking) divides data into variable-sized chunks based on content rather than fixed-size boundaries.
For example, given this sequence of values in a column:
File1: [1,2,3, 4,5,6, 7,8,9]
        chunk1 chunk2 chunk3

If a value is inserted between 3 and 4:

File2: [1,2,3,0, 4,5,6, 7,8,9]
        new-chunk chunk2 chunk3

The chunking process adjusts to maintain stable boundaries across data modifications. Each chunk defines a new parquet data page, which is contiguously written to the file. Since each page is compressed independently, the files' contents look like:
File1: [Page1][Page2][Page3]...
File2: [Page4][Page2][Page3]...

When uploaded to a content-addressable storage (CAS) system, the CAS splits the byte stream into content-defined blobs with unique identifiers. Identical blobs are stored only once, so Page2 and Page3 are deduplicated across File1 and File2.
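The boundary stability illustrated above can be demonstrated with a toy byte-level chunker. This is a sketch only: the real chunker hashes (def_level, rep_level, value) triplets and uses multiple gear tables, and the gear table here is filled with an arbitrary deterministic sequence.

```rust
// Toy single-table gear-hash chunker (illustrative sketch only).
const GEAR: [u64; 256] = {
    let mut t = [0u64; 256];
    let mut x: u64 = 0x9E37_79B9_7F4A_7C15;
    let mut i = 0;
    while i < 256 {
        // Simple deterministic pseudo-random fill (LCG step).
        x = x.wrapping_mul(6364136223846793005).wrapping_add(1442695040888963407);
        t[i] = x;
        i += 1;
    }
    t
};

/// Return the positions just after each chunk boundary in `data`.
fn chunk_boundaries(data: &[u8], mask: u64) -> Vec<usize> {
    let mut hash = 0u64;
    let mut out = Vec::new();
    for (i, &b) in data.iter().enumerate() {
        hash = (hash << 1).wrapping_add(GEAR[b as usize]);
        if hash & mask == 0 {
            out.push(i + 1); // cut after this byte
            hash = 0;        // start the next chunk fresh
        }
    }
    out
}
```

Because a gear hash depends only on the most recent bytes (each byte is shifted out of the 64-bit state after 64 steps), boundaries downstream of an edit realign with the original data, which is what lets Page2 and Page3 deduplicate.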
Implementation
Only the parquet writer needs to be aware of content-defined chunking; the reader is
unaffected. Each parquet column writer holds a ContentDefinedChunker instance when
enabled by the writer's properties. The chunker's state is maintained across the
entire column without being reset between pages and row groups.
This implements a FastCDC-inspired algorithm using gear hashing. The input data is
fed byte-by-byte into a rolling hash; when the hash matches a predefined mask, a new
chunk boundary candidate is recorded. To reduce the exponential variance of chunk
sizes inherent in a single gear hash, the algorithm requires 8 consecutive mask
matches — each against a different pre-computed gear hash table — before committing
to a boundary. This central-limit-theorem normalization makes the chunk size
distribution approximately normal between min_chunk_size and max_chunk_size.
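The 8-table normalization can be sketched as follows. The gear function here is a hypothetical stand-in (the real implementation uses pre-computed tables), but the control flow — advancing to the next table on each match and committing a boundary only on the 8th — follows the description above.

```rust
const NUM_GEARHASH_TABLES: usize = 8;

// Hypothetical gear function standing in for 8 pre-computed tables:
// each table hashes the same byte differently.
fn gear(table: usize, byte: u8) -> u64 {
    let x = (byte as u64 ^ (table as u64).wrapping_mul(0xA076_1D64_78BD_642F))
        .wrapping_mul(0x9E37_79B9_7F4A_7C15);
    x ^ (x >> 29)
}

/// Count how many chunk boundaries the CLT-normalized scheme finds in `data`.
fn count_boundaries(data: &[u8], mask: u64) -> usize {
    let mut hash = 0u64;
    let mut nth_run = 0usize;
    let mut boundaries = 0usize;
    for &b in data {
        // Each run of matches uses the next table, so 8 independent
        // matches are required before a boundary is committed.
        hash = (hash << 1).wrapping_add(gear(nth_run, b));
        if hash & mask == 0 {
            nth_run += 1;
            if nth_run == NUM_GEARHASH_TABLES {
                boundaries += 1;
                nth_run = 0;
                hash = 0;
            }
        }
    }
    boundaries
}
```

With a mask of 0 every byte matches, so a boundary is committed exactly every 8 bytes — a degenerate case that makes the "8 matches per boundary" mechanic easy to see.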
The chunker receives the record-shredded column data (def_levels, rep_levels, values)
and iterates over the (def_level, rep_level, value) triplets while adjusting the
column-global rolling hash. Whenever the rolling hash matches, the chunker creates a
new chunk. For nested data (lists, maps, structs) chunk boundaries are restricted to
top-level record boundaries (rep_level == 0) so that a nested row is never split
across chunks.
Note that boundaries are deterministically calculated exclusively based on the data itself, so the same data always produces the same chunks given the same configuration.
Ported from the C++ implementation in apache/arrow#45360
(cpp/src/parquet/chunker_internal.cc).
Fields
max_def_level: i16
Maximum definition level for this column.

max_rep_level: i16
Maximum repetition level for this column.

repeated_ancestor_def_level: i16
Definition level at the nearest REPEATED ancestor.

min_chunk_size: i64
Minimum chunk size in bytes. The rolling hash will not be updated until this size is reached for each chunk. All data sent through the hash function counts towards the chunk size, including definition and repetition levels if present.

max_chunk_size: i64
Maximum chunk size in bytes.
A new chunk is created whenever the chunk size exceeds this value. The chunk size
distribution approximates a normal distribution between min_chunk_size and
max_chunk_size. Note that the parquet writer has a related data_pagesize
property that controls the maximum size of a parquet data page after encoding.
While setting data_pagesize smaller than max_chunk_size doesn’t affect
chunking effectiveness, it results in more small parquet data pages.
rolling_hash_mask: u64
Mask for matching against the rolling hash.

rolling_hash: u64
Rolling hash state, never reset; initialized once for the entire column.

has_matched: bool
Whether the rolling hash has matched the mask since the last chunk boundary.

nth_run: usize
Current run count for the central-limit-theorem normalization.

chunk_size: i64
Current chunk size in bytes.
Implementations
impl ContentDefinedChunker
pub fn new(desc: &ColumnDescriptor, options: &CdcOptions) -> Result<Self>
fn calculate_mask(
    min_chunk_size: i64,
    max_chunk_size: i64,
    norm_level: i32,
) -> Result<u64>
Calculate the mask used to determine chunk boundaries from the rolling hash.
The mask is calculated so that the expected chunk size distribution approximates a normal distribution between min and max chunk sizes.
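One plausible derivation is sketched below. This is an assumption, not the exact formula used here: it targets the midpoint of the min and max sizes, divides by the number of gear tables (since a boundary needs 8 matches), and sets that many low bits in the mask. The NUM_GEARHASH_TABLES constant and the error type are stand-ins.

```rust
const NUM_GEARHASH_TABLES: i64 = 8; // stand-in for the crate's constant

// Sketch of a mask derivation (assumption: not necessarily the exact formula).
fn calculate_mask(
    min_chunk_size: i64,
    max_chunk_size: i64,
    norm_level: i32,
) -> Result<u64, String> {
    if min_chunk_size < 0 || max_chunk_size <= min_chunk_size {
        return Err("invalid chunk size bounds".into());
    }
    // Aim at the midpoint; each of the 8 gear tables must match once, so the
    // per-match target size is the midpoint divided by the number of tables.
    let avg = (min_chunk_size + max_chunk_size) / 2;
    let per_match = (avg / NUM_GEARHASH_TABLES).max(1) as u64;
    // A match fires when (hash & mask) == 0, i.e. with probability 1/2^bits,
    // so set roughly log2(per_match) bits, adjusted by the normalization level.
    let bits = (per_match.ilog2() as i32 + norm_level).clamp(1, 63);
    Ok((1u64 << bits) - 1)
}
```

For example, with min = 256 KiB and max = 1 MiB the midpoint is 640 KiB, the per-match target is 80 KiB, and the mask gets 16 low bits set.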
fn roll(&mut self, bytes: &[u8])
Feed raw bytes into the rolling hash.
The byte count always accumulates toward chunk_size, but the actual hash
update is skipped until min_chunk_size has been reached. This “skip window”
is the FastCDC optimization that prevents boundaries from appearing too early
in a chunk.
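The skip window can be sketched with a minimal stand-in state (field names follow the struct above; the skip granularity is simplified to whole calls, and the gear value b + 1 is a stand-in for the real table lookup so the example is easy to trace):

```rust
// Minimal stand-in state to illustrate the skip window (sketch only).
struct Roller {
    min_chunk_size: i64,
    chunk_size: i64,
    rolling_hash: u64,
}

impl Roller {
    fn roll(&mut self, bytes: &[u8]) {
        // The byte count always accumulates toward the chunk size...
        self.chunk_size += bytes.len() as i64;
        // ...but the hash update is skipped until the minimum chunk size is
        // reached, preventing boundaries from appearing too early in a chunk.
        if self.chunk_size < self.min_chunk_size {
            return;
        }
        for &b in bytes {
            // Stand-in gear value (b + 1); the real chunker looks up a table.
            let g = b as u64 + 1;
            self.rolling_hash = (self.rolling_hash << 1).wrapping_add(g);
        }
    }
}
```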
fn roll_fixed<const N: usize>(&mut self, bytes: &[u8; N])
Feed exactly N bytes into the rolling hash (compile-time width).
Like roll, but the byte count is known at compile time,
allowing the compiler to unroll the inner loop.
fn roll_level(&mut self, level: i16)
Feed a definition or repetition level (i16) into the rolling hash.
fn need_new_chunk(&mut self) -> bool
Check whether a new chunk boundary should be created.
A boundary is created when either of two conditions holds:
- CLT normalization: the rolling hash has matched the mask (has_matched) and this is the 8th consecutive such match (nth_run reaches NUM_GEARHASH_TABLES). Each match advances to the next gear hash table, so 8 independent matches are required. A single hash table would yield exponentially distributed chunk sizes; requiring 8 independent matches approximates a normal (Gaussian) distribution by the central limit theorem.
- Hard size limit: chunk_size has reached max_chunk_size. This caps chunk size even if the CLT normalization sequence has not completed.
Note: when max_chunk_size forces a boundary, nth_run is not reset, so
the CLT sequence continues from where it left off in the next chunk. This
matches the C++ behavior.
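The two conditions, including the non-reset of nth_run on a forced cut, can be sketched with a simplified state (field names follow the struct above):

```rust
const NUM_GEARHASH_TABLES: usize = 8;

// Simplified boundary-decision state (sketch of the logic described above).
struct Chunker {
    max_chunk_size: i64,
    chunk_size: i64,
    has_matched: bool,
    nth_run: usize,
}

impl Chunker {
    fn need_new_chunk(&mut self) -> bool {
        if self.has_matched {
            self.has_matched = false;
            self.nth_run += 1;
            if self.nth_run >= NUM_GEARHASH_TABLES {
                // CLT normalization: the 8th consecutive match commits a boundary.
                self.nth_run = 0;
                self.chunk_size = 0;
                return true;
            }
        }
        if self.chunk_size >= self.max_chunk_size {
            // Hard size limit; nth_run is deliberately NOT reset here, so the
            // match sequence continues into the next chunk.
            self.chunk_size = 0;
            return true;
        }
        false
    }
}
```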
fn calculate<F>(
    &mut self,
    def_levels: Option<&[i16]>,
    rep_levels: Option<&[i16]>,
    num_levels: usize,
    roll_value: F,
) -> Vec<CdcChunk>
Compute chunk boundaries for the given column data.
The chunking state is maintained across the entire column without being reset between pages and row groups. This enables the chunking process to be continued between different write calls.
We go over the (def_level, rep_level, value) triplets one by one while
adjusting the column-global rolling hash based on the triplet. Whenever
the rolling hash matches a predefined mask it sets has_matched to true.
After each triplet need_new_chunk is called to
evaluate if we need to create a new chunk.
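The shape of this loop can be sketched as follows. The CdcChunk fields and the need_new_chunk closure are simplified stand-ins; the real method also threads def levels and a roll_value closure through the hasher state. The key invariant shown is that cuts only happen at top-level record boundaries (rep_level == 0), so a nested row is never split.

```rust
// Simplified stand-in for the chunk descriptor (hypothetical fields).
#[derive(Debug, PartialEq)]
struct CdcChunk { start: usize, len: usize }

/// Sketch of the chunking loop: cut only at top-level records (rep_level == 0)
/// and only where the hasher asks for a cut.
fn calculate(
    rep_levels: Option<&[i16]>,
    num_levels: usize,
    mut need_new_chunk: impl FnMut(usize) -> bool,
) -> Vec<CdcChunk> {
    let mut chunks = Vec::new();
    let mut start = 0;
    for i in 0..num_levels {
        // A flat column (no rep levels) has rep_level == 0 everywhere.
        let rep = rep_levels.map_or(0, |r| r[i]);
        // In the real chunker the rolling hash is updated here from the
        // (def_level, rep_level, value) triplet before this check.
        if rep == 0 && i > start && need_new_chunk(i) {
            chunks.push(CdcChunk { start, len: i - start });
            start = i;
        }
    }
    if num_levels > start {
        chunks.push(CdcChunk { start, len: num_levels - start });
    }
    chunks
}
```

In the nested case below, even though the hasher asks for a cut at every position, boundaries only land where a new top-level record begins.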
pub(crate) fn get_arrow_chunks(
    &mut self,
    def_levels: Option<&[i16]>,
    rep_levels: Option<&[i16]>,
    array: &dyn Array,
) -> Result<Vec<CdcChunk>>
Compute CDC chunk boundaries by dispatching on the Arrow array’s data type to feed value bytes into the rolling hash.
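The dispatch can be pictured as matching on the value type and feeding each value's raw bytes through the hasher. The Value enum below is a hypothetical simplified stand-in, not the arrow-rs Array API:

```rust
// Hypothetical simplified stand-in for Arrow values (not the arrow-rs API).
enum Value {
    Int32(i32),
    Int64(i64),
    Float64(f64),
    Binary(Vec<u8>),
}

/// Feed one value's raw bytes into a rolling-hash closure, dispatching on type.
fn roll_value(value: &Value, mut roll: impl FnMut(&[u8])) {
    match value {
        // Fixed-width types contribute their little-endian byte encoding.
        Value::Int32(v) => roll(&v.to_le_bytes()),
        Value::Int64(v) => roll(&v.to_le_bytes()),
        Value::Float64(v) => roll(&v.to_le_bytes()),
        // Variable-width types contribute their bytes directly.
        Value::Binary(v) => roll(v),
    }
}
```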