parquet/arrow/push_decoder/mod.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ParquetPushDecoder`]: decodes Parquet data with data provided by the
19//! caller (rather than from an underlying reader).
20
21mod reader_builder;
22mod remaining;
23
24use crate::DecodeResult;
25use crate::arrow::arrow_reader::{
26 ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
27};
28use crate::errors::ParquetError;
29use crate::file::metadata::ParquetMetaData;
30pub use crate::util::push_buffers::PushBuffers;
31use arrow_array::RecordBatch;
32use bytes::Bytes;
33use reader_builder::{RowBudget, RowGroupReaderBuilder, RowGroupReaderBuilderParts};
34use remaining::{RemainingRowGroups, RemainingRowGroupsParts};
35use std::ops::Range;
36use std::sync::Arc;
37
38/// A builder for [`ParquetPushDecoder`].
39///
40/// To create a new decoder, use [`ParquetPushDecoderBuilder::try_new_decoder`].
41///
42/// You can decode the metadata from a Parquet file using either
43/// [`ParquetMetadataReader`] or [`ParquetMetaDataPushDecoder`].
44///
45/// [`ParquetMetadataReader`]: crate::file::metadata::ParquetMetaDataReader
46/// [`ParquetMetaDataPushDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder
47///
48/// Note the "input" type is `u64` which represents the length of the Parquet file
49/// being decoded. This is needed to initialize the internal buffers that track
50/// what data has been provided to the decoder.
51///
52/// # Example
53/// ```
54/// # use std::ops::Range;
55/// # use std::sync::Arc;
56/// # use bytes::Bytes;
57/// # use arrow_array::record_batch;
58/// # use parquet::DecodeResult;
59/// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
60/// # use parquet::arrow::ArrowWriter;
61/// # use parquet::file::metadata::ParquetMetaDataPushDecoder;
62/// # let file_bytes = {
63/// # let mut buffer = vec![];
64/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
65/// # let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
66/// # writer.write(&batch).unwrap();
67/// # writer.close().unwrap();
68/// # Bytes::from(buffer)
69/// # };
70/// # // mimic IO by returning a function that returns the bytes for a given range
71/// # let get_range = |range: &Range<u64>| -> Bytes {
72/// # let start = range.start as usize;
73/// # let end = range.end as usize;
74/// # file_bytes.slice(start..end)
75/// # };
76/// # let file_length = file_bytes.len() as u64;
77/// # let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_length).unwrap();
78/// # metadata_decoder.push_ranges(vec![0..file_length], vec![file_bytes.clone()]).unwrap();
79/// # let DecodeResult::Data(parquet_metadata) = metadata_decoder.try_decode().unwrap() else { panic!("failed to decode metadata") };
80/// # let parquet_metadata = Arc::new(parquet_metadata);
81/// // The file length and metadata are required to create the decoder
82/// let mut decoder =
83/// ParquetPushDecoderBuilder::try_new_decoder(parquet_metadata)
84/// .unwrap()
85/// // Optionally configure the decoder, e.g. batch size
86/// .with_batch_size(1024)
87/// // Build the decoder
88/// .build()
89/// .unwrap();
90///
91/// // In a loop, ask the decoder what it needs next, and provide it with the required data
92/// loop {
93/// match decoder.try_decode().unwrap() {
94/// DecodeResult::NeedsData(ranges) => {
95/// // The decoder needs more data. Fetch the data for the given ranges
96/// let data = ranges.iter().map(|r| get_range(r)).collect::<Vec<_>>();
97/// // Push the data to the decoder
98/// decoder.push_ranges(ranges, data).unwrap();
99/// // After pushing the data, we can try to decode again on the next iteration
100/// }
101/// DecodeResult::Data(batch) => {
102/// // Successfully decoded a batch of data
103/// assert!(batch.num_rows() > 0);
104/// }
105/// DecodeResult::Finished => {
106/// // The decoder has finished decoding exit the loop
107/// break;
108/// }
109/// }
110/// }
111/// ```
112///
113/// # Adaptive scans
114///
115/// The scan strategy is not fixed once [`build`](Self::build) is called: it
116/// can be changed *while decoding*, at row-group boundaries.
117///
118/// The important API for this is [`ParquetPushDecoder::try_next_reader`].
119/// Unlike [`try_decode`](ParquetPushDecoder::try_decode), which barrels
120/// straight through row-group boundaries, `try_next_reader` returns once per
121/// row group — leaving a clean window *between* row groups. At any such
122/// boundary, [`ParquetPushDecoder::into_builder`] hands back a
123/// `ParquetPushDecoderBuilder` for the row groups not yet decoded. Change any
124/// option on it (projection, row filter, row selection policy, …) and
125/// [`build`](Self::build) a fresh decoder that resumes from the next row
126/// group. This is how a query engine promotes or demotes filters — for
127/// example turning a row filter on or off — based on the selectivity observed
128/// in the row groups decoded so far.
129///
130/// ```
131/// # use std::ops::Range;
132/// # use std::sync::Arc;
133/// # use bytes::Bytes;
134/// # use arrow_array::record_batch;
135/// # use parquet::DecodeResult;
136/// # use parquet::arrow::ProjectionMask;
137/// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
138/// # use parquet::arrow::ArrowWriter;
139/// # use parquet::file::metadata::ParquetMetaDataPushDecoder;
140/// # use parquet::file::properties::WriterProperties;
141/// # let file_bytes = {
142/// # let batch = record_batch!(
143/// # ("a", Int32, [1, 2, 3, 4, 5, 6]),
144/// # ("b", Int32, [6, 5, 4, 3, 2, 1])
145/// # ).unwrap();
146/// # // Small row groups so the test file has two of them.
147/// # let props = WriterProperties::builder().set_max_row_group_row_count(Some(3)).build();
148/// # let mut buffer = vec![];
149/// # let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
150/// # writer.write(&batch).unwrap();
151/// # writer.close().unwrap();
152/// # Bytes::from(buffer)
153/// # };
154/// # let get_range = |r: &Range<u64>| file_bytes.slice(r.start as usize..r.end as usize);
155/// # let file_length = file_bytes.len() as u64;
156/// # let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_length).unwrap();
157/// # metadata_decoder.push_ranges(vec![0..file_length], vec![file_bytes.clone()]).unwrap();
158/// # let DecodeResult::Data(parquet_metadata) = metadata_decoder.try_decode().unwrap() else { panic!() };
159/// # let parquet_metadata = Arc::new(parquet_metadata);
160/// let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(parquet_metadata)
161/// .unwrap()
162/// .build()
163/// .unwrap();
164///
165/// // Drive the decoder one row group at a time with `try_next_reader`.
166/// loop {
167/// match decoder.try_next_reader().unwrap() {
168/// DecodeResult::NeedsData(ranges) => {
169/// // Fetch and hand over the bytes the decoder asked for.
170/// let data = ranges.iter().map(|r| get_range(r)).collect();
171/// decoder.push_ranges(ranges, data).unwrap();
172/// }
173/// DecodeResult::Data(reader) => {
174/// // Decode this row group's batches.
175/// for batch in reader {
176/// assert!(batch.unwrap().num_rows() > 0);
177/// }
178/// // We are now at a row-group boundary. Based on whatever stats
179/// // were gathered, optionally change strategy for the row groups
180/// // still to come: drop or promote a row filter, narrow or widen
181/// // the projection, etc.
182/// if decoder.is_at_row_group_boundary() && decoder.row_groups_remaining() > 0 {
183/// let builder = decoder.into_builder().unwrap();
184/// // e.g. column "b" turned out not to be needed.
185/// let projection = ProjectionMask::columns(builder.parquet_schema(), ["a"]);
186/// decoder = builder.with_projection(projection).build().unwrap();
187/// }
188/// }
189/// DecodeResult::Finished => break,
190/// }
191/// }
192/// ```
193pub type ParquetPushDecoderBuilder = ArrowReaderBuilder<PushDecoderInput>;
194
195/// The `input` of a [`ParquetPushDecoderBuilder`].
196///
197/// The shared [`ArrowReaderBuilder`] is generic over an `input`. The sync and
198/// async builders read from a file or async reader; the push decoder has no
199/// reader, so its input is the [`PushBuffers`] that caller-pushed bytes
200/// accumulate in (empty for a fresh builder).
201#[derive(Debug, Default)]
202pub struct PushDecoderInput {
203 /// Bytes pushed into the decoder, awaiting decode.
204 buffers: PushBuffers,
205}
206
207/// Methods for building a ParquetDecoder. See the base [`ArrowReaderBuilder`] for
208/// more options that can be configured.
209impl ParquetPushDecoderBuilder {
210 /// Create a new `ParquetDecoderBuilder` for configuring a Parquet decoder for the given file.
211 ///
212 /// See [`ParquetMetadataDecoder`] for a builder that can read the metadata from a Parquet file.
213 ///
214 /// [`ParquetMetadataDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder
215 ///
216 /// See example on [`ParquetPushDecoderBuilder`]
217 pub fn try_new_decoder(parquet_metadata: Arc<ParquetMetaData>) -> Result<Self, ParquetError> {
218 Self::try_new_decoder_with_options(parquet_metadata, ArrowReaderOptions::default())
219 }
220
221 /// Create a new `ParquetDecoderBuilder` for configuring a Parquet decoder for the given file
222 /// with the given reader options.
223 ///
224 /// This is similar to [`Self::try_new_decoder`] but allows configuring
225 /// options such as Arrow schema
226 pub fn try_new_decoder_with_options(
227 parquet_metadata: Arc<ParquetMetaData>,
228 arrow_reader_options: ArrowReaderOptions,
229 ) -> Result<Self, ParquetError> {
230 let arrow_reader_metadata =
231 ArrowReaderMetadata::try_new(parquet_metadata, arrow_reader_options)?;
232 Ok(Self::new_with_metadata(arrow_reader_metadata))
233 }
234
235 /// Create a new `ParquetDecoderBuilder` given [`ArrowReaderMetadata`].
236 ///
237 /// See [`ArrowReaderMetadata::try_new`] for how to create the metadata from
238 /// the Parquet metadata and reader options.
239 pub fn new_with_metadata(arrow_reader_metadata: ArrowReaderMetadata) -> Self {
240 Self::new_builder(PushDecoderInput::default(), arrow_reader_metadata)
241 }
242
243 /// Provide a preexisting [`PushBuffers`] for the built decoder to read
244 /// from, so bytes already fetched are not requested again.
245 pub fn with_buffers(self, buffers: PushBuffers) -> Self {
246 Self {
247 input: PushDecoderInput { buffers },
248 ..self
249 }
250 }
251
252 /// Create a [`ParquetPushDecoder`] with the configured options
253 pub fn build(self) -> Result<ParquetPushDecoder, ParquetError> {
254 let Self {
255 input: PushDecoderInput { buffers },
256 metadata: parquet_metadata,
257 schema,
258 fields,
259 batch_size,
260 row_groups,
261 projection,
262 filter,
263 selection,
264 limit,
265 offset,
266 metrics,
267 row_selection_policy,
268 max_predicate_cache_size,
269 } = self;
270
271 // If no row groups were specified, read all of them
272 let row_groups =
273 row_groups.unwrap_or_else(|| (0..parquet_metadata.num_row_groups()).collect());
274 let has_predicates = filter
275 .as_ref()
276 .is_some_and(|filter| !filter.predicates.is_empty());
277
278 // Prepare to build RowGroup readers. `buffers` carries any bytes the
279 // caller already pushed (preserved across `into_builder`); a fresh
280 // builder supplies an empty `PushBuffers`.
281 let row_group_reader_builder = RowGroupReaderBuilder::new(
282 batch_size,
283 projection,
284 Arc::clone(&parquet_metadata),
285 fields,
286 filter,
287 metrics,
288 max_predicate_cache_size,
289 buffers,
290 row_selection_policy,
291 );
292
293 // Initialize the decoder with the configured options
294 let remaining_row_groups = RemainingRowGroups::new(
295 schema,
296 parquet_metadata,
297 row_groups,
298 selection,
299 RowBudget::new(offset, limit),
300 has_predicates,
301 row_group_reader_builder,
302 );
303
304 Ok(ParquetPushDecoder {
305 state: ParquetDecoderState::ReadingRowGroup {
306 remaining_row_groups: Box::new(remaining_row_groups),
307 },
308 })
309 }
310}
311
312/// Reassemble a [`ParquetPushDecoderBuilder`] from a decoder's not-yet-decoded
313/// state — the inverse of [`ParquetPushDecoderBuilder::build`]. The rebuilt
314/// builder pins the remaining row groups and carries the remaining row
315/// selection, offset/limit budget, and buffered bytes.
316fn builder_from_remaining(parts: RemainingRowGroupsParts) -> ParquetPushDecoderBuilder {
317 let RemainingRowGroupsParts {
318 metadata,
319 schema,
320 row_groups,
321 selection,
322 offset,
323 limit,
324 reader_builder,
325 } = parts;
326 let RowGroupReaderBuilderParts {
327 batch_size,
328 projection,
329 fields,
330 filter,
331 max_predicate_cache_size,
332 metrics,
333 row_selection_policy,
334 buffers,
335 } = reader_builder;
336
337 ArrowReaderBuilder {
338 input: PushDecoderInput::default(),
339 metadata,
340 schema,
341 fields,
342 batch_size,
343 // The frontier tracks remaining row groups explicitly, so the rebuilt
344 // builder always pins them (even if the original left `row_groups` as
345 // `None` meaning "all").
346 row_groups: Some(row_groups),
347 projection,
348 filter,
349 selection,
350 row_selection_policy,
351 limit,
352 offset,
353 metrics,
354 max_predicate_cache_size,
355 }
356 // Carry the decoder's already-fetched bytes across the rebuild so the new
357 // decoder does not re-request them.
358 .with_buffers(buffers)
359}
360
361/// A push based Parquet Decoder
362///
363/// See [`ParquetPushDecoderBuilder`] for an example of how to build and use the decoder.
364///
365/// [`ParquetPushDecoder`] is a low level API for decoding Parquet data without an
366/// underlying reader for performing IO, and thus offers fine grained control
367/// over how data is fetched and decoded.
368///
369/// When more data is needed to make progress, instead of reading data directly
370/// from a reader, the decoder returns [`DecodeResult`] indicating what ranges
371/// are needed. Once the caller provides the requested ranges via
372/// [`Self::push_ranges`], they try to decode again by calling
373/// [`Self::try_decode`].
374///
375/// The decoder's internal state tracks what has been already decoded and what
376/// is needed next.
377#[derive(Debug)]
378pub struct ParquetPushDecoder {
379 /// The inner state.
380 ///
381 /// This state is consumed on every transition and a new state is produced
382 /// so the Rust compiler can ensure that the state is always valid and
383 /// transitions are not missed.
384 state: ParquetDecoderState,
385}
386
387impl ParquetPushDecoder {
388 /// Attempt to decode the next batch of data, or return what data is needed
389 ///
390 /// The the decoder communicates the next state with a [`DecodeResult`]
391 ///
392 /// See full example in [`ParquetPushDecoderBuilder`]
393 ///
394 /// ```no_run
395 /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
396 /// use parquet::DecodeResult;
397 /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
398 /// # fn push_data(decoder: &mut ParquetPushDecoder, ranges: Vec<std::ops::Range<u64>>) { unimplemented!() }
399 /// let mut decoder = get_decoder();
400 /// loop {
401 /// match decoder.try_decode().unwrap() {
402 /// DecodeResult::NeedsData(ranges) => {
403 /// // The decoder needs more data. Fetch the data for the given ranges
404 /// // call decoder.push_ranges(ranges, data) and call again
405 /// push_data(&mut decoder, ranges);
406 /// }
407 /// DecodeResult::Data(batch) => {
408 /// // Successfully decoded the next batch of data
409 /// println!("Got batch with {} rows", batch.num_rows());
410 /// }
411 /// DecodeResult::Finished => {
412 /// // The decoder has finished decoding all data
413 /// break;
414 /// }
415 /// }
416 /// }
417 ///```
418 pub fn try_decode(&mut self) -> Result<DecodeResult<RecordBatch>, ParquetError> {
419 let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
420 let (new_state, decode_result) = current_state.try_next_batch()?;
421 self.state = new_state;
422 Ok(decode_result)
423 }
424
425 /// Return a [`ParquetRecordBatchReader`] that reads the next set of rows, or
426 /// return what data is needed to produce it.
427 ///
428 /// This API can be used to get a reader for decoding the next set of
429 /// RecordBatches while proceeding to begin fetching data for the set (e.g
430 /// row group)
431 ///
432 /// Example
433 /// ```no_run
434 /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
435 /// use parquet::DecodeResult;
436 /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
437 /// # fn push_data(decoder: &mut ParquetPushDecoder, ranges: Vec<std::ops::Range<u64>>) { unimplemented!() }
438 /// let mut decoder = get_decoder();
439 /// loop {
440 /// match decoder.try_next_reader().unwrap() {
441 /// DecodeResult::NeedsData(ranges) => {
442 /// // The decoder needs more data. Fetch the data for the given ranges
443 /// // call decoder.push_ranges(ranges, data) and call again
444 /// push_data(&mut decoder, ranges);
445 /// }
446 /// DecodeResult::Data(reader) => {
447 /// // spawn a thread to read the batches in parallel
448 /// // with fetching the next row group / data
449 /// std::thread::spawn(move || {
450 /// for batch in reader {
451 /// let batch = batch.unwrap();
452 /// println!("Got batch with {} rows", batch.num_rows());
453 /// }
454 /// });
455 /// }
456 /// DecodeResult::Finished => {
457 /// // The decoder has finished decoding all data
458 /// break;
459 /// }
460 /// }
461 /// }
462 ///```
463 pub fn try_next_reader(
464 &mut self,
465 ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
466 let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
467 let (new_state, decode_result) = current_state.try_next_reader()?;
468 self.state = new_state;
469 Ok(decode_result)
470 }
471
472 /// Push data into the decoder for processing
473 ///
474 /// This is a convenience wrapper around [`Self::push_ranges`] for pushing a
475 /// single range of data.
476 ///
477 /// Note this can be the entire file or just a part of it. If it is part of the file,
478 /// the ranges should correspond to the data ranges requested by the decoder.
479 ///
480 /// See example in [`ParquetPushDecoderBuilder`]
481 pub fn push_range(&mut self, range: Range<u64>, data: Bytes) -> Result<(), ParquetError> {
482 self.push_ranges(vec![range], vec![data])
483 }
484
485 /// Push data into the decoder for processing
486 ///
487 /// This should correspond to the data ranges requested by the decoder
488 pub fn push_ranges(
489 &mut self,
490 ranges: Vec<Range<u64>>,
491 data: Vec<Bytes>,
492 ) -> Result<(), ParquetError> {
493 let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
494 self.state = current_state.push_data(ranges, data)?;
495 Ok(())
496 }
497
498 /// Returns the total number of buffered bytes in the decoder
499 ///
500 /// This is the sum of the size of all [`Bytes`] that has been pushed to the
501 /// decoder but not yet consumed.
502 ///
503 /// Note that this does not include any overhead of the internal data
504 /// structures and that since [`Bytes`] are ref counted memory, this may not
505 /// reflect additional memory usage.
506 ///
507 /// This can be used to monitor memory usage of the decoder.
508 pub fn buffered_bytes(&self) -> u64 {
509 self.state.buffered_bytes()
510 }
511
512 /// Clear any staged byte ranges currently buffered for future decode work.
513 ///
514 /// This clears byte ranges still owned by the decoder's internal
515 /// `PushBuffers`. It does not affect any data that has already been handed
516 /// off to an active [`ParquetRecordBatchReader`].
517 pub fn clear_all_ranges(&mut self) {
518 self.state.clear_all_ranges();
519 }
520
521 /// True iff the decoder is at a row-group boundary, where
522 /// [`Self::into_builder`] can reconfigure the scan.
523 ///
524 /// A boundary is "between row groups": the previous row group's
525 /// [`ParquetRecordBatchReader`] has been fully extracted (via
526 /// [`Self::try_next_reader`]) or fully drained (via [`Self::try_decode`]),
527 /// and the next row group has not yet been planned. While
528 /// [`Self::try_decode`] is iterating an active row group's reader this
529 /// returns `false`; with [`Self::try_next_reader`] there is a clean
530 /// window between two consecutive returns where this is `true`.
531 pub fn is_at_row_group_boundary(&self) -> bool {
532 self.state.is_at_row_group_boundary()
533 }
534
535 /// Number of row groups left to decode after the one currently in flight.
536 /// Useful as a "should I bother reconfiguring the scan?" signal.
537 pub fn row_groups_remaining(&self) -> usize {
538 self.state.row_groups_remaining()
539 }
540
541 /// Decompose this decoder back into a [`ParquetPushDecoderBuilder`] for the
542 /// row groups that have *not* yet been decoded.
543 ///
544 /// This is the API for *adaptive* scans. Drive the decoder with
545 /// [`Self::try_next_reader`]; at any row-group boundary, call
546 /// `into_builder` to recover a builder, adjust it with the usual
547 /// [`ParquetPushDecoderBuilder`] setters, and
548 /// [`build`](ParquetPushDecoderBuilder::build) a fresh decoder that resumes
549 /// from the next row group:
550 ///
551 /// ```no_run
552 /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
553 /// # use parquet::arrow::arrow_reader::RowFilter;
554 /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
555 /// # fn new_filter() -> RowFilter { unimplemented!() }
556 /// let mut decoder = get_decoder();
557 /// // ... drive `decoder.try_next_reader()` for a few row groups ...
558 /// if decoder.is_at_row_group_boundary() && decoder.row_groups_remaining() > 0 {
559 /// decoder = decoder
560 /// .into_builder()
561 /// .unwrap()
562 /// // any builder option can be changed here, e.g. promote a
563 /// // filter into a row filter based on observed selectivity
564 /// .with_row_filter(new_filter())
565 /// .build()
566 /// .unwrap();
567 /// }
568 /// ```
569 ///
570 /// The returned builder pins the not-yet-decoded row groups (via
571 /// [`with_row_groups`](ArrowReaderBuilder::with_row_groups)) and carries the
572 /// not-yet-consumed row selection and offset/limit budget, so rows from
573 /// already-decoded row groups are not produced again. Every other option —
574 /// projection, row filter, row selection policy, batch size, metrics,
575 /// predicate-cache size — is left exactly as the decoder had it and can be
576 /// overridden before [`build`](ParquetPushDecoderBuilder::build).
577 ///
578 /// # Errors
579 ///
580 /// Returns `Err(ParquetError::General)` when the decoder is not at a
581 /// row-group boundary (check [`Self::is_at_row_group_boundary`] first) or
582 /// has already finished. The decoder is consumed either way.
583 ///
584 /// # Buffered bytes
585 ///
586 /// The decoder's buffered bytes are carried across the rebuild: bytes
587 /// already fetched for row groups the new configuration still reads are
588 /// not re-requested. Bytes the new configuration no longer needs stay
589 /// buffered until [`clear_all_ranges`](Self::clear_all_ranges) is called
590 /// or the rebuilt decoder is dropped.
591 pub fn into_builder(self) -> Result<ParquetPushDecoderBuilder, ParquetError> {
592 self.state.into_builder()
593 }
594}
595
596/// Internal state machine for the [`ParquetPushDecoder`]
597#[derive(Debug)]
598enum ParquetDecoderState {
599 /// Waiting for data needed to decode the next RowGroup
600 ReadingRowGroup {
601 remaining_row_groups: Box<RemainingRowGroups>,
602 },
603 /// The decoder is actively decoding a RowGroup
604 DecodingRowGroup {
605 /// Current active reader
606 record_batch_reader: Box<ParquetRecordBatchReader>,
607 remaining_row_groups: Box<RemainingRowGroups>,
608 },
609 /// The decoder has finished processing all data
610 Finished,
611}
612
613impl ParquetDecoderState {
614 /// If actively reading a RowGroup, return the currently active
615 /// ParquetRecordBatchReader and advance to the next group.
616 fn try_next_reader(
617 self,
618 ) -> Result<(Self, DecodeResult<ParquetRecordBatchReader>), ParquetError> {
619 let mut current_state = self;
620 loop {
621 let (next_state, decode_result) = current_state.transition()?;
622 // if more data is needed to transition, can't proceed further without it
623 match decode_result {
624 DecodeResult::NeedsData(ranges) => {
625 return Ok((next_state, DecodeResult::NeedsData(ranges)));
626 }
627 // act next based on state
628 DecodeResult::Data(()) | DecodeResult::Finished => {}
629 }
630 match next_state {
631 // not ready to read yet, continue transitioning
632 Self::ReadingRowGroup { .. } => current_state = next_state,
633 // have a reader ready, so return it and set ourself to ReadingRowGroup
634 Self::DecodingRowGroup {
635 record_batch_reader,
636 remaining_row_groups,
637 } => {
638 let result = DecodeResult::Data(*record_batch_reader);
639 let next_state = Self::ReadingRowGroup {
640 remaining_row_groups,
641 };
642 return Ok((next_state, result));
643 }
644 Self::Finished => {
645 return Ok((Self::Finished, DecodeResult::Finished));
646 }
647 }
648 }
649 }
650
651 /// Current state --> next state + output
652 ///
653 /// This function is called to get the next RecordBatch
654 ///
655 /// This structure is used to reduce the indentation level of the main loop
656 /// in try_build
657 fn try_next_batch(self) -> Result<(Self, DecodeResult<RecordBatch>), ParquetError> {
658 let mut current_state = self;
659 loop {
660 let (new_state, decode_result) = current_state.transition()?;
661 // if more data is needed to transition, can't proceed further without it
662 match decode_result {
663 DecodeResult::NeedsData(ranges) => {
664 return Ok((new_state, DecodeResult::NeedsData(ranges)));
665 }
666 // act next based on state
667 DecodeResult::Data(()) | DecodeResult::Finished => {}
668 }
669 match new_state {
670 // not ready to read yet, continue transitioning
671 Self::ReadingRowGroup { .. } => current_state = new_state,
672 // have a reader ready, so decode the next batch
673 Self::DecodingRowGroup {
674 mut record_batch_reader,
675 remaining_row_groups,
676 } => {
677 match record_batch_reader.next() {
678 // Successfully decoded a batch, return it
679 Some(Ok(batch)) => {
680 let result = DecodeResult::Data(batch);
681 let next_state = Self::DecodingRowGroup {
682 record_batch_reader,
683 remaining_row_groups,
684 };
685 return Ok((next_state, result));
686 }
687 // No more batches in this row group, move to the next row group
688 None => {
689 current_state = Self::ReadingRowGroup {
690 remaining_row_groups,
691 }
692 }
693 // some error occurred while decoding, so return that
694 Some(Err(e)) => {
695 // TODO: preserve ArrowError in ParquetError (rather than convert to a string)
696 return Err(ParquetError::ArrowError(e.to_string()));
697 }
698 }
699 }
700 Self::Finished => {
701 return Ok((Self::Finished, DecodeResult::Finished));
702 }
703 }
704 }
705 }
706
707 /// Transition to the next state with a reader (data can be produced), if not end of stream
708 ///
709 /// This function is called in a loop until the decoder is ready to return
710 /// data (has the required pages buffered) or is finished.
711 fn transition(self) -> Result<(Self, DecodeResult<()>), ParquetError> {
712 // result returned when there is data ready
713 let data_ready = DecodeResult::Data(());
714 match self {
715 Self::ReadingRowGroup {
716 mut remaining_row_groups,
717 } => {
718 match remaining_row_groups.try_next_reader()? {
719 // If we have a next reader, we can transition to decoding it
720 DecodeResult::Data(record_batch_reader) => {
721 // Transition to decoding the row group
722 Ok((
723 Self::DecodingRowGroup {
724 record_batch_reader: Box::new(record_batch_reader),
725 remaining_row_groups,
726 },
727 data_ready,
728 ))
729 }
730 DecodeResult::NeedsData(ranges) => {
731 // If we need more data, we return the ranges needed and stay in Reading
732 // RowGroup state
733 Ok((
734 Self::ReadingRowGroup {
735 remaining_row_groups,
736 },
737 DecodeResult::NeedsData(ranges),
738 ))
739 }
740 // If there are no more readers, we are finished
741 DecodeResult::Finished => {
742 // No more row groups to read, we are finished
743 Ok((Self::Finished, DecodeResult::Finished))
744 }
745 }
746 }
747 // if we are already in DecodingRowGroup, just return data ready
748 Self::DecodingRowGroup { .. } => Ok((self, data_ready)),
749 // if finished, just return finished
750 Self::Finished => Ok((self, DecodeResult::Finished)),
751 }
752 }
753
754 /// Push data, and transition state if needed
755 ///
756 /// This should correspond to the data ranges requested by the decoder
757 pub fn push_data(
758 self,
759 ranges: Vec<Range<u64>>,
760 data: Vec<Bytes>,
761 ) -> Result<Self, ParquetError> {
762 match self {
763 ParquetDecoderState::ReadingRowGroup {
764 mut remaining_row_groups,
765 } => {
766 // Push data to the RowGroupReaderBuilder
767 remaining_row_groups.push_data(ranges, data);
768 Ok(ParquetDecoderState::ReadingRowGroup {
769 remaining_row_groups,
770 })
771 }
772 // it is ok to get data before we asked for it
773 ParquetDecoderState::DecodingRowGroup {
774 record_batch_reader,
775 mut remaining_row_groups,
776 } => {
777 remaining_row_groups.push_data(ranges, data);
778 Ok(ParquetDecoderState::DecodingRowGroup {
779 record_batch_reader,
780 remaining_row_groups,
781 })
782 }
783 ParquetDecoderState::Finished => Err(ParquetError::General(
784 "Cannot push data to a finished decoder".to_string(),
785 )),
786 }
787 }
788
789 /// How many bytes are currently buffered in the decoder?
790 fn buffered_bytes(&self) -> u64 {
791 match self {
792 ParquetDecoderState::ReadingRowGroup {
793 remaining_row_groups,
794 } => remaining_row_groups.buffered_bytes(),
795 ParquetDecoderState::DecodingRowGroup {
796 record_batch_reader: _,
797 remaining_row_groups,
798 } => remaining_row_groups.buffered_bytes(),
799 ParquetDecoderState::Finished => 0,
800 }
801 }
802
803 /// Clear any staged ranges currently buffered in the decoder.
804 fn clear_all_ranges(&mut self) {
805 match self {
806 ParquetDecoderState::ReadingRowGroup {
807 remaining_row_groups,
808 } => remaining_row_groups.clear_all_ranges(),
809 ParquetDecoderState::DecodingRowGroup {
810 record_batch_reader: _,
811 remaining_row_groups,
812 } => remaining_row_groups.clear_all_ranges(),
813 ParquetDecoderState::Finished => {}
814 }
815 }
816
817 fn is_at_row_group_boundary(&self) -> bool {
818 match self {
819 ParquetDecoderState::ReadingRowGroup {
820 remaining_row_groups,
821 } => remaining_row_groups.is_at_row_group_boundary(),
822 // Mid-row-group: the active reader holds an `ArrayReader` and
823 // `ReadPlan` keyed to the *current* projection/filter; rebuilding
824 // would require throwing that work away.
825 ParquetDecoderState::DecodingRowGroup { .. } => false,
826 ParquetDecoderState::Finished => false,
827 }
828 }
829
830 fn row_groups_remaining(&self) -> usize {
831 match self {
832 ParquetDecoderState::ReadingRowGroup {
833 remaining_row_groups,
834 } => remaining_row_groups.row_groups_remaining(),
835 ParquetDecoderState::DecodingRowGroup {
836 remaining_row_groups,
837 ..
838 } => remaining_row_groups.row_groups_remaining(),
839 ParquetDecoderState::Finished => 0,
840 }
841 }
842
843 fn into_builder(self) -> Result<ParquetPushDecoderBuilder, ParquetError> {
844 let remaining_row_groups = match self {
845 ParquetDecoderState::ReadingRowGroup {
846 remaining_row_groups,
847 } => remaining_row_groups,
848 ParquetDecoderState::DecodingRowGroup { .. } => {
849 return Err(ParquetError::General(
850 "into_builder called while a row group is being decoded; \
851 check is_at_row_group_boundary() first"
852 .to_string(),
853 ));
854 }
855 ParquetDecoderState::Finished => {
856 return Err(ParquetError::General(
857 "into_builder called on a finished decoder".to_string(),
858 ));
859 }
860 };
861 if !remaining_row_groups.is_at_row_group_boundary() {
862 return Err(ParquetError::General(
863 "into_builder called mid-row-group; check is_at_row_group_boundary() first"
864 .to_string(),
865 ));
866 }
867 Ok(builder_from_remaining(remaining_row_groups.into_parts()))
868 }
869}
870
871#[cfg(test)]
872mod test {
873 use super::*;
874 use crate::DecodeResult;
875 use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
876 use crate::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
877 use crate::arrow::{ArrowWriter, ProjectionMask};
878 use crate::errors::ParquetError;
879 use crate::file::metadata::ParquetMetaDataPushDecoder;
880 use crate::file::properties::WriterProperties;
881 use arrow::compute::kernels::cmp::{gt, lt};
882 use arrow_array::cast::AsArray;
883 use arrow_array::types::Int64Type;
884 use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray};
885 use arrow_select::concat::concat_batches;
886 use bytes::Bytes;
887 use std::fmt::Debug;
888 use std::ops::Range;
889 use std::sync::{Arc, LazyLock};
890
891 /// Test decoder struct size (as they are copied around on each transition, they
892 /// should not grow too large)
893 #[test]
894 fn test_decoder_size() {
895 assert_eq!(std::mem::size_of::<ParquetDecoderState>(), 24);
896 }
897
    /// Decode the entire file at once, simulating a scenario where all data is
    /// available in memory.
    ///
    /// Because the whole file is pushed up front, each `try_decode` call is
    /// expected to yield a batch directly (one per row group) and then report
    /// that decoding has finished.
    #[test]
    fn test_decoder_all_data() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .build()
            .unwrap();

        decoder
            .push_range(test_file_range(), TEST_FILE_DATA.clone())
            .unwrap();

        let results = vec![
            // first row group should be decoded without needing more data
            expect_data(decoder.try_decode()),
            // second row group should be decoded without needing more data
            expect_data(decoder.try_decode()),
        ];
        expect_finished(decoder.try_decode());

        let all_output = concat_batches(&TEST_BATCH.schema(), &results).unwrap();
        // Check that the output matches the input batch
        assert_eq!(all_output, *TEST_BATCH);
    }
