Skip to main content

parquet/column/writer/
byte_budget_chunker.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! See [`ByteBudgetChunker`] for byte-budget-aware mini-batch sizing.
19
20use crate::basic::Type;
21use crate::column::writer::LevelDataRef;
22use crate::column::writer::encoder::ColumnValueEncoder;
23use crate::file::properties::WriterProperties;
24use crate::schema::types::ColumnDescriptor;
25
/// Per-column helper that sizes mini-batches by bytes instead of rows.
///
/// The parquet column writer consults the data page byte limit only once a
/// mini-batch has been fully written. Because mini-batches are measured in
/// rows (`write_batch_size`, default 1024), a BYTE_ARRAY column holding very
/// large values (e.g. multi-MiB blobs) could buffer GiB into a single page
/// before that check ever runs.
///
/// This type owns the per-chunk decision that avoids that: from a chunk's
/// level data and its input values it derives the largest `sub_batch_size`
/// for which one mini-batch stays within one page byte budget. In the common
/// case (small or fixed-width values) the answer is simply `chunk_size`,
/// decided in O(1) from flags precomputed for the column type; the encoder's
/// byte estimate is consulted only when the input might overflow.
pub(crate) struct ByteBudgetChunker {
    /// Data page byte limit configured for this column.
    page_byte_limit: usize,
    /// Dictionary page byte limit configured for this column.
    dict_page_byte_limit: usize,
    /// Precomputed in [`Self::new`]: `true` when the column's physical type
    /// has a fixed per-value byte width and `base_batch_size` values of that
    /// width total at most `page_byte_limit`. Always `false` for
    /// `BYTE_ARRAY`. When set, the per-chunk decision for the data page
    /// returns immediately without inspecting any value.
    static_always_fits: bool,
    /// Same precomputed check as [`Self::static_always_fits`], but measured
    /// against `dict_page_byte_limit`; consulted while the encoder is
    /// dictionary-encoding.
    static_dict_always_fits: bool,
    /// Maximum definition level of the column. A definition level equal to
    /// this value marks a present (non-null) leaf value, which is how the
    /// number of values in a chunk is counted.
    max_def_level: i16,
}
59
60impl ByteBudgetChunker {
61    #[inline]
62    pub(crate) fn new(
63        descr: &ColumnDescriptor,
64        props: &WriterProperties,
65        base_batch_size: usize,
66    ) -> Self {
67        let page_byte_limit = props.column_data_page_size_limit(descr.path());
68        let dict_page_byte_limit = props.column_dictionary_page_size_limit(descr.path());
69        let static_bytes_per_value = match descr.physical_type() {
70            Type::BOOLEAN => Some(1),
71            Type::INT32 | Type::FLOAT => Some(std::mem::size_of::<i32>()),
72            Type::INT64 | Type::DOUBLE => Some(std::mem::size_of::<i64>()),
73            Type::INT96 => Some(12),
74            Type::FIXED_LEN_BYTE_ARRAY => Some(descr.type_length().max(0) as usize),
75            Type::BYTE_ARRAY => None,
76        };
77        let static_fits = |limit: usize| {
78            static_bytes_per_value
79                .map(|b| b.saturating_mul(base_batch_size) <= limit)
80                .unwrap_or(false)
81        };
82        Self {
83            page_byte_limit,
84            max_def_level: descr.max_def_level(),
85            static_always_fits: static_fits(page_byte_limit),
86            dict_page_byte_limit,
87            static_dict_always_fits: static_fits(dict_page_byte_limit),
88        }
89    }
90
91    /// Decide how many levels at the start of a chunk belong in one
92    /// mini-batch, so the mini-batch cannot overflow whichever page is
93    /// currently accumulating value bytes: the data page when plain-encoding,
94    /// or the *dictionary* page while dictionary-encoding. A returned value
95    /// smaller than `chunk_size` triggers granular sub-batching in
96    /// `write_batch_internal`.
97    ///
98    /// While dictionary-encoding, the data page holds only small RLE indices,
99    /// but the dictionary page accumulates the distinct values themselves —
100    /// so it is the dictionary page's remaining budget that must bound the
101    /// mini-batch. The per-mini-batch dictionary spill check would otherwise
102    /// let one mini-batch of large values balloon the dictionary page.
103    ///
104    /// Returns `chunk_size` immediately (no value inspection) when the chunk
105    /// is empty, or when the column is a fixed-width type whose mini-batches
106    /// statically cannot overshoot the relevant page.
107    ///
108    /// `#[inline]`: this is a tiny per-chunk dispatcher; the actual byte
109    /// inspection lives in the out-of-line `byte_budget_sub_batch_size`.
110    #[inline]
111    pub(crate) fn pick_sub_batch_size<E: ColumnValueEncoder>(
112        &self,
113        encoder: &E,
114        values: &E::Values,
115        value_indices: Option<&[usize]>,
116        chunk_def: LevelDataRef<'_>,
117        values_offset: usize,
118        chunk_size: usize,
119    ) -> usize {
120        if chunk_size == 0 {
121            return chunk_size;
122        }
123        let budget = if encoder.has_dictionary() {
124            if self.static_dict_always_fits {
125                return chunk_size;
126            }
127            // Bound the mini-batch by the dictionary page's *remaining*
128            // budget (it accumulates across mini-batches until it spills).
129            match encoder.estimated_dict_page_size() {
130                Some(used) => self.dict_page_byte_limit.saturating_sub(used),
131                None => return chunk_size,
132            }
133        } else {
134            if self.static_always_fits {
135                return chunk_size;
136            }
137            self.page_byte_limit
138        };
139        self.byte_budget_sub_batch_size::<E>(
140            values,
141            value_indices,
142            chunk_def,
143            values_offset,
144            chunk_size,
145            budget,
146        )
147    }
148
149    /// Inspect value sizes to decide how many of the chunk's values fit in
150    /// `budget` bytes (the data page or dictionary page remaining budget).
151    ///
152    /// `#[inline(never)]` keeps this slow path out of the hot
153    /// `write_batch_internal` loop; numeric and bool columns never reach it.
154    #[inline(never)]
155    fn byte_budget_sub_batch_size<E: ColumnValueEncoder>(
156        &self,
157        values: &E::Values,
158        value_indices: Option<&[usize]>,
159        chunk_def: LevelDataRef<'_>,
160        values_offset: usize,
161        chunk_size: usize,
162        budget: usize,
163    ) -> usize {
164        // How many of this chunk's levels carry an actual value. For a
165        // non-nullable, unrepeated column every level is a value, so
166        // `value_count` is O(1) (`Absent`/`Uniform` def levels); only
167        // nullable or nested columns pay the O(chunk_size) def-level scan.
168        let vals_in_chunk = chunk_def.value_count(chunk_size, self.max_def_level);
169        if vals_in_chunk == 0 {
170            return chunk_size;
171        }
172        // Ask the encoder how many of the next values fit in one page byte
173        // budget. Dispatch on whether the caller supplied gather indices;
174        // this mirrors how `write_mini_batch` picks `write_gather` vs
175        // `write`.
176        let fit = match value_indices {
177            Some(idx) => {
178                let end = (values_offset + vals_in_chunk).min(idx.len());
179                let start = values_offset.min(end);
180                E::count_values_within_byte_budget_gather(values, &idx[start..end], budget)
181            }
182            None => {
183                E::count_values_within_byte_budget(values, values_offset, vals_in_chunk, budget)
184            }
185        };
186        match fit {
187            None => chunk_size,
188            Some(values_per_subbatch) => {
189                // Convert the value count back into a level count. For a
190                // non-nullable column this is a no-op; for nullable/nested
191                // columns scale by the chunk's observed value-to-level
192                // ratio.
193                let levels_per_subbatch = if vals_in_chunk == chunk_size {
194                    values_per_subbatch
195                } else {
196                    (values_per_subbatch * chunk_size)
197                        .div_ceil(vals_in_chunk)
198                        .max(1)
199                };
200                chunk_size.min(levels_per_subbatch.max(1))
201            }
202        }
203    }
204}