parquet/column/writer/byte_budget_chunker.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! See [`ByteBudgetChunker`] for byte-budget-aware mini-batch sizing.
19
20use crate::basic::Type;
21use crate::column::writer::LevelDataRef;
22use crate::column::writer::encoder::ColumnValueEncoder;
23use crate::file::properties::WriterProperties;
24use crate::schema::types::ColumnDescriptor;
25
/// Chooses mini-batch sizes for a single column so that one mini-batch
/// stays within a page's byte budget.
///
/// The parquet column writer only compares a data page against its byte
/// limit once a mini-batch has been fully written, and mini-batches are
/// measured in rows (`write_batch_size`, default 1024). With BYTE_ARRAY
/// columns holding very large values (e.g. multi-MiB blobs), a single
/// mini-batch can therefore buffer GiB into one page before the limit is
/// ever checked.
///
/// This type owns the per-chunk decision that avoids that situation: from a
/// chunk's level data and its input values it derives the largest
/// `sub_batch_size` for which one mini-batch fits a single page byte budget.
/// In the overwhelmingly common case (small or fixed-width values) the
/// answer is simply `chunk_size`, decided in O(1) from the column type; the
/// encoder's byte estimate is consulted only when the input might overflow.
pub(crate) struct ByteBudgetChunker {
    /// Data page byte limit configured for this column.
    page_byte_limit: usize,
    /// Fast-path flag for the data page: `true` when a chunk of
    /// `base_batch_size` values can never exceed `page_byte_limit`, whatever
    /// the input. Computed once when the column is opened, from the
    /// physical type's fixed per-value byte size, so that numeric, bool and
    /// narrow `FIXED_LEN_BYTE_ARRAY` columns skip the per-chunk work.
    static_always_fits: bool,
    /// Dictionary page byte limit configured for this column.
    dict_page_byte_limit: usize,
    /// Dictionary page counterpart of [`Self::static_always_fits`]: `true`
    /// when one `base_batch_size` mini-batch of this fixed-width type cannot
    /// push the dictionary page more than one mini-batch's worth past
    /// `dict_page_byte_limit`.
    static_dict_always_fits: bool,
    /// Maximum definition level of the column. A level equal to this value
    /// denotes a present (non-null) leaf value; used to count the values
    /// contained in a chunk.
    max_def_level: i16,
}
59
60impl ByteBudgetChunker {
61 #[inline]
62 pub(crate) fn new(
63 descr: &ColumnDescriptor,
64 props: &WriterProperties,
65 base_batch_size: usize,
66 ) -> Self {
67 let page_byte_limit = props.column_data_page_size_limit(descr.path());
68 let dict_page_byte_limit = props.column_dictionary_page_size_limit(descr.path());
69 let static_bytes_per_value = match descr.physical_type() {
70 Type::BOOLEAN => Some(1),
71 Type::INT32 | Type::FLOAT => Some(std::mem::size_of::<i32>()),
72 Type::INT64 | Type::DOUBLE => Some(std::mem::size_of::<i64>()),
73 Type::INT96 => Some(12),
74 Type::FIXED_LEN_BYTE_ARRAY => Some(descr.type_length().max(0) as usize),
75 Type::BYTE_ARRAY => None,
76 };
77 let static_fits = |limit: usize| {
78 static_bytes_per_value
79 .map(|b| b.saturating_mul(base_batch_size) <= limit)
80 .unwrap_or(false)
81 };
82 Self {
83 page_byte_limit,
84 max_def_level: descr.max_def_level(),
85 static_always_fits: static_fits(page_byte_limit),
86 dict_page_byte_limit,
87 static_dict_always_fits: static_fits(dict_page_byte_limit),
88 }
89 }
90
91 /// Decide how many levels at the start of a chunk belong in one
92 /// mini-batch, so the mini-batch cannot overflow whichever page is
93 /// currently accumulating value bytes: the data page when plain-encoding,
94 /// or the *dictionary* page while dictionary-encoding. A returned value
95 /// smaller than `chunk_size` triggers granular sub-batching in
96 /// `write_batch_internal`.
97 ///
98 /// While dictionary-encoding, the data page holds only small RLE indices,
99 /// but the dictionary page accumulates the distinct values themselves —
100 /// so it is the dictionary page's remaining budget that must bound the
101 /// mini-batch. The per-mini-batch dictionary spill check would otherwise
102 /// let one mini-batch of large values balloon the dictionary page.
103 ///
104 /// Returns `chunk_size` immediately (no value inspection) when the chunk
105 /// is empty, or when the column is a fixed-width type whose mini-batches
106 /// statically cannot overshoot the relevant page.
107 ///
108 /// `#[inline]`: this is a tiny per-chunk dispatcher; the actual byte
109 /// inspection lives in the out-of-line `byte_budget_sub_batch_size`.
110 #[inline]
111 pub(crate) fn pick_sub_batch_size<E: ColumnValueEncoder>(
112 &self,
113 encoder: &E,
114 values: &E::Values,
115 value_indices: Option<&[usize]>,
116 chunk_def: LevelDataRef<'_>,
117 values_offset: usize,
118 chunk_size: usize,
119 ) -> usize {
120 if chunk_size == 0 {
121 return chunk_size;
122 }
123 let budget = if encoder.has_dictionary() {
124 if self.static_dict_always_fits {
125 return chunk_size;
126 }
127 // Bound the mini-batch by the dictionary page's *remaining*
128 // budget (it accumulates across mini-batches until it spills).
129 match encoder.estimated_dict_page_size() {
130 Some(used) => self.dict_page_byte_limit.saturating_sub(used),
131 None => return chunk_size,
132 }
133 } else {
134 if self.static_always_fits {
135 return chunk_size;
136 }
137 self.page_byte_limit
138 };
139 self.byte_budget_sub_batch_size::<E>(
140 values,
141 value_indices,
142 chunk_def,
143 values_offset,
144 chunk_size,
145 budget,
146 )
147 }
148
149 /// Inspect value sizes to decide how many of the chunk's values fit in
150 /// `budget` bytes (the data page or dictionary page remaining budget).
151 ///
152 /// `#[inline(never)]` keeps this slow path out of the hot
153 /// `write_batch_internal` loop; numeric and bool columns never reach it.
154 #[inline(never)]
155 fn byte_budget_sub_batch_size<E: ColumnValueEncoder>(
156 &self,
157 values: &E::Values,
158 value_indices: Option<&[usize]>,
159 chunk_def: LevelDataRef<'_>,
160 values_offset: usize,
161 chunk_size: usize,
162 budget: usize,
163 ) -> usize {
164 // How many of this chunk's levels carry an actual value. For a
165 // non-nullable, unrepeated column every level is a value, so
166 // `value_count` is O(1) (`Absent`/`Uniform` def levels); only
167 // nullable or nested columns pay the O(chunk_size) def-level scan.
168 let vals_in_chunk = chunk_def.value_count(chunk_size, self.max_def_level);
169 if vals_in_chunk == 0 {
170 return chunk_size;
171 }
172 // Ask the encoder how many of the next values fit in one page byte
173 // budget. Dispatch on whether the caller supplied gather indices;
174 // this mirrors how `write_mini_batch` picks `write_gather` vs
175 // `write`.
176 let fit = match value_indices {
177 Some(idx) => {
178 let end = (values_offset + vals_in_chunk).min(idx.len());
179 let start = values_offset.min(end);
180 E::count_values_within_byte_budget_gather(values, &idx[start..end], budget)
181 }
182 None => {
183 E::count_values_within_byte_budget(values, values_offset, vals_in_chunk, budget)
184 }
185 };
186 match fit {
187 None => chunk_size,
188 Some(values_per_subbatch) => {
189 // Convert the value count back into a level count. For a
190 // non-nullable column this is a no-op; for nullable/nested
191 // columns scale by the chunk's observed value-to-level
192 // ratio.
193 let levels_per_subbatch = if vals_in_chunk == chunk_size {
194 values_per_subbatch
195 } else {
196 (values_per_subbatch * chunk_size)
197 .div_ceil(vals_in_chunk)
198 .max(1)
199 };
200 chunk_size.min(levels_per_subbatch.max(1))
201 }
202 }
203 }
204}