923
    /// Decode the entire file incrementally, simulating a scenario where data is
    /// fetched as needed.
    ///
    /// Besides checking the decoded output, this verifies that the decoder
    /// buffers exactly the bytes it requested for a row group and that
    /// `buffered_bytes()` drops back to zero once that row group is decoded.
    #[test]
    fn test_decoder_incremental() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .build()
            .unwrap();

        let mut results = vec![];

        // First row group, expect a single request
        let ranges = expect_needs_data(decoder.try_decode());
        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        push_ranges_to_decoder(&mut decoder, ranges);
        // The decoder should currently only store the data it needs to decode the first row group
        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
        results.push(expect_data(decoder.try_decode()));
        // the decoder should have consumed the data for the first row group and freed it
        assert_eq!(decoder.buffered_bytes(), 0);

        // Second row group, again expect a single request
        let ranges = expect_needs_data(decoder.try_decode());
        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        push_ranges_to_decoder(&mut decoder, ranges);
        // The decoder should currently only store the data it needs to decode the second row group
        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
        results.push(expect_data(decoder.try_decode()));
        // the decoder should have consumed the data for the second row group and freed it
        assert_eq!(decoder.buffered_bytes(), 0);
        expect_finished(decoder.try_decode());

        // Check that the output matches the input batch
        let all_output = concat_batches(&TEST_BATCH.schema(), &results).unwrap();
        assert_eq!(all_output, *TEST_BATCH);
    }
960
    /// Releasing staged ranges should free speculative buffers without affecting
    /// the active row group reader.
    ///
    /// With `batch_size = 100` each row group yields two batches. The ranges are
    /// cleared between the two batches of the first row group. The second batch
    /// must still decode from the reader's own bytes. The next row group must
    /// then be re-requested.
    #[test]
    fn test_decoder_clear_all_ranges() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .with_batch_size(100)
            .build()
            .unwrap();

        decoder
            .push_range(test_file_range(), TEST_FILE_DATA.clone())
            .unwrap();
        assert_eq!(decoder.buffered_bytes(), test_file_len());

        // The current row group reader is built from the prefetched bytes, but
        // the speculative full-file range remains staged in the decoder.
        let batch1 = expect_data(decoder.try_decode());
        assert_eq!(batch1, TEST_BATCH.slice(0, 100));
        assert_eq!(decoder.buffered_bytes(), test_file_len());

        // All of the buffer is released
        decoder.clear_all_ranges();
        assert_eq!(decoder.buffered_bytes(), 0);

        // The active reader still owns the current row group's bytes, so it can
        // continue decoding without consulting PushBuffers.
        let batch2 = expect_data(decoder.try_decode());
        assert_eq!(batch2, TEST_BATCH.slice(100, 100));
        assert_eq!(decoder.buffered_bytes(), 0);

        // Moving to the next row group now requires the decoder to ask for data
        // again because the staged speculative ranges were released.
        let ranges = expect_needs_data(decoder.try_decode());
        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        push_ranges_to_decoder(&mut decoder, ranges);
        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);

        // The second row group's reader now owns its bytes, so the staged
        // buffers are empty again while its batches are produced.
        let batch3 = expect_data(decoder.try_decode());
        assert_eq!(batch3, TEST_BATCH.slice(200, 100));
        assert_eq!(decoder.buffered_bytes(), 0);

        let batch4 = expect_data(decoder.try_decode());
        assert_eq!(batch4, TEST_BATCH.slice(300, 100));
        assert_eq!(decoder.buffered_bytes(), 0);

        expect_finished(decoder.try_decode());
    }
1009
    /// Decode the entire file incrementally, simulating partial reads.
    ///
    /// The second row group's data is supplied in two halves. In between, the
    /// decoder must keep asking for exactly the ranges it has not yet received,
    /// and pushing an empty set of ranges must not change that.
    #[test]
    fn test_decoder_partial() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .build()
            .unwrap();

        // First row group, expect a single request for all data needed to read
        // every column (no projection is configured)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(0, 200);
        assert_eq!(batch1, expected1);

        // Second row group, this time provide the data in two steps
        let ranges = expect_needs_data(decoder.try_decode());
        let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2);
        assert!(!ranges1.is_empty());
        assert!(!ranges2.is_empty());
        // push first half to simulate partial read
        push_ranges_to_decoder(&mut decoder, ranges1.to_vec());

        // still expect more data
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        // pushing an empty set of ranges should be a no-op
        push_ranges_to_decoder(&mut decoder, vec![]);
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 200);
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }
1049
    /// Decode multiple columns "a" and "b", expect that the decoder requests
    /// only a single request per row group.
    ///
    /// With no row filter there is nothing to evaluate before fetching the
    /// projected columns. Each row group therefore needs one `NeedsData` round
    /// trip before it yields its batch.
    #[test]
    fn test_decoder_selection_does_one_request() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        let mut decoder = builder
            .with_projection(
                ProjectionMask::columns(&schema_descr, ["a", "b"]), // read "a", "b"
            )
            .build()
            .unwrap();

        // First row group, expect a single request for all data needed to read "a" and "b"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(0, 200).project(&[0, 1]).unwrap();
        assert_eq!(batch1, expected1);

        // Second row group, similarly expect a single request for all data needed to read "a" and "b"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 200).project(&[0, 1]).unwrap();
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }
1084
    /// Decode with a filter that requires multiple requests, but only provide part
    /// of the data needed for the filter at a time simulating partial reads.
    ///
    /// While the filter pages are incomplete, repeated `try_decode` calls must
    /// keep reporting the same outstanding ranges.
    #[test]
    fn test_decoder_single_filter_partial() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // First filter: "a" > 250 (nothing in Row Group 0, both data pages in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            // claim to use both a and b so the filter fetch spans more than one
            // byte range (the split below requires at least two)
            ProjectionMask::columns(&schema_descr, ["a", "b"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                // column 0 of the filter batch is "a"
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // read only column "a" to test that filter pages are reused
                ProjectionMask::columns(&schema_descr, ["a"]), // read "a"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .build()
            .unwrap();

        // First row group, evaluating filters
        let ranges = expect_needs_data(decoder.try_decode());
        // only provide half the ranges
        let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2);
        assert!(!ranges1.is_empty());
        assert!(!ranges2.is_empty());
        push_ranges_to_decoder(&mut decoder, ranges1.to_vec());
        // still expect more data
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        // asking again without pushing anything must report the same ranges
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        push_ranges_to_decoder(&mut decoder, ranges2.to_vec());

        // Since no rows in the first row group pass the filters, there is no
        // additional requests to read data pages for "b" here

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 149).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }
1143
    /// Decode with a filter where we also skip one of the RowGroups via a RowSelection.
    ///
    /// The selection skips all 200 rows of row group 0, so that row group is
    /// never fetched. Both data requests below therefore belong to row group 1.
    #[test]
    fn test_decoder_single_filter_and_row_selection() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // Filter: "a" > 250 (nothing in Row Group 0; within the selected rows
        // 200..299 of Row Group 1, rows 251..299 match)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // read only column "b", which the filter does not use, so its
                // pages must be fetched separately after the filter runs
                ProjectionMask::columns(&schema_descr, ["b"]), // read "b"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(200), // skip first row group
                RowSelector::select(100), // first 100 rows of second row group
                RowSelector::skip(100),
            ]))
            .build()
            .unwrap();

        // expect the first row group to be filtered out (no filter is evaluated due to row selection)

        // Second row group, filter pages for "a" (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group, data pages for the projected column "b"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 49).project(&[1]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }
1194
    /// Decode with multiple filters that require multiple requests.
    ///
    /// For each row group the decoder asks for data three times, in order:
    /// pages for the first filter ("a"), then pages for the second filter ("b"),
    /// then the projected column "c".
    #[test]
    fn test_decoder_multi_filters() {
        // Create a decoder for decoding parquet data (note it does not have any IO / readers)
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // Values in column "b" range 400..799
        // First filter: "a" > 175 (last data page in Row Group 0)
        // Second filter: "b" < 625 (last data page in Row Group 0 and first DataPage in RowGroup 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 175
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_175 = Int64Array::new_scalar(175);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_175)
            },
        );

        // b < 625
        let row_filter_b = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["b"]),
            |batch: RecordBatch| {
                let scalar_625 = Int64Array::new_scalar(625);
                let column = batch.column(0).as_primitive::<Int64Type>();
                lt(column, &scalar_625)
            },
        );

        let mut decoder = builder
            .with_projection(
                ProjectionMask::columns(&schema_descr, ["c"]), // read "c"
            )
            .with_row_filter(RowFilter::new(vec![
                Box::new(row_filter_a),
                Box::new(row_filter_b),
            ]))
            .build()
            .unwrap();

        // First row group, first filter (a > 175)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // first row group, second filter (b < 625)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // first row group, data pages for "c"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first batch to be decoded: rows 176..=199, column "c"
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(176, 24).project(&[2]).unwrap();
        assert_eq!(batch1, expected1);

        // Second row group, first filter (a > 175)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group, second filter (b < 625)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group, data pages for "c"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the second batch to be decoded: rows 200..=224, column "c"
        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 25).project(&[2]).unwrap();
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }
1275
    /// Decode with a filter that uses a column that is also projected, and expect
    /// that the filter pages are reused (don't refetch them).
    ///
    /// Each row group therefore needs only one data request: the filter pages
    /// for "a" double as the projected output.
    #[test]
    fn test_decoder_reuses_filter_pages() {
        // Create a decoder for decoding parquet data (note it does not have any IO / readers)
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // Filter: "a" > 250 (nothing in Row Group 0; rows 251..=399 in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // read only column "a" to test that filter pages are reused
                ProjectionMask::columns(&schema_descr, ["a"]), // read "a"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .build()
            .unwrap();

        // First row group, filter (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first row group to be filtered out (no rows match)

        // Second row group, first filter (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect that the second row group is decoded: rows 251..399, column "a"
        // Note that the filter pages for "a" should be reused and no additional data
        // should be requested
        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 149).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }
1326
    /// A `RowFilter` with no predicates should behave as if no filter were set.
    /// Each row group then needs a single data request for the projected column.
    #[test]
    fn test_decoder_empty_filters() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // only read column "c", but with empty filters
        let mut decoder = builder
            .with_projection(
                ProjectionMask::columns(&schema_descr, ["c"]), // read "c"
            )
            .with_row_filter(RowFilter::new(vec![
                // empty filters should be ignored
            ]))
            .build()
            .unwrap();

        // First row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first batch to be decoded: rows 0..199, column "c"
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(0, 200).project(&[2]).unwrap();
        assert_eq!(batch1, expected1);

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the second batch to be decoded: rows 200..399, column "c"
        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 200).project(&[2]).unwrap();

        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }
1365
    /// When filter pushdown is combined with a `LIMIT`, the predicate must
    /// not be evaluated for rows beyond the `limit`-th match.
    ///
    /// Filter `a > 175` produces 24 matches in row group 0 (rows 176..199).
    /// With `limit = 10`, only the first 10 matches (rows 176..185) should be
    /// emitted. The predicate's row counter should also show that evaluation
    /// stopped early.
    #[test]
    fn test_decoder_filter_with_limit_short_circuits_within_row_group() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // Counts every row handed to the predicate, across all calls.
        let rows_filtered = Arc::new(AtomicUsize::new(0));
        let rows_filtered_for_predicate = Arc::clone(&rows_filtered);

        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            move |batch: RecordBatch| {
                rows_filtered_for_predicate.fetch_add(batch.num_rows(), Ordering::Relaxed);
                let scalar_175 = Int64Array::new_scalar(175);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_175)
            },
        );

        // Use a small batch size so the row group is evaluated across
        // multiple predicate batches; that is the regime where the
        // predicate-level limit short-circuit saves evaluation work. Matching
        // rows are 176..199 (24 rows); with batch_size = 10 those span
        // (0-indexed) batches 17, 18, and 19 (rows 170..199). A limit of 10
        // should stop filter evaluation in the middle of batch 18.
        let mut decoder = builder
            .with_projection(ProjectionMask::columns(&schema_descr, ["a"]))
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .with_batch_size(10)
            .with_limit(10)
            .build()
            .unwrap();

        // First row group: filter columns fetch (predicate is evaluated here)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // The first 10 matching rows come out: 176..185, column "a"
        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(176, 10).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        // no data for row group 1 should be requested — the limit
        // was satisfied by row group 0 and the `Start` state for row group 1
        // short-circuits to `Finished`.
        expect_finished(decoder.try_decode());

        // Row 186 is the 11th match; the scan should stop no later than the
        // batch containing it (batch 18 of 10 rows = rows 180..189), so at
        // most 190 rows are evaluated.
        let evaluated = rows_filtered.load(Ordering::Relaxed);
        assert!(
            evaluated <= 190,
            "predicate evaluated {evaluated} rows; expected ≤ 190 (stop within batch containing 11th match)"
        );
    }
1431
    /// Once the limit has been satisfied by a prior row group, subsequent
    /// row groups should be skipped entirely — no data request for their
    /// filter columns.
    #[test]
    fn test_decoder_filter_with_limit_skips_later_row_groups() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // `a > 175` matches rows 176..199 in row group 0 (24 matches) and
        // 200..399 in row group 1 (200 matches). With limit = 5, all matches
        // should come from row group 0.
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_175 = Int64Array::new_scalar(175);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_175)
            },
        );

        let mut decoder = builder
            .with_projection(ProjectionMask::columns(&schema_descr, ["a"]))
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .with_limit(5)
            .build()
            .unwrap();

        // Row group 0: fetch filter pages
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // First 5 matches: 176..=180
        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(176, 5).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        // Row group 1 must NOT request data — the limit is already satisfied
        // so `Start` in row group 1 short-circuits to `Finished`.
        expect_finished(decoder.try_decode());
    }
1473
    /// The predicate short-circuit must account for the configured offset as
    /// well as the limit. The post-predicate `with_offset` step skips that many
    /// already-selected rows before `with_limit` counts output rows. The
    /// predicate must therefore retain at least `offset + limit` matches.
    /// If the predicate stopped after only `limit` matches, the later
    /// `with_offset` step would consume all of them and produce 0 rows instead
    /// of `limit`.
    ///
    /// `a > 175` matches rows 176..199 in row group 0 (24 matches). With
    /// `offset = 10, limit = 5`, the expected output is rows 186..190 (the
    /// 11th through 15th matches).
    #[test]
    fn test_decoder_filter_with_offset_and_limit() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_175 = Int64Array::new_scalar(175);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_175)
            },
        );

        let mut decoder = builder
            .with_projection(ProjectionMask::columns(&schema_descr, ["a"]))
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .with_offset(10)
            .with_limit(5)
            .build()
            .unwrap();

        // Row group 0: the filter column "a" is also the projected column, so
        // one request is enough.
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(186, 5).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        // The limit was satisfied within row group 0.
        expect_finished(decoder.try_decode());
    }
1516
    /// The limit short-circuit must also be correct when the limited predicate
    /// is the last predicate in a multi-predicate chain.
    ///
    /// `a > 175` first narrows row group 0 to rows 176..199. The final
    /// predicate `b < 625` is then evaluated only over those 24 rows, all of
    /// which match. With `limit = 10`, the final output should still be rows
    /// 176..185, and the second predicate should stop before consuming all 24
    /// selected rows. The first predicate is not the last in the chain. It is
    /// expected to see all 200 rows of the row group.
    #[test]
    fn test_decoder_multi_filters_with_limit() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // Per-predicate counters of rows handed to each predicate.
        let first_predicate_rows = Arc::new(AtomicUsize::new(0));
        let second_predicate_rows = Arc::new(AtomicUsize::new(0));

        let first_predicate_rows_for_filter = Arc::clone(&first_predicate_rows);
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            move |batch: RecordBatch| {
                first_predicate_rows_for_filter.fetch_add(batch.num_rows(), Ordering::Relaxed);
                let scalar_175 = Int64Array::new_scalar(175);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_175)
            },
        );

        let second_predicate_rows_for_filter = Arc::clone(&second_predicate_rows);
        let row_filter_b = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["b"]),
            move |batch: RecordBatch| {
                second_predicate_rows_for_filter.fetch_add(batch.num_rows(), Ordering::Relaxed);
                let scalar_625 = Int64Array::new_scalar(625);
                let column = batch.column(0).as_primitive::<Int64Type>();
                lt(column, &scalar_625)
            },
        );

        let mut decoder = builder
            .with_projection(ProjectionMask::columns(&schema_descr, ["c"]))
            .with_row_filter(RowFilter::new(vec![
                Box::new(row_filter_a),
                Box::new(row_filter_b),
            ]))
            .with_batch_size(10)
            .with_limit(10)
            .build()
            .unwrap();

        // Row group 0, first predicate
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Row group 0, second predicate
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Final projected data
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(176, 10).project(&[2]).unwrap();
        assert_eq!(batch, expected);

        // The overall limit was satisfied by row group 0.
        expect_finished(decoder.try_decode());

        assert_eq!(first_predicate_rows.load(Ordering::Relaxed), 200);
        assert!(
            second_predicate_rows.load(Ordering::Relaxed) < 24,
            "final predicate should short-circuit before consuming all 24 rows selected by the first predicate"
        );
    }
1594
    /// When a row selection already exists, limiting the predicate must still
    /// preserve alignment with that prior selection.
    ///
    /// The explicit selection narrows row group 0 to rows 150..199. Applying
    /// `a > 175` over that selection yields rows 176..199. With `limit = 10`,
    /// the decoder should emit rows 176..185 and stop without evaluating the
    /// remaining selected rows.
    #[test]
    fn test_decoder_filter_with_row_selection_and_limit() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // Counts every row handed to the predicate, across all calls.
        let rows_filtered = Arc::new(AtomicUsize::new(0));
        let rows_filtered_for_predicate = Arc::clone(&rows_filtered);

        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            move |batch: RecordBatch| {
                rows_filtered_for_predicate.fetch_add(batch.num_rows(), Ordering::Relaxed);
                let scalar_175 = Int64Array::new_scalar(175);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_175)
            },
        );

        let mut decoder = builder
            .with_projection(ProjectionMask::columns(&schema_descr, ["a"]))
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(150),
                RowSelector::select(50),
            ]))
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .with_batch_size(10)
            .with_limit(10)
            .build()
            .unwrap();

        // Row group 0: fetch the filter/projection column "a"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(176, 10).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());

        assert!(
            rows_filtered.load(Ordering::Relaxed) < 50,
            "predicate should short-circuit before consuming all 50 rows from the explicit row selection"
        );
    }
1649
1650 #[test]
1651 fn test_decoder_offset_limit() {
1652 let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
1653 .unwrap()
1654 // skip entire first row group (200 rows) and first 25 rows of second row group
1655 .with_offset(225)
1656 // and limit to 20 rows
1657 .with_limit(20)
1658 .build()
1659 .unwrap();
1660
1661 // First row group should be skipped,
1662
1663 // Second row group
1664 let ranges = expect_needs_data(decoder.try_decode());
1665 push_ranges_to_decoder(&mut decoder, ranges);
1666
1667 // expect the first and only batch to be decoded
1668 let batch1 = expect_data(decoder.try_decode());
1669 let expected1 = TEST_BATCH.slice(225, 20);
1670 assert_eq!(batch1, expected1);
1671
1672 expect_finished(decoder.try_decode());
1673 }
1674
1675 #[test]
1676 fn test_decoder_try_next_reader_offset_limit() {
1677 let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
1678 .unwrap()
1679 .with_offset(225)
1680 .with_limit(20)
1681 .build()
1682 .unwrap();
1683
1684 let ranges = expect_needs_data(decoder.try_next_reader());
1685 push_ranges_to_decoder(&mut decoder, ranges);
1686
1687 let reader = expect_data(decoder.try_next_reader());
1688 let batches = reader
1689 .map(|batch| batch.expect("expected decoded batch"))
1690 .collect::<Vec<_>>();
1691 let output = concat_batches(&TEST_BATCH.schema(), &batches).unwrap();
1692 assert_eq!(output, TEST_BATCH.slice(225, 20));
1693
1694 expect_finished(decoder.try_next_reader());
1695 }
1696
1697 #[test]
1698 fn test_decoder_row_group_selection() {
1699 // take only the second row group
1700 let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
1701 .unwrap()
1702 .with_row_groups(vec![1])
1703 .build()
1704 .unwrap();
1705
1706 // First row group should be skipped,
1707
1708 // Second row group
1709 let ranges = expect_needs_data(decoder.try_decode());
1710 push_ranges_to_decoder(&mut decoder, ranges);
1711
1712 // expect the first and only batch to be decoded
1713 let batch1 = expect_data(decoder.try_decode());
1714 let expected1 = TEST_BATCH.slice(200, 200);
1715 assert_eq!(batch1, expected1);
1716
1717 expect_finished(decoder.try_decode());
1718 }
1719
1720 #[test]
1721 fn test_decoder_row_selection() {
1722 // take only the second row group
1723 let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
1724 .unwrap()
1725 .with_row_selection(RowSelection::from(vec![
1726 RowSelector::skip(225), // skip first row group and 25 rows of second])
1727 RowSelector::select(20), // take 20 rows
1728 ]))
1729 .build()
1730 .unwrap();
1731
1732 // First row group should be skipped,
1733
1734 // Second row group
1735 let ranges = expect_needs_data(decoder.try_decode());
1736 push_ranges_to_decoder(&mut decoder, ranges);
1737
1738 // expect the first ane only batch to be decoded
1739 let batch1 = expect_data(decoder.try_decode());
1740 let expected1 = TEST_BATCH.slice(225, 20);
1741 assert_eq!(batch1, expected1);
1742
1743 expect_finished(decoder.try_decode());
1744 }
1745
1746 /// `into_builder` between row groups recovers a builder for the
1747 /// not-yet-decoded row groups; rebuilding it with a new row filter
1748 /// applies that filter to the subsequent row groups while leaving the
1749 /// already-decoded row group's results untouched.
1750 ///
1751 /// See the "Adaptive scans" section of [`ParquetPushDecoderBuilder`] for
1752 /// the high-level overview.
1753 #[test]
1754 fn test_into_builder_installs_filter_between_row_groups() {
1755 let schema_descr = test_file_parquet_metadata()
1756 .file_metadata()
1757 .schema_descr_ptr();
1758 let mut decoder = prefetched_decoder(1024);
1759
1760 // Reader for row group 0 — no filter.
1761 let reader0 = expect_data(decoder.try_next_reader());
1762 let batches0: Vec<_> = reader0.collect::<Result<_, _>>().unwrap();
1763 let batch0 = concat_batches(&TEST_BATCH.schema(), &batches0).unwrap();
1764 assert_eq!(batch0, TEST_BATCH.slice(0, 200));
1765
1766 // We're between row groups now. Rebuild with a filter on column "a".
1767 assert!(decoder.is_at_row_group_boundary());
1768 assert_eq!(decoder.row_groups_remaining(), 1);
1769 let filter =
1770 ArrowPredicateFn::new(ProjectionMask::columns(&schema_descr, ["a"]), |batch| {
1771 gt(batch.column(0), &Int64Array::new_scalar(250))
1772 });
1773 let mut decoder = decoder
1774 .into_builder()
1775 .unwrap()
1776 .with_row_filter(RowFilter::new(vec![Box::new(filter)]))
1777 .build()
1778 .unwrap();
1779
1780 // Reader for row group 1 — filter applied. The rebuilt decoder kept
1781 // the buffered bytes (see `test_into_builder_preserves_buffered_bytes`)
1782 // so no data needs to be re-supplied. Column "a" in RG1 has values
1783 // 200..399; `a > 250` keeps 251..399 = 149 rows.
1784 let reader1 = expect_data(decoder.try_next_reader());
1785 let batches1: Vec<_> = reader1.collect::<Result<_, _>>().unwrap();
1786 let batch1 = concat_batches(&TEST_BATCH.schema(), &batches1).unwrap();
1787 assert_eq!(batch1, TEST_BATCH.slice(251, 149));
1788 expect_finished(decoder.try_next_reader());
1789 }
1790
1791 /// `into_builder` is rejected while a row group's reader is being
1792 /// drained (`DecodingRowGroup`); the error points at
1793 /// `is_at_row_group_boundary`.
1794 #[test]
1795 fn test_into_builder_rejected_mid_row_group() {
1796 let mut decoder = prefetched_decoder(50);
1797
1798 // Decode one batch to land mid-row-group, inside `DecodingRowGroup`
1799 // with an active reader — not a boundary.
1800 expect_data(decoder.try_decode());
1801 assert!(!decoder.is_at_row_group_boundary());
1802
1803 let err = decoder.into_builder().unwrap_err();
1804 let err_msg = format!("{err}");
1805 assert!(
1806 err_msg.contains("is_at_row_group_boundary"),
1807 "unexpected error: {err_msg}"
1808 );
1809 }
1810
1811 /// `into_builder` is rejected once the decoder has finished.
1812 #[test]
1813 fn test_into_builder_rejected_on_finished_decoder() {
1814 let mut decoder = prefetched_decoder(1024);
1815 expect_data(decoder.try_decode());
1816 expect_data(decoder.try_decode());
1817 expect_finished(decoder.try_decode());
1818 assert!(!decoder.is_at_row_group_boundary());
1819
1820 let err = decoder.into_builder().unwrap_err();
1821 assert!(
1822 format!("{err}").contains("finished"),
1823 "unexpected error: {err}"
1824 );
1825 }
1826
1827 /// `try_next_reader` hands the active reader off to the caller and
1828 /// transitions the decoder back to `ReadingRowGroup` — so the caller
1829 /// can call `into_builder` even while still holding the returned
1830 /// reader. (The handed-off reader has no link back to the decoder's
1831 /// projection/filter; it has its own `ArrayReader` and `ReadPlan`.)
1832 #[test]
1833 fn test_into_builder_allowed_while_iterating_handed_off_reader() {
1834 let mut decoder = prefetched_decoder(1024);
1835
1836 let reader0 = expect_data(decoder.try_next_reader());
1837 // Decoder no longer owns the reader, so it considers itself
1838 // "between row groups".
1839 assert!(decoder.is_at_row_group_boundary());
1840 // Recovering the builder consumes the decoder but leaves `reader0`
1841 // valid: iterating it is independent of the decoder's state.
1842 let _builder = decoder.into_builder().unwrap();
1843 let batches: Vec<_> = reader0.collect::<Result<_, _>>().unwrap();
1844 let batch0 = concat_batches(&TEST_BATCH.schema(), &batches).unwrap();
1845 assert_eq!(batch0, TEST_BATCH.slice(0, 200));
1846 }
1847
1848 /// `into_builder` recovers a builder for the *remaining* row groups and
1849 /// carries the not-yet-consumed offset/limit budget, so a rebuilt
1850 /// decoder resumes where the original left off rather than restarting.
1851 #[test]
1852 fn test_into_builder_resumes_remaining_budget() {
1853 // limit = 250 spans both 200-row row groups: all 200 rows of RG0
1854 // plus the first 50 rows of RG1.
1855 let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
1856 .unwrap()
1857 .with_batch_size(1024)
1858 .with_limit(250)
1859 .build()
1860 .unwrap();
1861 prefetch_test_file(&mut decoder);
1862
1863 // RG0 contributes all 200 of its rows.
1864 let reader0 = expect_data(decoder.try_next_reader());
1865 let batches0: Vec<_> = reader0.collect::<Result<_, _>>().unwrap();
1866 let batch0 = concat_batches(&TEST_BATCH.schema(), &batches0).unwrap();
1867 assert_eq!(batch0, TEST_BATCH.slice(0, 200));
1868
1869 // Rebuild without changing anything: the remaining 50-row limit and
1870 // the not-yet-decoded RG1 must carry through (as do the buffers, so
1871 // no data needs re-supplying).
1872 assert!(decoder.is_at_row_group_boundary());
1873 let mut decoder = decoder.into_builder().unwrap().build().unwrap();
1874
1875 let reader1 = expect_data(decoder.try_next_reader());
1876 let batches1: Vec<_> = reader1.collect::<Result<_, _>>().unwrap();
1877 let batch1 = concat_batches(&TEST_BATCH.schema(), &batches1).unwrap();
1878 // Only the first 50 rows of RG1 (200..249) — the rest of the limit.
1879 assert_eq!(batch1, TEST_BATCH.slice(200, 50));
1880 expect_finished(decoder.try_next_reader());
1881 }
1882
1883 /// `into_builder` carries the decoder's buffered bytes across the
1884 /// rebuild: the rebuilt decoder keeps them and does not re-request data
1885 /// it already holds.
1886 #[test]
1887 fn test_into_builder_preserves_buffered_bytes() {
1888 let mut decoder = prefetched_decoder(1024);
1889 assert_eq!(decoder.buffered_bytes(), test_file_len());
1890
1891 // Drain RG0.
1892 let reader0 = expect_data(decoder.try_next_reader());
1893 let _: Vec<_> = reader0.collect::<Result<_, _>>().unwrap();
1894 // RG1's bytes are still staged inside the decoder.
1895 let buffered = decoder.buffered_bytes();
1896 assert!(buffered > 0);
1897
1898 // Rebuilding via into_builder keeps the staged bytes.
1899 let mut decoder = decoder.into_builder().unwrap().build().unwrap();
1900 assert_eq!(decoder.buffered_bytes(), buffered);
1901
1902 // RG1's bytes are already buffered, so it decodes without a
1903 // `NeedsData` round-trip.
1904 let reader1 = expect_data(decoder.try_next_reader());
1905 let batches1: Vec<_> = reader1.collect::<Result<_, _>>().unwrap();
1906 let batch1 = concat_batches(&TEST_BATCH.schema(), &batches1).unwrap();
1907 assert_eq!(batch1, TEST_BATCH.slice(200, 200));
1908 expect_finished(decoder.try_next_reader());
1909 }
1910
1911 /// Drive the decoder incrementally. Start with a narrow projection,
1912 /// drain RG0, then `into_builder` and widen the projection to all three
1913 /// columns. The rebuilt decoder's `NeedsData` for RG1 must request
1914 /// bytes for *all three* columns, not just the originally-projected
1915 /// "a". The expected ranges are hardcoded because `TEST_BATCH` and the
1916 /// writer settings are static; this pins the layout cleanly without a
1917 /// parallel reference decoder.
1918 #[test]
1919 fn test_into_builder_expand_projection_requests_new_bytes() {
1920 let metadata = test_file_parquet_metadata();
1921 let schema_descr = metadata.file_metadata().schema_descr_ptr();
1922
1923 let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(metadata)
1924 .unwrap()
1925 .with_batch_size(1024)
1926 .with_projection(ProjectionMask::columns(&schema_descr, ["a"]))
1927 .build()
1928 .unwrap();
1929
1930 // RG0: incrementally satisfy the narrow request — a single
1931 // contiguous range for the "a"-only projection.
1932 let ranges_rg0 = expect_needs_data(decoder.try_next_reader());
1933 assert_eq!(ranges_rg0, vec![4..1860]);
1934 push_ranges_to_decoder(&mut decoder, ranges_rg0);
1935
1936 let reader0 = expect_data(decoder.try_next_reader());
1937 let batches0: Vec<_> = reader0.collect::<Result<_, _>>().unwrap();
1938 let batch0 = concat_batches(&batches0[0].schema(), &batches0).unwrap();
1939 assert_eq!(batch0, TEST_BATCH.slice(0, 200).project(&[0]).unwrap());
1940
1941 // Widen the projection at the boundary.
1942 assert!(decoder.is_at_row_group_boundary());
1943 let mut decoder = decoder
1944 .into_builder()
1945 .unwrap()
1946 .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b", "c"]))
1947 .build()
1948 .unwrap();
1949
1950 // RG1 now requests "a", "b", and "c" column chunks: ~1.8KiB each
1951 // for "a" and "b", ~7.5KiB for the StringView column "c".
1952 let ranges_rg1 = expect_needs_data(decoder.try_next_reader());
1953 assert_eq!(ranges_rg1, vec![11062..12918, 12918..14774, 14774..22230]);
1954 push_ranges_to_decoder(&mut decoder, ranges_rg1);
1955
1956 let reader1 = expect_data(decoder.try_next_reader());
1957 let batches1: Vec<_> = reader1.collect::<Result<_, _>>().unwrap();
1958 let batch1 = concat_batches(&TEST_BATCH.schema(), &batches1).unwrap();
1959 assert_eq!(batch1, TEST_BATCH.slice(200, 200));
1960 expect_finished(decoder.try_next_reader());
1961 }
1962
1963 /// Mirror of [`test_into_builder_expand_projection_requests_new_bytes`]:
1964 /// start with the full projection, drain RG0, then `into_builder` and
1965 /// narrow the projection to just column "a". RG1's `NeedsData` must
1966 /// request only the single "a" column-chunk range, not the three a
1967 /// wide projection would.
1968 #[test]
1969 fn test_into_builder_narrow_projection_requests_fewer_bytes() {
1970 let metadata = test_file_parquet_metadata();
1971 let schema_descr = metadata.file_metadata().schema_descr_ptr();
1972
1973 let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(metadata)
1974 .unwrap()
1975 .with_batch_size(1024)
1976 .build()
1977 .unwrap();
1978
1979 // RG0 with the default (full) projection — three column ranges.
1980 let ranges_rg0 = expect_needs_data(decoder.try_next_reader());
1981 assert_eq!(ranges_rg0, vec![4..1860, 1860..3716, 3716..11062]);
1982 push_ranges_to_decoder(&mut decoder, ranges_rg0);
1983
1984 let reader0 = expect_data(decoder.try_next_reader());
1985 let batches0: Vec<_> = reader0.collect::<Result<_, _>>().unwrap();
1986 let batch0 = concat_batches(&TEST_BATCH.schema(), &batches0).unwrap();
1987 assert_eq!(batch0, TEST_BATCH.slice(0, 200));
1988
1989 // Narrow the projection at the boundary.
1990 assert!(decoder.is_at_row_group_boundary());
1991 let mut decoder = decoder
1992 .into_builder()
1993 .unwrap()
1994 .with_projection(ProjectionMask::columns(&schema_descr, ["a"]))
1995 .build()
1996 .unwrap();
1997
1998 // RG1 now requests column "a" only — a single 1856-byte range.
1999 let ranges_rg1 = expect_needs_data(decoder.try_next_reader());
2000 assert_eq!(ranges_rg1, vec![11062..12918]);
2001 push_ranges_to_decoder(&mut decoder, ranges_rg1);
2002
2003 let reader1 = expect_data(decoder.try_next_reader());
2004 let batches1: Vec<_> = reader1.collect::<Result<_, _>>().unwrap();
2005 let batch1 = concat_batches(&batches1[0].schema(), &batches1).unwrap();
2006 assert_eq!(batch1, TEST_BATCH.slice(200, 200).project(&[0]).unwrap());
2007 expect_finished(decoder.try_next_reader());
2008 }
2009
2010 /// Returns a batch with 400 rows, with 3 columns: "a", "b", "c"
2011 ///
2012 /// Note c is a different types (so the data page sizes will be different)
2013 static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
2014 let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
2015 let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
2016 let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| {
2017 if i % 2 == 0 {
2018 format!("string_{i}")
2019 } else {
2020 format!("A string larger than 12 bytes and thus not inlined {i}")
2021 }
2022 })));
2023
2024 RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
2025 });
2026
2027 /// Create a parquet file in memory for testing.
2028 ///
2029 /// See [`TEST_BATCH`] for the data in the file.
2030 ///
2031 /// Each column is written in 4 data pages, each with 100 rows, across 2
2032 /// row groups. Each column in each row group has two data pages.
2033 ///
2034 /// The data is split across row groups like this
2035 ///
2036 /// Column | Values | Data Page | Row Group
2037 /// -------|------------------------|-----------|-----------
2038 /// a | 0..99 | 1 | 0
2039 /// a | 100..199 | 2 | 0
2040 /// a | 200..299 | 1 | 1
2041 /// a | 300..399 | 2 | 1
2042 ///
2043 /// b | 400..499 | 1 | 0
2044 /// b | 500..599 | 2 | 0
2045 /// b | 600..699 | 1 | 1
2046 /// b | 700..799 | 2 | 1
2047 ///
2048 /// c | "string_0".."string_99" | 1 | 0
2049 /// c | "string_100".."string_199" | 2 | 0
2050 /// c | "string_200".."string_299" | 1 | 1
2051 /// c | "string_300".."string_399" | 2 | 1
2052 static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
2053 let input_batch = &TEST_BATCH;
2054 let mut output = Vec::new();
2055
2056 let writer_options = WriterProperties::builder()
2057 .set_max_row_group_row_count(Some(200))
2058 .set_data_page_row_count_limit(100)
2059 .build();
2060 let mut writer =
2061 ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap();
2062
2063 // since the limits are only enforced on batch boundaries, write the input
2064 // batch in chunks of 50
2065 let mut row_remain = input_batch.num_rows();
2066 while row_remain > 0 {
2067 let chunk_size = row_remain.min(50);
2068 let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size);
2069 writer.write(&chunk).unwrap();
2070 row_remain -= chunk_size;
2071 }
2072 writer.close().unwrap();
2073 Bytes::from(output)
2074 });
2075
2076 /// Return the length of [`TEST_FILE_DATA`], in bytes
2077 fn test_file_len() -> u64 {
2078 TEST_FILE_DATA.len() as u64
2079 }
2080
2081 /// Return a range that covers the entire [`TEST_FILE_DATA`]
2082 fn test_file_range() -> Range<u64> {
2083 0..test_file_len()
2084 }
2085
2086 /// Return a slice of the test file data from the given range
2087 pub fn test_file_slice(range: Range<u64>) -> Bytes {
2088 let start: usize = range.start.try_into().unwrap();
2089 let end: usize = range.end.try_into().unwrap();
2090 TEST_FILE_DATA.slice(start..end)
2091 }
2092
2093 /// return the metadata for the test file
2094 pub fn test_file_parquet_metadata() -> Arc<crate::file::metadata::ParquetMetaData> {
2095 let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(test_file_len()).unwrap();
2096 push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![test_file_range()]);
2097 let metadata = metadata_decoder.try_decode().unwrap();
2098 let DecodeResult::Data(metadata) = metadata else {
2099 panic!("Expected metadata to be decoded successfully");
2100 };
2101 Arc::new(metadata)
2102 }
2103
2104 /// Push the given ranges to the metadata decoder, simulating reading from a file
2105 fn push_ranges_to_metadata_decoder(
2106 metadata_decoder: &mut ParquetMetaDataPushDecoder,
2107 ranges: Vec<Range<u64>>,
2108 ) {
2109 let data = ranges
2110 .iter()
2111 .map(|range| test_file_slice(range.clone()))
2112 .collect::<Vec<_>>();
2113 metadata_decoder.push_ranges(ranges, data).unwrap();
2114 }
2115
2116 /// Push the entire test file into `decoder`.
2117 fn prefetch_test_file(decoder: &mut ParquetPushDecoder) {
2118 decoder
2119 .push_range(test_file_range(), TEST_FILE_DATA.clone())
2120 .unwrap();
2121 }
2122
2123 /// Build a decoder over the test file with the given batch size and
2124 /// prefetch the whole file into it.
2125 fn prefetched_decoder(batch_size: usize) -> ParquetPushDecoder {
2126 let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
2127 .unwrap()
2128 .with_batch_size(batch_size)
2129 .build()
2130 .unwrap();
2131 prefetch_test_file(&mut decoder);
2132 decoder
2133 }
2134
2135 fn push_ranges_to_decoder(decoder: &mut ParquetPushDecoder, ranges: Vec<Range<u64>>) {
2136 let data = ranges
2137 .iter()
2138 .map(|range| test_file_slice(range.clone()))
2139 .collect::<Vec<_>>();
2140 decoder.push_ranges(ranges, data).unwrap();
2141 }
2142
2143 /// Expect that the [`DecodeResult`] is a [`DecodeResult::Data`] and return the corresponding element
2144 fn expect_data<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) -> T {
2145 match result.expect("Expected Ok(DecodeResult::Data(T))") {
2146 DecodeResult::Data(data) => data,
2147 result => panic!("Expected DecodeResult::Data, got {result:?}"),
2148 }
2149 }
2150
2151 /// Expect that the [`DecodeResult`] is a [`DecodeResult::NeedsData`] and return the corresponding ranges
2152 fn expect_needs_data<T: Debug>(
2153 result: Result<DecodeResult<T>, ParquetError>,
2154 ) -> Vec<Range<u64>> {
2155 match result.expect("Expected Ok(DecodeResult::NeedsData{ranges})") {
2156 DecodeResult::NeedsData(ranges) => ranges,
2157 result => panic!("Expected DecodeResult::NeedsData, got {result:?}"),
2158 }
2159 }
2160
2161 fn expect_finished<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) {
2162 match result.expect("Expected Ok(DecodeResult::Finished)") {
2163 DecodeResult::Finished => {}
2164 result => panic!("Expected DecodeResult::Finished, got {result:?}"),
2165 }
2166 }
2167}