Skip to main content

parquet/arrow/push_decoder/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ParquetPushDecoder`]: decodes Parquet data with data provided by the
19//! caller (rather than from an underlying reader).
20
21mod reader_builder;
22mod remaining;
23
24use crate::DecodeResult;
25use crate::arrow::arrow_reader::{
26    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
27};
28use crate::errors::ParquetError;
29use crate::file::metadata::ParquetMetaData;
30pub use crate::util::push_buffers::PushBuffers;
31use arrow_array::RecordBatch;
32use bytes::Bytes;
33use reader_builder::{RowBudget, RowGroupReaderBuilder, RowGroupReaderBuilderParts};
34use remaining::{RemainingRowGroups, RemainingRowGroupsParts};
35use std::ops::Range;
36use std::sync::Arc;
37
38/// A builder for [`ParquetPushDecoder`].
39///
40/// To create a new decoder, use [`ParquetPushDecoderBuilder::try_new_decoder`].
41///
42/// You can decode the metadata from a Parquet file using either
43/// [`ParquetMetadataReader`] or [`ParquetMetaDataPushDecoder`].
44///
45/// [`ParquetMetadataReader`]: crate::file::metadata::ParquetMetaDataReader
46/// [`ParquetMetaDataPushDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder
47///
48/// Note the "input" type is `u64` which represents the length of the Parquet file
49/// being decoded. This is needed to initialize the internal buffers that track
50/// what data has been provided to the decoder.
51///
52/// # Example
53/// ```
54/// # use std::ops::Range;
55/// # use std::sync::Arc;
56/// # use bytes::Bytes;
57/// # use arrow_array::record_batch;
58/// # use parquet::DecodeResult;
59/// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
60/// # use parquet::arrow::ArrowWriter;
61/// # use parquet::file::metadata::ParquetMetaDataPushDecoder;
62/// # let file_bytes = {
63/// #   let mut buffer = vec![];
64/// #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
65/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
66/// #   writer.write(&batch).unwrap();
67/// #   writer.close().unwrap();
68/// #   Bytes::from(buffer)
69/// # };
70/// # // mimic IO by returning a function that returns the bytes for a given range
71/// # let get_range = |range: &Range<u64>| -> Bytes {
72/// #    let start = range.start as usize;
73/// #     let end = range.end as usize;
74/// #    file_bytes.slice(start..end)
75/// # };
76/// # let file_length = file_bytes.len() as u64;
77/// # let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_length).unwrap();
78/// # metadata_decoder.push_ranges(vec![0..file_length], vec![file_bytes.clone()]).unwrap();
79/// # let DecodeResult::Data(parquet_metadata) = metadata_decoder.try_decode().unwrap() else { panic!("failed to decode metadata") };
80/// # let parquet_metadata = Arc::new(parquet_metadata);
81/// // The file length and metadata are required to create the decoder
82/// let mut decoder =
83///     ParquetPushDecoderBuilder::try_new_decoder(parquet_metadata)
84///       .unwrap()
85///       // Optionally configure the decoder, e.g. batch size
86///       .with_batch_size(1024)
87///       // Build the decoder
88///       .build()
89///       .unwrap();
90///
91///     // In a loop, ask the decoder what it needs next, and provide it with the required data
92///     loop {
93///         match decoder.try_decode().unwrap() {
94///             DecodeResult::NeedsData(ranges) => {
95///                 // The decoder needs more data. Fetch the data for the given ranges
96///                 let data = ranges.iter().map(|r| get_range(r)).collect::<Vec<_>>();
97///                 // Push the data to the decoder
98///                 decoder.push_ranges(ranges, data).unwrap();
99///                 // After pushing the data, we can try to decode again on the next iteration
100///             }
101///             DecodeResult::Data(batch) => {
102///                 // Successfully decoded a batch of data
103///                 assert!(batch.num_rows() > 0);
104///             }
105///             DecodeResult::Finished => {
106///                 // The decoder has finished decoding exit the loop
107///                 break;
108///             }
109///         }
110///     }
111/// ```
112///
113/// # Adaptive scans
114///
115/// The scan strategy is not fixed once [`build`](Self::build) is called: it
116/// can be changed *while decoding*, at row-group boundaries.
117///
118/// The important API for this is [`ParquetPushDecoder::try_next_reader`].
119/// Unlike [`try_decode`](ParquetPushDecoder::try_decode), which barrels
120/// straight through row-group boundaries, `try_next_reader` returns once per
121/// row group — leaving a clean window *between* row groups. At any such
122/// boundary, [`ParquetPushDecoder::into_builder`] hands back a
123/// `ParquetPushDecoderBuilder` for the row groups not yet decoded. Change any
124/// option on it (projection, row filter, row selection policy, …) and
125/// [`build`](Self::build) a fresh decoder that resumes from the next row
126/// group. This is how a query engine promotes or demotes filters — for
127/// example turning a row filter on or off — based on the selectivity observed
128/// in the row groups decoded so far.
129///
130/// ```
131/// # use std::ops::Range;
132/// # use std::sync::Arc;
133/// # use bytes::Bytes;
134/// # use arrow_array::record_batch;
135/// # use parquet::DecodeResult;
136/// # use parquet::arrow::ProjectionMask;
137/// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
138/// # use parquet::arrow::ArrowWriter;
139/// # use parquet::file::metadata::ParquetMetaDataPushDecoder;
140/// # use parquet::file::properties::WriterProperties;
141/// # let file_bytes = {
142/// #   let batch = record_batch!(
143/// #       ("a", Int32, [1, 2, 3, 4, 5, 6]),
144/// #       ("b", Int32, [6, 5, 4, 3, 2, 1])
145/// #   ).unwrap();
146/// #   // Small row groups so the test file has two of them.
147/// #   let props = WriterProperties::builder().set_max_row_group_row_count(Some(3)).build();
148/// #   let mut buffer = vec![];
149/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
150/// #   writer.write(&batch).unwrap();
151/// #   writer.close().unwrap();
152/// #   Bytes::from(buffer)
153/// # };
154/// # let get_range = |r: &Range<u64>| file_bytes.slice(r.start as usize..r.end as usize);
155/// # let file_length = file_bytes.len() as u64;
156/// # let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_length).unwrap();
157/// # metadata_decoder.push_ranges(vec![0..file_length], vec![file_bytes.clone()]).unwrap();
158/// # let DecodeResult::Data(parquet_metadata) = metadata_decoder.try_decode().unwrap() else { panic!() };
159/// # let parquet_metadata = Arc::new(parquet_metadata);
160/// let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(parquet_metadata)
161///     .unwrap()
162///     .build()
163///     .unwrap();
164///
165/// // Drive the decoder one row group at a time with `try_next_reader`.
166/// loop {
167///     match decoder.try_next_reader().unwrap() {
168///         DecodeResult::NeedsData(ranges) => {
169///             // Fetch and hand over the bytes the decoder asked for.
170///             let data = ranges.iter().map(|r| get_range(r)).collect();
171///             decoder.push_ranges(ranges, data).unwrap();
172///         }
173///         DecodeResult::Data(reader) => {
174///             // Decode this row group's batches.
175///             for batch in reader {
176///                 assert!(batch.unwrap().num_rows() > 0);
177///             }
178///             // We are now at a row-group boundary. Based on whatever stats
179///             // were gathered, optionally change strategy for the row groups
180///             // still to come: drop or promote a row filter, narrow or widen
181///             // the projection, etc.
182///             if decoder.is_at_row_group_boundary() && decoder.row_groups_remaining() > 0 {
183///                 let builder = decoder.into_builder().unwrap();
184///                 // e.g. column "b" turned out not to be needed.
185///                 let projection = ProjectionMask::columns(builder.parquet_schema(), ["a"]);
186///                 decoder = builder.with_projection(projection).build().unwrap();
187///             }
188///         }
189///         DecodeResult::Finished => break,
190///     }
191/// }
192/// ```
193pub type ParquetPushDecoderBuilder = ArrowReaderBuilder<PushDecoderInput>;
194
/// The `input` of a [`ParquetPushDecoderBuilder`].
///
/// The shared [`ArrowReaderBuilder`] is generic over an `input`. The sync and
/// async builders read from a file or async reader; the push decoder has no
/// reader, so its input is the [`PushBuffers`] that caller-pushed bytes
/// accumulate in (empty for a fresh builder).
///
/// The buffers are not public: supply them with
/// [`ParquetPushDecoderBuilder::with_buffers`]. They are moved into the
/// decoder by [`ParquetPushDecoderBuilder::build`].
#[derive(Debug, Default)]
pub struct PushDecoderInput {
    /// Bytes pushed into the decoder, awaiting decode.
    buffers: PushBuffers,
}
206
207/// Methods for building a ParquetDecoder. See the base [`ArrowReaderBuilder`] for
208/// more options that can be configured.
209impl ParquetPushDecoderBuilder {
210    /// Create a new `ParquetDecoderBuilder` for configuring a Parquet decoder for the given file.
211    ///
212    /// See [`ParquetMetadataDecoder`] for a builder that can read the metadata from a Parquet file.
213    ///
214    /// [`ParquetMetadataDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder
215    ///
216    /// See example on [`ParquetPushDecoderBuilder`]
217    pub fn try_new_decoder(parquet_metadata: Arc<ParquetMetaData>) -> Result<Self, ParquetError> {
218        Self::try_new_decoder_with_options(parquet_metadata, ArrowReaderOptions::default())
219    }
220
221    /// Create a new `ParquetDecoderBuilder` for configuring a Parquet decoder for the given file
222    /// with the given reader options.
223    ///
224    /// This is similar to [`Self::try_new_decoder`] but allows configuring
225    /// options such as Arrow schema
226    pub fn try_new_decoder_with_options(
227        parquet_metadata: Arc<ParquetMetaData>,
228        arrow_reader_options: ArrowReaderOptions,
229    ) -> Result<Self, ParquetError> {
230        let arrow_reader_metadata =
231            ArrowReaderMetadata::try_new(parquet_metadata, arrow_reader_options)?;
232        Ok(Self::new_with_metadata(arrow_reader_metadata))
233    }
234
235    /// Create a new `ParquetDecoderBuilder` given [`ArrowReaderMetadata`].
236    ///
237    /// See [`ArrowReaderMetadata::try_new`] for how to create the metadata from
238    /// the Parquet metadata and reader options.
239    pub fn new_with_metadata(arrow_reader_metadata: ArrowReaderMetadata) -> Self {
240        Self::new_builder(PushDecoderInput::default(), arrow_reader_metadata)
241    }
242
243    /// Provide a preexisting [`PushBuffers`] for the built decoder to read
244    /// from, so bytes already fetched are not requested again.
245    pub fn with_buffers(self, buffers: PushBuffers) -> Self {
246        Self {
247            input: PushDecoderInput { buffers },
248            ..self
249        }
250    }
251
252    /// Create a [`ParquetPushDecoder`] with the configured options
253    pub fn build(self) -> Result<ParquetPushDecoder, ParquetError> {
254        let Self {
255            input: PushDecoderInput { buffers },
256            metadata: parquet_metadata,
257            schema,
258            fields,
259            batch_size,
260            row_groups,
261            projection,
262            filter,
263            selection,
264            limit,
265            offset,
266            metrics,
267            row_selection_policy,
268            max_predicate_cache_size,
269        } = self;
270
271        // If no row groups were specified, read all of them
272        let row_groups =
273            row_groups.unwrap_or_else(|| (0..parquet_metadata.num_row_groups()).collect());
274        let has_predicates = filter
275            .as_ref()
276            .is_some_and(|filter| !filter.predicates.is_empty());
277
278        // Prepare to build RowGroup readers. `buffers` carries any bytes the
279        // caller already pushed (preserved across `into_builder`); a fresh
280        // builder supplies an empty `PushBuffers`.
281        let row_group_reader_builder = RowGroupReaderBuilder::new(
282            batch_size,
283            projection,
284            Arc::clone(&parquet_metadata),
285            fields,
286            filter,
287            metrics,
288            max_predicate_cache_size,
289            buffers,
290            row_selection_policy,
291        );
292
293        // Initialize the decoder with the configured options
294        let remaining_row_groups = RemainingRowGroups::new(
295            schema,
296            parquet_metadata,
297            row_groups,
298            selection,
299            RowBudget::new(offset, limit),
300            has_predicates,
301            row_group_reader_builder,
302        );
303
304        Ok(ParquetPushDecoder {
305            state: ParquetDecoderState::ReadingRowGroup {
306                remaining_row_groups: Box::new(remaining_row_groups),
307            },
308        })
309    }
310}
311
312/// Reassemble a [`ParquetPushDecoderBuilder`] from a decoder's not-yet-decoded
313/// state — the inverse of [`ParquetPushDecoderBuilder::build`]. The rebuilt
314/// builder pins the remaining row groups and carries the remaining row
315/// selection, offset/limit budget, and buffered bytes.
316fn builder_from_remaining(parts: RemainingRowGroupsParts) -> ParquetPushDecoderBuilder {
317    let RemainingRowGroupsParts {
318        metadata,
319        schema,
320        row_groups,
321        selection,
322        offset,
323        limit,
324        reader_builder,
325    } = parts;
326    let RowGroupReaderBuilderParts {
327        batch_size,
328        projection,
329        fields,
330        filter,
331        max_predicate_cache_size,
332        metrics,
333        row_selection_policy,
334        buffers,
335    } = reader_builder;
336
337    ArrowReaderBuilder {
338        input: PushDecoderInput::default(),
339        metadata,
340        schema,
341        fields,
342        batch_size,
343        // The frontier tracks remaining row groups explicitly, so the rebuilt
344        // builder always pins them (even if the original left `row_groups` as
345        // `None` meaning "all").
346        row_groups: Some(row_groups),
347        projection,
348        filter,
349        selection,
350        row_selection_policy,
351        limit,
352        offset,
353        metrics,
354        max_predicate_cache_size,
355    }
356    // Carry the decoder's already-fetched bytes across the rebuild so the new
357    // decoder does not re-request them.
358    .with_buffers(buffers)
359}
360
/// A push based Parquet Decoder
///
/// See [`ParquetPushDecoderBuilder`] for an example of how to build and use the decoder.
///
/// [`ParquetPushDecoder`] is a low level API for decoding Parquet data without an
/// underlying reader for performing IO, and thus offers fine grained control
/// over how data is fetched and decoded.
///
/// When more data is needed to make progress, instead of reading data directly
/// from a reader, the decoder returns a [`DecodeResult`] indicating what ranges
/// are needed. Once the caller has provided the requested ranges via
/// [`Self::push_ranges`], it calls [`Self::try_decode`] (or
/// [`Self::try_next_reader`]) again to continue decoding.
///
/// The decoder's internal state tracks what has been already decoded and what
/// is needed next.
#[derive(Debug)]
pub struct ParquetPushDecoder {
    /// The inner state.
    ///
    /// This state is consumed on every transition and a new state is produced
    /// so the Rust compiler can ensure that the state is always valid and
    /// transitions are not missed.
    state: ParquetDecoderState,
}
386
387impl ParquetPushDecoder {
388    /// Attempt to decode the next batch of data, or return what data is needed
389    ///
390    /// The the decoder communicates the next state with a [`DecodeResult`]
391    ///
392    /// See full example in [`ParquetPushDecoderBuilder`]
393    ///
394    /// ```no_run
395    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
396    /// use parquet::DecodeResult;
397    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
398    /// # fn push_data(decoder: &mut ParquetPushDecoder, ranges: Vec<std::ops::Range<u64>>) { unimplemented!() }
399    /// let mut decoder = get_decoder();
400    /// loop {
401    ///    match decoder.try_decode().unwrap() {
402    ///       DecodeResult::NeedsData(ranges) => {
403    ///         // The decoder needs more data. Fetch the data for the given ranges
404    ///         // call decoder.push_ranges(ranges, data) and call again
405    ///         push_data(&mut decoder, ranges);
406    ///       }
407    ///       DecodeResult::Data(batch) => {
408    ///         // Successfully decoded the next batch of data
409    ///         println!("Got batch with {} rows", batch.num_rows());
410    ///       }
411    ///       DecodeResult::Finished => {
412    ///         // The decoder has finished decoding all data
413    ///         break;
414    ///       }
415    ///    }
416    /// }
417    ///```
418    pub fn try_decode(&mut self) -> Result<DecodeResult<RecordBatch>, ParquetError> {
419        let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
420        let (new_state, decode_result) = current_state.try_next_batch()?;
421        self.state = new_state;
422        Ok(decode_result)
423    }
424
425    /// Return a [`ParquetRecordBatchReader`] that reads the next set of rows, or
426    /// return what data is needed to produce it.
427    ///
428    /// This API can be used to get a reader for decoding the next set of
429    /// RecordBatches while proceeding to begin fetching data for the set (e.g
430    /// row group)
431    ///
432    /// Example
433    /// ```no_run
434    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
435    /// use parquet::DecodeResult;
436    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
437    /// # fn push_data(decoder: &mut ParquetPushDecoder, ranges: Vec<std::ops::Range<u64>>) { unimplemented!() }
438    /// let mut decoder = get_decoder();
439    /// loop {
440    ///    match decoder.try_next_reader().unwrap() {
441    ///       DecodeResult::NeedsData(ranges) => {
442    ///         // The decoder needs more data. Fetch the data for the given ranges
443    ///         // call decoder.push_ranges(ranges, data) and call again
444    ///         push_data(&mut decoder, ranges);
445    ///       }
446    ///       DecodeResult::Data(reader) => {
447    ///          // spawn a thread to read the batches in parallel
448    ///          // with fetching the next row group / data
449    ///          std::thread::spawn(move || {
450    ///            for batch in reader {
451    ///              let batch = batch.unwrap();
452    ///              println!("Got batch with {} rows", batch.num_rows());
453    ///            }
454    ///         });
455    ///       }
456    ///       DecodeResult::Finished => {
457    ///         // The decoder has finished decoding all data
458    ///         break;
459    ///       }
460    ///    }
461    /// }
462    ///```
463    pub fn try_next_reader(
464        &mut self,
465    ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
466        let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
467        let (new_state, decode_result) = current_state.try_next_reader()?;
468        self.state = new_state;
469        Ok(decode_result)
470    }
471
472    /// Push data into the decoder for processing
473    ///
474    /// This is a convenience wrapper around [`Self::push_ranges`] for pushing a
475    /// single range of data.
476    ///
477    /// Note this can be the entire file or just a part of it. If it is part of the file,
478    /// the ranges should correspond to the data ranges requested by the decoder.
479    ///
480    /// See example in [`ParquetPushDecoderBuilder`]
481    pub fn push_range(&mut self, range: Range<u64>, data: Bytes) -> Result<(), ParquetError> {
482        self.push_ranges(vec![range], vec![data])
483    }
484
485    /// Push data into the decoder for processing
486    ///
487    /// This should correspond to the data ranges requested by the decoder
488    pub fn push_ranges(
489        &mut self,
490        ranges: Vec<Range<u64>>,
491        data: Vec<Bytes>,
492    ) -> Result<(), ParquetError> {
493        let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
494        self.state = current_state.push_data(ranges, data)?;
495        Ok(())
496    }
497
498    /// Returns the total number of buffered bytes in the decoder
499    ///
500    /// This is the sum of the size of all [`Bytes`] that has been pushed to the
501    /// decoder but not yet consumed.
502    ///
503    /// Note that this does not include any overhead of the internal data
504    /// structures and that since [`Bytes`] are ref counted memory, this may not
505    /// reflect additional memory usage.
506    ///
507    /// This can be used to monitor memory usage of the decoder.
508    pub fn buffered_bytes(&self) -> u64 {
509        self.state.buffered_bytes()
510    }
511
512    /// Clear any staged byte ranges currently buffered for future decode work.
513    ///
514    /// This clears byte ranges still owned by the decoder's internal
515    /// `PushBuffers`. It does not affect any data that has already been handed
516    /// off to an active [`ParquetRecordBatchReader`].
517    pub fn clear_all_ranges(&mut self) {
518        self.state.clear_all_ranges();
519    }
520
521    /// True iff the decoder is at a row-group boundary, where
522    /// [`Self::into_builder`] can reconfigure the scan.
523    ///
524    /// A boundary is "between row groups": the previous row group's
525    /// [`ParquetRecordBatchReader`] has been fully extracted (via
526    /// [`Self::try_next_reader`]) or fully drained (via [`Self::try_decode`]),
527    /// and the next row group has not yet been planned. While
528    /// [`Self::try_decode`] is iterating an active row group's reader this
529    /// returns `false`; with [`Self::try_next_reader`] there is a clean
530    /// window between two consecutive returns where this is `true`.
531    pub fn is_at_row_group_boundary(&self) -> bool {
532        self.state.is_at_row_group_boundary()
533    }
534
535    /// Number of row groups left to decode after the one currently in flight.
536    /// Useful as a "should I bother reconfiguring the scan?" signal.
537    pub fn row_groups_remaining(&self) -> usize {
538        self.state.row_groups_remaining()
539    }
540
541    /// Decompose this decoder back into a [`ParquetPushDecoderBuilder`] for the
542    /// row groups that have *not* yet been decoded.
543    ///
544    /// This is the API for *adaptive* scans. Drive the decoder with
545    /// [`Self::try_next_reader`]; at any row-group boundary, call
546    /// `into_builder` to recover a builder, adjust it with the usual
547    /// [`ParquetPushDecoderBuilder`] setters, and
548    /// [`build`](ParquetPushDecoderBuilder::build) a fresh decoder that resumes
549    /// from the next row group:
550    ///
551    /// ```no_run
552    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
553    /// # use parquet::arrow::arrow_reader::RowFilter;
554    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
555    /// # fn new_filter() -> RowFilter { unimplemented!() }
556    /// let mut decoder = get_decoder();
557    /// // ... drive `decoder.try_next_reader()` for a few row groups ...
558    /// if decoder.is_at_row_group_boundary() && decoder.row_groups_remaining() > 0 {
559    ///     decoder = decoder
560    ///         .into_builder()
561    ///         .unwrap()
562    ///         // any builder option can be changed here, e.g. promote a
563    ///         // filter into a row filter based on observed selectivity
564    ///         .with_row_filter(new_filter())
565    ///         .build()
566    ///         .unwrap();
567    /// }
568    /// ```
569    ///
570    /// The returned builder pins the not-yet-decoded row groups (via
571    /// [`with_row_groups`](ArrowReaderBuilder::with_row_groups)) and carries the
572    /// not-yet-consumed row selection and offset/limit budget, so rows from
573    /// already-decoded row groups are not produced again. Every other option —
574    /// projection, row filter, row selection policy, batch size, metrics,
575    /// predicate-cache size — is left exactly as the decoder had it and can be
576    /// overridden before [`build`](ParquetPushDecoderBuilder::build).
577    ///
578    /// # Errors
579    ///
580    /// Returns `Err(ParquetError::General)` when the decoder is not at a
581    /// row-group boundary (check [`Self::is_at_row_group_boundary`] first) or
582    /// has already finished. The decoder is consumed either way.
583    ///
584    /// # Buffered bytes
585    ///
586    /// The decoder's buffered bytes are carried across the rebuild: bytes
587    /// already fetched for row groups the new configuration still reads are
588    /// not re-requested. Bytes the new configuration no longer needs stay
589    /// buffered until [`clear_all_ranges`](Self::clear_all_ranges) is called
590    /// or the rebuilt decoder is dropped.
591    pub fn into_builder(self) -> Result<ParquetPushDecoderBuilder, ParquetError> {
592        self.state.into_builder()
593    }
594}
595
/// Internal state machine for the [`ParquetPushDecoder`]
///
/// Each public decoder method takes the current state by value (temporarily
/// leaving `Finished` in its place) and stores the state produced by the
/// transition.
#[derive(Debug)]
enum ParquetDecoderState {
    /// Waiting for data needed to decode the next RowGroup
    ReadingRowGroup {
        /// Source of the reader for the next row group; asked for it on each
        /// transition out of this state.
        remaining_row_groups: Box<RemainingRowGroups>,
    },
    /// The decoder is actively decoding a RowGroup
    DecodingRowGroup {
        /// Current active reader
        record_batch_reader: Box<ParquetRecordBatchReader>,
        /// The row groups from which `record_batch_reader` was obtained; used
        /// to plan the next row group once this reader is exhausted.
        remaining_row_groups: Box<RemainingRowGroups>,
    },
    /// The decoder has finished processing all data
    ///
    /// Also used as a placeholder while a transition is in progress.
    Finished,
}
612
613impl ParquetDecoderState {
614    /// If actively reading a RowGroup, return the currently active
615    /// ParquetRecordBatchReader and advance to the next group.
616    fn try_next_reader(
617        self,
618    ) -> Result<(Self, DecodeResult<ParquetRecordBatchReader>), ParquetError> {
619        let mut current_state = self;
620        loop {
621            let (next_state, decode_result) = current_state.transition()?;
622            // if more data is needed to transition, can't proceed further without it
623            match decode_result {
624                DecodeResult::NeedsData(ranges) => {
625                    return Ok((next_state, DecodeResult::NeedsData(ranges)));
626                }
627                // act next based on state
628                DecodeResult::Data(()) | DecodeResult::Finished => {}
629            }
630            match next_state {
631                // not ready to read yet, continue transitioning
632                Self::ReadingRowGroup { .. } => current_state = next_state,
633                // have a reader ready, so return it and set ourself to ReadingRowGroup
634                Self::DecodingRowGroup {
635                    record_batch_reader,
636                    remaining_row_groups,
637                } => {
638                    let result = DecodeResult::Data(*record_batch_reader);
639                    let next_state = Self::ReadingRowGroup {
640                        remaining_row_groups,
641                    };
642                    return Ok((next_state, result));
643                }
644                Self::Finished => {
645                    return Ok((Self::Finished, DecodeResult::Finished));
646                }
647            }
648        }
649    }
650
651    /// Current state --> next state + output
652    ///
653    /// This function is called to get the next RecordBatch
654    ///
655    /// This structure is used to reduce the indentation level of the main loop
656    /// in try_build
657    fn try_next_batch(self) -> Result<(Self, DecodeResult<RecordBatch>), ParquetError> {
658        let mut current_state = self;
659        loop {
660            let (new_state, decode_result) = current_state.transition()?;
661            // if more data is needed to transition, can't proceed further without it
662            match decode_result {
663                DecodeResult::NeedsData(ranges) => {
664                    return Ok((new_state, DecodeResult::NeedsData(ranges)));
665                }
666                // act next based on state
667                DecodeResult::Data(()) | DecodeResult::Finished => {}
668            }
669            match new_state {
670                // not ready to read yet, continue transitioning
671                Self::ReadingRowGroup { .. } => current_state = new_state,
672                // have a reader ready, so decode the next batch
673                Self::DecodingRowGroup {
674                    mut record_batch_reader,
675                    remaining_row_groups,
676                } => {
677                    match record_batch_reader.next() {
678                        // Successfully decoded a batch, return it
679                        Some(Ok(batch)) => {
680                            let result = DecodeResult::Data(batch);
681                            let next_state = Self::DecodingRowGroup {
682                                record_batch_reader,
683                                remaining_row_groups,
684                            };
685                            return Ok((next_state, result));
686                        }
687                        // No more batches in this row group, move to the next row group
688                        None => {
689                            current_state = Self::ReadingRowGroup {
690                                remaining_row_groups,
691                            }
692                        }
693                        // some error occurred while decoding, so return that
694                        Some(Err(e)) => {
695                            // TODO: preserve ArrowError in ParquetError (rather than convert to a string)
696                            return Err(ParquetError::ArrowError(e.to_string()));
697                        }
698                    }
699                }
700                Self::Finished => {
701                    return Ok((Self::Finished, DecodeResult::Finished));
702                }
703            }
704        }
705    }
706
707    /// Transition to the next state with a reader (data can be produced), if not end of stream
708    ///
709    /// This function is called in a loop until the decoder is ready to return
710    /// data (has the required pages buffered) or is finished.
711    fn transition(self) -> Result<(Self, DecodeResult<()>), ParquetError> {
712        // result returned when there is data ready
713        let data_ready = DecodeResult::Data(());
714        match self {
715            Self::ReadingRowGroup {
716                mut remaining_row_groups,
717            } => {
718                match remaining_row_groups.try_next_reader()? {
719                    // If we have a next reader, we can transition to decoding it
720                    DecodeResult::Data(record_batch_reader) => {
721                        // Transition to decoding the row group
722                        Ok((
723                            Self::DecodingRowGroup {
724                                record_batch_reader: Box::new(record_batch_reader),
725                                remaining_row_groups,
726                            },
727                            data_ready,
728                        ))
729                    }
730                    DecodeResult::NeedsData(ranges) => {
731                        // If we need more data, we return the ranges needed and stay in Reading
732                        // RowGroup state
733                        Ok((
734                            Self::ReadingRowGroup {
735                                remaining_row_groups,
736                            },
737                            DecodeResult::NeedsData(ranges),
738                        ))
739                    }
740                    // If there are no more readers, we are finished
741                    DecodeResult::Finished => {
742                        // No more row groups to read, we are finished
743                        Ok((Self::Finished, DecodeResult::Finished))
744                    }
745                }
746            }
747            // if we are already in DecodingRowGroup, just return data ready
748            Self::DecodingRowGroup { .. } => Ok((self, data_ready)),
749            // if finished, just return finished
750            Self::Finished => Ok((self, DecodeResult::Finished)),
751        }
752    }
753
754    /// Push data, and transition state if needed
755    ///
756    /// This should correspond to the data ranges requested by the decoder
757    pub fn push_data(
758        self,
759        ranges: Vec<Range<u64>>,
760        data: Vec<Bytes>,
761    ) -> Result<Self, ParquetError> {
762        match self {
763            ParquetDecoderState::ReadingRowGroup {
764                mut remaining_row_groups,
765            } => {
766                // Push data to the RowGroupReaderBuilder
767                remaining_row_groups.push_data(ranges, data);
768                Ok(ParquetDecoderState::ReadingRowGroup {
769                    remaining_row_groups,
770                })
771            }
772            // it is ok to get data before we asked for it
773            ParquetDecoderState::DecodingRowGroup {
774                record_batch_reader,
775                mut remaining_row_groups,
776            } => {
777                remaining_row_groups.push_data(ranges, data);
778                Ok(ParquetDecoderState::DecodingRowGroup {
779                    record_batch_reader,
780                    remaining_row_groups,
781                })
782            }
783            ParquetDecoderState::Finished => Err(ParquetError::General(
784                "Cannot push data to a finished decoder".to_string(),
785            )),
786        }
787    }
788
789    /// How many bytes are currently buffered in the decoder?
790    fn buffered_bytes(&self) -> u64 {
791        match self {
792            ParquetDecoderState::ReadingRowGroup {
793                remaining_row_groups,
794            } => remaining_row_groups.buffered_bytes(),
795            ParquetDecoderState::DecodingRowGroup {
796                record_batch_reader: _,
797                remaining_row_groups,
798            } => remaining_row_groups.buffered_bytes(),
799            ParquetDecoderState::Finished => 0,
800        }
801    }
802
803    /// Clear any staged ranges currently buffered in the decoder.
804    fn clear_all_ranges(&mut self) {
805        match self {
806            ParquetDecoderState::ReadingRowGroup {
807                remaining_row_groups,
808            } => remaining_row_groups.clear_all_ranges(),
809            ParquetDecoderState::DecodingRowGroup {
810                record_batch_reader: _,
811                remaining_row_groups,
812            } => remaining_row_groups.clear_all_ranges(),
813            ParquetDecoderState::Finished => {}
814        }
815    }
816
817    fn is_at_row_group_boundary(&self) -> bool {
818        match self {
819            ParquetDecoderState::ReadingRowGroup {
820                remaining_row_groups,
821            } => remaining_row_groups.is_at_row_group_boundary(),
822            // Mid-row-group: the active reader holds an `ArrayReader` and
823            // `ReadPlan` keyed to the *current* projection/filter; rebuilding
824            // would require throwing that work away.
825            ParquetDecoderState::DecodingRowGroup { .. } => false,
826            ParquetDecoderState::Finished => false,
827        }
828    }
829
830    fn row_groups_remaining(&self) -> usize {
831        match self {
832            ParquetDecoderState::ReadingRowGroup {
833                remaining_row_groups,
834            } => remaining_row_groups.row_groups_remaining(),
835            ParquetDecoderState::DecodingRowGroup {
836                remaining_row_groups,
837                ..
838            } => remaining_row_groups.row_groups_remaining(),
839            ParquetDecoderState::Finished => 0,
840        }
841    }
842
843    fn into_builder(self) -> Result<ParquetPushDecoderBuilder, ParquetError> {
844        let remaining_row_groups = match self {
845            ParquetDecoderState::ReadingRowGroup {
846                remaining_row_groups,
847            } => remaining_row_groups,
848            ParquetDecoderState::DecodingRowGroup { .. } => {
849                return Err(ParquetError::General(
850                    "into_builder called while a row group is being decoded; \
851                     check is_at_row_group_boundary() first"
852                        .to_string(),
853                ));
854            }
855            ParquetDecoderState::Finished => {
856                return Err(ParquetError::General(
857                    "into_builder called on a finished decoder".to_string(),
858                ));
859            }
860        };
861        if !remaining_row_groups.is_at_row_group_boundary() {
862            return Err(ParquetError::General(
863                "into_builder called mid-row-group; check is_at_row_group_boundary() first"
864                    .to_string(),
865            ));
866        }
867        Ok(builder_from_remaining(remaining_row_groups.into_parts()))
868    }
869}
870
871#[cfg(test)]
872mod test {
873    use super::*;
874    use crate::DecodeResult;
875    use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
876    use crate::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
877    use crate::arrow::{ArrowWriter, ProjectionMask};
878    use crate::errors::ParquetError;
879    use crate::file::metadata::ParquetMetaDataPushDecoder;
880    use crate::file::properties::WriterProperties;
881    use arrow::compute::kernels::cmp::{gt, lt};
882    use arrow_array::cast::AsArray;
883    use arrow_array::types::Int64Type;
884    use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray};
885    use arrow_select::concat::concat_batches;
886    use bytes::Bytes;
887    use std::fmt::Debug;
888    use std::ops::Range;
889    use std::sync::{Arc, LazyLock};
890
891    /// Test decoder struct size (as they are copied around on each transition, they
892    /// should not grow too large)
893    #[test]
894    fn test_decoder_size() {
895        assert_eq!(std::mem::size_of::<ParquetDecoderState>(), 24);
896    }
897
    /// Decode the entire file at once, simulating a scenario where all data is
    /// available in memory.
    ///
    /// Because the whole file is pushed before the first `try_decode`, the
    /// decoder is expected to never report `NeedsData`: it yields one batch per
    /// row group and then finishes.
    #[test]
    fn test_decoder_all_data() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .build()
            .unwrap();

        // Stage the complete file up front.
        decoder
            .push_range(test_file_range(), TEST_FILE_DATA.clone())
            .unwrap();

        let results = vec![
            // first row group should be decoded without needing more data
            expect_data(decoder.try_decode()),
            // second row group should be decoded without needing more data
            expect_data(decoder.try_decode()),
        ];
        expect_finished(decoder.try_decode());

        let all_output = concat_batches(&TEST_BATCH.schema(), &results).unwrap();
        // Check that the output matches the input batch
        assert_eq!(all_output, *TEST_BATCH);
    }
923
    /// Decode the entire file incrementally, simulating a scenario where data is
    /// fetched as needed.
    ///
    /// Also checks buffer accounting. After pushing exactly the requested
    /// ranges, `buffered_bytes` equals the requested size. It returns to zero
    /// once the row group's data has been consumed.
    #[test]
    fn test_decoder_incremental() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .build()
            .unwrap();

        let mut results = vec![];

        // First row group, expect a single request
        let ranges = expect_needs_data(decoder.try_decode());
        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        push_ranges_to_decoder(&mut decoder, ranges);
        // The decoder should currently only store the data it needs to decode the first row group
        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
        results.push(expect_data(decoder.try_decode()));
        // the decoder should have consumed the data for the first row group and freed it
        assert_eq!(decoder.buffered_bytes(), 0);

        // Second row group, again expect a single request
        let ranges = expect_needs_data(decoder.try_decode());
        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        push_ranges_to_decoder(&mut decoder, ranges);
        // The decoder should currently only store the data it needs to decode the second row group
        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
        results.push(expect_data(decoder.try_decode()));
        // the decoder should have consumed the data for the second row group and freed it
        assert_eq!(decoder.buffered_bytes(), 0);
        expect_finished(decoder.try_decode());

        // Check that the output matches the input batch
        let all_output = concat_batches(&TEST_BATCH.schema(), &results).unwrap();
        assert_eq!(all_output, *TEST_BATCH);
    }
960
    /// Releasing staged ranges should free speculative buffers without affecting
    /// the active row group reader.
    ///
    /// With `batch_size = 100`, each row group is emitted as two batches. That
    /// means `clear_all_ranges` can be called while a reader is mid-row-group.
    #[test]
    fn test_decoder_clear_all_ranges() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .with_batch_size(100)
            .build()
            .unwrap();

        // Speculatively stage the whole file.
        decoder
            .push_range(test_file_range(), TEST_FILE_DATA.clone())
            .unwrap();
        assert_eq!(decoder.buffered_bytes(), test_file_len());

        // The current row group reader is built from the prefetched bytes, but
        // the speculative full-file range remains staged in the decoder.
        let batch1 = expect_data(decoder.try_decode());
        assert_eq!(batch1, TEST_BATCH.slice(0, 100));
        assert_eq!(decoder.buffered_bytes(), test_file_len());

        // All of the buffer is released
        decoder.clear_all_ranges();
        assert_eq!(decoder.buffered_bytes(), 0);

        // The active reader still owns the current row group's bytes, so it can
        // continue decoding without consulting PushBuffers.
        let batch2 = expect_data(decoder.try_decode());
        assert_eq!(batch2, TEST_BATCH.slice(100, 100));
        assert_eq!(decoder.buffered_bytes(), 0);

        // Moving to the next row group now requires the decoder to ask for data
        // again because the staged speculative ranges were released.
        let ranges = expect_needs_data(decoder.try_decode());
        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        push_ranges_to_decoder(&mut decoder, ranges);
        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);

        // Building the reader for the second row group consumes the pushed bytes.
        let batch3 = expect_data(decoder.try_decode());
        assert_eq!(batch3, TEST_BATCH.slice(200, 100));
        assert_eq!(decoder.buffered_bytes(), 0);

        let batch4 = expect_data(decoder.try_decode());
        assert_eq!(batch4, TEST_BATCH.slice(300, 100));
        assert_eq!(decoder.buffered_bytes(), 0);

        expect_finished(decoder.try_decode());
    }
1009
    /// Decode the entire file incrementally, simulating partial reads.
    ///
    /// For the second row group only half of the requested ranges are pushed
    /// at first. The decoder must then re-request exactly the missing half.
    /// Pushing an empty set of ranges must not change what it asks for.
    #[test]
    fn test_decoder_partial() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .build()
            .unwrap();

        // First row group, expect a single request for all data needed to read
        // every column (no projection is set)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(0, 200);
        assert_eq!(batch1, expected1);

        // Second row group, this time provide the data in two steps
        let ranges = expect_needs_data(decoder.try_decode());
        let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2);
        assert!(!ranges1.is_empty());
        assert!(!ranges2.is_empty());
        // push first half to simulate partial read
        push_ranges_to_decoder(&mut decoder, ranges1.to_vec());

        // still expect more data
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        // push empty ranges should be a no-op
        push_ranges_to_decoder(&mut decoder, vec![]);
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 200);
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }
1049
    /// Decode multiple columns "a" and "b", expect that the decoder requests
    /// only a single request per row group.
    ///
    /// Each `expect_needs_data` is followed directly by `expect_data`, which
    /// pins that the projected columns' ranges are all requested at once.
    #[test]
    fn test_decoder_selection_does_one_request() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        let mut decoder = builder
            .with_projection(
                ProjectionMask::columns(&schema_descr, ["a", "b"]), // read "a", "b"
            )
            .build()
            .unwrap();

        // First row group, expect a single request for all data needed to read "a" and "b"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(0, 200).project(&[0, 1]).unwrap();
        assert_eq!(batch1, expected1);

        // Second row group, similarly expect a single request for all data needed to read "a" and "b"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 200).project(&[0, 1]).unwrap();
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }
1084
    /// Decode with a filter that requires multiple requests, but only provide part
    /// of the data needed for the filter at a time simulating partial reads.
    #[test]
    fn test_decoder_single_filter_partial() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // First filter: "a" > 250  (nothing in Row Group 0, both data pages in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            // claim to use both a and b so we get two ranges requests for the filter pages
            ProjectionMask::columns(&schema_descr, ["a", "b"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // read only column "a" to test that filter pages are reused
                ProjectionMask::columns(&schema_descr, ["a"]), // read "a"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .build()
            .unwrap();

        // First row group, evaluating filters
        let ranges = expect_needs_data(decoder.try_decode());
        // only provide half the ranges
        let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2);
        assert!(!ranges1.is_empty());
        assert!(!ranges2.is_empty());
        push_ranges_to_decoder(&mut decoder, ranges1.to_vec());
        // still expect more data
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        // asking again without pushing anything must report the same outstanding ranges
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        push_ranges_to_decoder(&mut decoder, ranges2.to_vec());

        // Since no rows in the first row group pass the filter, no further
        // request is expected for that row group; the next request is for the
        // second row group

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 149).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }
1143
    /// Decode with a filter where we also skip one of the RowGroups via a RowSelection.
    ///
    /// The selection skips all of Row Group 0 and keeps rows 200..299 of Row
    /// Group 1. Intersected with `a > 250`, only rows 251..299 (49 rows) are
    /// emitted.
    #[test]
    fn test_decoder_single_filter_and_row_selection() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // First filter: "a" > 250  (nothing in Row Group 0; rows 251..399 in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // read only column "b", which the filter does not use
                ProjectionMask::columns(&schema_descr, ["b"]), // read "b"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(200),   // skip first row group
                RowSelector::select(100), // first 100 rows of second row group
                RowSelector::skip(100),
            ]))
            .build()
            .unwrap();

        // expect the first row group to be filtered out (no filter is evaluated due to row selection)

        // Second row group, filter column "a" (a > 250). Since Row Group 0 is
        // skipped entirely, this is expected to be the first request
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group, data pages for the projected column "b"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 49).project(&[1]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }
1194
    /// Decode with multiple filters that require multiple requests.
    ///
    /// For each row group the decoder is expected to issue three requests in
    /// order: the pages for the first filter, then the second filter, then the
    /// projected column "c".
    #[test]
    fn test_decoder_multi_filters() {
        // Create a decoder for decoding parquet data (note it does not have any IO / readers)
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // Values in column "b" range 400..799
        // First filter: "a" > 175  (last data page in Row Group 0)
        // Second filter: "b" < 625 (last data page in Row Group 0 and first DataPage in RowGroup 1)
        // Combined, rows 176..224 pass: 176..199 in Row Group 0 and 200..224 in Row Group 1
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 175
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_175 = Int64Array::new_scalar(175);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_175)
            },
        );

        // b < 625
        let row_filter_b = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["b"]),
            |batch: RecordBatch| {
                let scalar_625 = Int64Array::new_scalar(625);
                let column = batch.column(0).as_primitive::<Int64Type>();
                lt(column, &scalar_625)
            },
        );

        let mut decoder = builder
            .with_projection(
                ProjectionMask::columns(&schema_descr, ["c"]), // read "c"
            )
            .with_row_filter(RowFilter::new(vec![
                Box::new(row_filter_a),
                Box::new(row_filter_b),
            ]))
            .build()
            .unwrap();

        // First row group, first filter (a > 175)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // first row group, second filter (b < 625)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // first row group, data pages for "c"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first batch to be decoded: rows 176..199, column "c"
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(176, 24).project(&[2]).unwrap();
        assert_eq!(batch1, expected1);

        // Second row group, first filter (a > 175)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group, second filter (b < 625)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group, data pages for "c"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the second batch to be decoded: rows 200..224, column "c"
        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 25).project(&[2]).unwrap();
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }
1275
    /// Decode with a filter that uses a column that is also projected, and expect
    /// that the filter pages are reused (don't refetch them).
    ///
    /// Only one request per row group is expected. After the filter pages for
    /// "a" are pushed, the projected output "a" is decoded without asking for
    /// more data.
    #[test]
    fn test_decoder_reuses_filter_pages() {
        // Create a decoder for decoding parquet data (note it does not have any IO / readers)
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // First filter: "a" > 250  (nothing in Row Group 0; rows 251..399 in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // read only column "a" to test that filter pages are reused
                ProjectionMask::columns(&schema_descr, ["a"]), // read "a"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .build()
            .unwrap();

        // First row group, first filter (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first row group to be filtered out (no rows match)

        // Second row group, first filter (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect that the second row group is decoded: rows 251..399, column "a"
        // Note that the filter pages for "a" should be reused and no additional data
        // should be requested
        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 149).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }
1326
    /// A `RowFilter` with no predicates should behave like no filter at all.
    ///
    /// The expected sequence is one request per row group (only for column
    /// "c") and every row of that row group in the output.
    #[test]
    fn test_decoder_empty_filters() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // only read column "c", but with empty filters
        let mut decoder = builder
            .with_projection(
                ProjectionMask::columns(&schema_descr, ["c"]), // read "c"
            )
            .with_row_filter(RowFilter::new(vec![
                // empty filters should be ignored
            ]))
            .build()
            .unwrap();

        // First row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first batch to be decoded: rows 0..199, column "c"
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(0, 200).project(&[2]).unwrap();
        assert_eq!(batch1, expected1);

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the second batch to be decoded: rows 200..399, column "c"
        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 200).project(&[2]).unwrap();

        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }
1365
    /// When filter pushdown is combined with a `LIMIT`, the predicate must
    /// not be evaluated for rows beyond the `limit`-th match.
    ///
    /// Filter `a > 175` produces 24 matches in row group 0 (rows 176..199).
    /// With `limit = 10`, only the first 10 matches (rows 176..185) should be
    /// emitted. The predicate's row counter should also show that evaluation
    /// was short-circuited.
    #[test]
    fn test_decoder_filter_with_limit_short_circuits_within_row_group() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // Counts every row handed to the predicate, across all predicate batches.
        let rows_filtered = Arc::new(AtomicUsize::new(0));
        let rows_filtered_for_predicate = Arc::clone(&rows_filtered);

        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            move |batch: RecordBatch| {
                rows_filtered_for_predicate.fetch_add(batch.num_rows(), Ordering::Relaxed);
                let scalar_175 = Int64Array::new_scalar(175);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_175)
            },
        );

        // Use a small batch size so the row group is evaluated across
        // multiple predicate batches. That is the regime where a limit-aware
        // short-circuit inside a row group saves predicate evaluation work.
        // Matching rows are 176..199 (24 rows). With batch_size = 10 they
        // span batches 17, 18, and 19 (rows 170..199). A limit of 10 should
        // stop filter evaluation in the middle of batch 18.
        let mut decoder = builder
            .with_projection(ProjectionMask::columns(&schema_descr, ["a"]))
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .with_batch_size(10)
            .with_limit(10)
            .build()
            .unwrap();

        // First row group: filter columns fetch (predicate is evaluated here)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // The first 10 matching rows come out: 176..185, column "a"
        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(176, 10).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        // no data for row group 1 should be requested — the limit
        // was satisfied by row group 0 and the `Start` state for row group 1
        // short-circuits to `Finished`.
        expect_finished(decoder.try_decode());

        // Row 186 is the 11th match; the scan should stop no later than the
        // batch containing it (batch 18 of 10 rows = rows 180..189), so at
        // most 190 rows are evaluated.
        let evaluated = rows_filtered.load(Ordering::Relaxed);
        assert!(
            evaluated <= 190,
            "predicate evaluated {evaluated} rows; expected ≤ 190 (stop within batch containing 11th match)"
        );
    }
1431
1432    /// Once the limit has been satisfied by a prior row group, subsequent
1433    /// row groups should be skipped entirely — no data request for their
1434    /// filter columns.
1435    #[test]
1436    fn test_decoder_filter_with_limit_skips_later_row_groups() {
1437        let builder =
1438            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
1439        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
1440
1441        // `a > 175` matches rows 176..199 in row group 0 (24 matches) and
1442        // 200..399 in row group 1 (200 matches). With limit = 5, all matches
1443        // should come from row group 0.
1444        let row_filter_a = ArrowPredicateFn::new(
1445            ProjectionMask::columns(&schema_descr, ["a"]),
1446            |batch: RecordBatch| {
1447                let scalar_175 = Int64Array::new_scalar(175);
1448                let column = batch.column(0).as_primitive::<Int64Type>();
1449                gt(column, &scalar_175)
1450            },
1451        );
1452
1453        let mut decoder = builder
1454            .with_projection(ProjectionMask::columns(&schema_descr, ["a"]))
1455            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
1456            .with_limit(5)
1457            .build()
1458            .unwrap();
1459
1460        // Row group 0: fetch filter pages
1461        let ranges = expect_needs_data(decoder.try_decode());
1462        push_ranges_to_decoder(&mut decoder, ranges);
1463
1464        // First 5 matches: 176..180
1465        let batch = expect_data(decoder.try_decode());
1466        let expected = TEST_BATCH.slice(176, 5).project(&[0]).unwrap();
1467        assert_eq!(batch, expected);
1468
1469        // Row group 1 must NOT request data — the limit is already satisfied
1470        // so `Start` in row group 1 short-circuits to `Finished`.
1471        expect_finished(decoder.try_decode());
1472    }
1473
1474    /// The predicate short-circuit must account for `self.offset` as well as
1475    /// `self.limit`. The post-predicate `with_offset` step skips that many
1476    /// already-selected rows before `with_limit` counts output rows — so the
1477    /// predicate must retain at least `offset + limit` matches. Without the
1478    /// fix, Layer 2 caps at just `limit` and the later `with_offset` consumes
1479    /// all of them, producing 0 rows instead of `limit`.
1480    ///
1481    /// `a > 175` matches rows 176..199 in row group 0 (24 matches). With
1482    /// `offset = 10, limit = 5`, the expected output is rows 186..190 (the
1483    /// 11th through 15th matches).
1484    #[test]
1485    fn test_decoder_filter_with_offset_and_limit() {
1486        let builder =
1487            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
1488        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
1489
1490        let row_filter_a = ArrowPredicateFn::new(
1491            ProjectionMask::columns(&schema_descr, ["a"]),
1492            |batch: RecordBatch| {
1493                let scalar_175 = Int64Array::new_scalar(175);
1494                let column = batch.column(0).as_primitive::<Int64Type>();
1495                gt(column, &scalar_175)
1496            },
1497        );
1498
1499        let mut decoder = builder
1500            .with_projection(ProjectionMask::columns(&schema_descr, ["a"]))
1501            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
1502            .with_offset(10)
1503            .with_limit(5)
1504            .build()
1505            .unwrap();
1506
1507        let ranges = expect_needs_data(decoder.try_decode());
1508        push_ranges_to_decoder(&mut decoder, ranges);
1509
1510        let batch = expect_data(decoder.try_decode());
1511        let expected = TEST_BATCH.slice(186, 5).project(&[0]).unwrap();
1512        assert_eq!(batch, expected);
1513
1514        expect_finished(decoder.try_decode());
1515    }
1516
1517    /// The limit short-circuit must also be correct when the limited predicate
1518    /// is the last predicate in a multi-predicate chain.
1519    ///
1520    /// `a > 175` first narrows row group 0 to rows 176..199. The final
1521    /// predicate `b < 625` is then evaluated only over those 24 rows, all of
1522    /// which match. With `limit = 10`, the final output should still be rows
1523    /// 176..185, and the second predicate should stop before consuming all 24
1524    /// selected rows.
1525    #[test]
1526    fn test_decoder_multi_filters_with_limit() {
1527        use std::sync::atomic::{AtomicUsize, Ordering};
1528
1529        let builder =
1530            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
1531        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
1532
1533        let first_predicate_rows = Arc::new(AtomicUsize::new(0));
1534        let second_predicate_rows = Arc::new(AtomicUsize::new(0));
1535
1536        let first_predicate_rows_for_filter = Arc::clone(&first_predicate_rows);
1537        let row_filter_a = ArrowPredicateFn::new(
1538            ProjectionMask::columns(&schema_descr, ["a"]),
1539            move |batch: RecordBatch| {
1540                first_predicate_rows_for_filter.fetch_add(batch.num_rows(), Ordering::Relaxed);
1541                let scalar_175 = Int64Array::new_scalar(175);
1542                let column = batch.column(0).as_primitive::<Int64Type>();
1543                gt(column, &scalar_175)
1544            },
1545        );
1546
1547        let second_predicate_rows_for_filter = Arc::clone(&second_predicate_rows);
1548        let row_filter_b = ArrowPredicateFn::new(
1549            ProjectionMask::columns(&schema_descr, ["b"]),
1550            move |batch: RecordBatch| {
1551                second_predicate_rows_for_filter.fetch_add(batch.num_rows(), Ordering::Relaxed);
1552                let scalar_625 = Int64Array::new_scalar(625);
1553                let column = batch.column(0).as_primitive::<Int64Type>();
1554                lt(column, &scalar_625)
1555            },
1556        );
1557
1558        let mut decoder = builder
1559            .with_projection(ProjectionMask::columns(&schema_descr, ["c"]))
1560            .with_row_filter(RowFilter::new(vec![
1561                Box::new(row_filter_a),
1562                Box::new(row_filter_b),
1563            ]))
1564            .with_batch_size(10)
1565            .with_limit(10)
1566            .build()
1567            .unwrap();
1568
1569        // Row group 0, first predicate
1570        let ranges = expect_needs_data(decoder.try_decode());
1571        push_ranges_to_decoder(&mut decoder, ranges);
1572
1573        // Row group 0, second predicate
1574        let ranges = expect_needs_data(decoder.try_decode());
1575        push_ranges_to_decoder(&mut decoder, ranges);
1576
1577        // Final projected data
1578        let ranges = expect_needs_data(decoder.try_decode());
1579        push_ranges_to_decoder(&mut decoder, ranges);
1580
1581        let batch = expect_data(decoder.try_decode());
1582        let expected = TEST_BATCH.slice(176, 10).project(&[2]).unwrap();
1583        assert_eq!(batch, expected);
1584
1585        // The overall limit was satisfied by row group 0.
1586        expect_finished(decoder.try_decode());
1587
1588        assert_eq!(first_predicate_rows.load(Ordering::Relaxed), 200);
1589        assert!(
1590            second_predicate_rows.load(Ordering::Relaxed) < 24,
1591            "final predicate should short-circuit before consuming all 24 rows selected by the first predicate"
1592        );
1593    }
1594
1595    /// When a row selection already exists, limiting the predicate must still
1596    /// preserve alignment with that prior selection.
1597    ///
1598    /// The explicit selection narrows row group 0 to rows 150..199. Applying
1599    /// `a > 175` over that selection yields rows 176..199. With `limit = 10`,
1600    /// the decoder should emit rows 176..185 and stop without evaluating the
1601    /// remaining selected rows.
1602    #[test]
1603    fn test_decoder_filter_with_row_selection_and_limit() {
1604        use std::sync::atomic::{AtomicUsize, Ordering};
1605
1606        let builder =
1607            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
1608        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
1609
1610        let rows_filtered = Arc::new(AtomicUsize::new(0));
1611        let rows_filtered_for_predicate = Arc::clone(&rows_filtered);
1612
1613        let row_filter_a = ArrowPredicateFn::new(
1614            ProjectionMask::columns(&schema_descr, ["a"]),
1615            move |batch: RecordBatch| {
1616                rows_filtered_for_predicate.fetch_add(batch.num_rows(), Ordering::Relaxed);
1617                let scalar_175 = Int64Array::new_scalar(175);
1618                let column = batch.column(0).as_primitive::<Int64Type>();
1619                gt(column, &scalar_175)
1620            },
1621        );
1622
1623        let mut decoder = builder
1624            .with_projection(ProjectionMask::columns(&schema_descr, ["a"]))
1625            .with_row_selection(RowSelection::from(vec![
1626                RowSelector::skip(150),
1627                RowSelector::select(50),
1628            ]))
1629            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
1630            .with_batch_size(10)
1631            .with_limit(10)
1632            .build()
1633            .unwrap();
1634
1635        let ranges = expect_needs_data(decoder.try_decode());
1636        push_ranges_to_decoder(&mut decoder, ranges);
1637
1638        let batch = expect_data(decoder.try_decode());
1639        let expected = TEST_BATCH.slice(176, 10).project(&[0]).unwrap();
1640        assert_eq!(batch, expected);
1641
1642        expect_finished(decoder.try_decode());
1643
1644        assert!(
1645            rows_filtered.load(Ordering::Relaxed) < 50,
1646            "predicate should short-circuit before consuming all 50 rows from the explicit row selection"
1647        );
1648    }
1649
1650    #[test]
1651    fn test_decoder_offset_limit() {
1652        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
1653            .unwrap()
1654            // skip entire first row group (200 rows) and first 25 rows of second row group
1655            .with_offset(225)
1656            // and limit to 20 rows
1657            .with_limit(20)
1658            .build()
1659            .unwrap();
1660
1661        // First row group should be skipped,
1662
1663        // Second row group
1664        let ranges = expect_needs_data(decoder.try_decode());
1665        push_ranges_to_decoder(&mut decoder, ranges);
1666
1667        // expect the first and only batch to be decoded
1668        let batch1 = expect_data(decoder.try_decode());
1669        let expected1 = TEST_BATCH.slice(225, 20);
1670        assert_eq!(batch1, expected1);
1671
1672        expect_finished(decoder.try_decode());
1673    }
1674
1675    #[test]
1676    fn test_decoder_try_next_reader_offset_limit() {
1677        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
1678            .unwrap()
1679            .with_offset(225)
1680            .with_limit(20)
1681            .build()
1682            .unwrap();
1683
1684        let ranges = expect_needs_data(decoder.try_next_reader());
1685        push_ranges_to_decoder(&mut decoder, ranges);
1686
1687        let reader = expect_data(decoder.try_next_reader());
1688        let batches = reader
1689            .map(|batch| batch.expect("expected decoded batch"))
1690            .collect::<Vec<_>>();
1691        let output = concat_batches(&TEST_BATCH.schema(), &batches).unwrap();
1692        assert_eq!(output, TEST_BATCH.slice(225, 20));
1693
1694        expect_finished(decoder.try_next_reader());
1695    }
1696
1697    #[test]
1698    fn test_decoder_row_group_selection() {
1699        // take only the second row group
1700        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
1701            .unwrap()
1702            .with_row_groups(vec![1])
1703            .build()
1704            .unwrap();
1705
1706        // First row group should be skipped,
1707
1708        // Second row group
1709        let ranges = expect_needs_data(decoder.try_decode());
1710        push_ranges_to_decoder(&mut decoder, ranges);
1711
1712        // expect the first and only batch to be decoded
1713        let batch1 = expect_data(decoder.try_decode());
1714        let expected1 = TEST_BATCH.slice(200, 200);
1715        assert_eq!(batch1, expected1);
1716
1717        expect_finished(decoder.try_decode());
1718    }
1719
1720    #[test]
1721    fn test_decoder_row_selection() {
1722        // take only the second row group
1723        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
1724            .unwrap()
1725            .with_row_selection(RowSelection::from(vec![
1726                RowSelector::skip(225),  // skip first row group and 25 rows of second])
1727                RowSelector::select(20), // take 20 rows
1728            ]))
1729            .build()
1730            .unwrap();
1731
1732        // First row group should be skipped,
1733
1734        // Second row group
1735        let ranges = expect_needs_data(decoder.try_decode());
1736        push_ranges_to_decoder(&mut decoder, ranges);
1737
1738        // expect the first ane only batch to be decoded
1739        let batch1 = expect_data(decoder.try_decode());
1740        let expected1 = TEST_BATCH.slice(225, 20);
1741        assert_eq!(batch1, expected1);
1742
1743        expect_finished(decoder.try_decode());
1744    }
1745
1746    /// `into_builder` between row groups recovers a builder for the
1747    /// not-yet-decoded row groups; rebuilding it with a new row filter
1748    /// applies that filter to the subsequent row groups while leaving the
1749    /// already-decoded row group's results untouched.
1750    ///
1751    /// See the "Adaptive scans" section of [`ParquetPushDecoderBuilder`] for
1752    /// the high-level overview.
1753    #[test]
1754    fn test_into_builder_installs_filter_between_row_groups() {
1755        let schema_descr = test_file_parquet_metadata()
1756            .file_metadata()
1757            .schema_descr_ptr();
1758        let mut decoder = prefetched_decoder(1024);
1759
1760        // Reader for row group 0 — no filter.
1761        let reader0 = expect_data(decoder.try_next_reader());
1762        let batches0: Vec<_> = reader0.collect::<Result<_, _>>().unwrap();
1763        let batch0 = concat_batches(&TEST_BATCH.schema(), &batches0).unwrap();
1764        assert_eq!(batch0, TEST_BATCH.slice(0, 200));
1765
1766        // We're between row groups now. Rebuild with a filter on column "a".
1767        assert!(decoder.is_at_row_group_boundary());
1768        assert_eq!(decoder.row_groups_remaining(), 1);
1769        let filter =
1770            ArrowPredicateFn::new(ProjectionMask::columns(&schema_descr, ["a"]), |batch| {
1771                gt(batch.column(0), &Int64Array::new_scalar(250))
1772            });
1773        let mut decoder = decoder
1774            .into_builder()
1775            .unwrap()
1776            .with_row_filter(RowFilter::new(vec![Box::new(filter)]))
1777            .build()
1778            .unwrap();
1779
1780        // Reader for row group 1 — filter applied. The rebuilt decoder kept
1781        // the buffered bytes (see `test_into_builder_preserves_buffered_bytes`)
1782        // so no data needs to be re-supplied. Column "a" in RG1 has values
1783        // 200..399; `a > 250` keeps 251..399 = 149 rows.
1784        let reader1 = expect_data(decoder.try_next_reader());
1785        let batches1: Vec<_> = reader1.collect::<Result<_, _>>().unwrap();
1786        let batch1 = concat_batches(&TEST_BATCH.schema(), &batches1).unwrap();
1787        assert_eq!(batch1, TEST_BATCH.slice(251, 149));
1788        expect_finished(decoder.try_next_reader());
1789    }
1790
1791    /// `into_builder` is rejected while a row group's reader is being
1792    /// drained (`DecodingRowGroup`); the error points at
1793    /// `is_at_row_group_boundary`.
1794    #[test]
1795    fn test_into_builder_rejected_mid_row_group() {
1796        let mut decoder = prefetched_decoder(50);
1797
1798        // Decode one batch to land mid-row-group, inside `DecodingRowGroup`
1799        // with an active reader — not a boundary.
1800        expect_data(decoder.try_decode());
1801        assert!(!decoder.is_at_row_group_boundary());
1802
1803        let err = decoder.into_builder().unwrap_err();
1804        let err_msg = format!("{err}");
1805        assert!(
1806            err_msg.contains("is_at_row_group_boundary"),
1807            "unexpected error: {err_msg}"
1808        );
1809    }
1810
1811    /// `into_builder` is rejected once the decoder has finished.
1812    #[test]
1813    fn test_into_builder_rejected_on_finished_decoder() {
1814        let mut decoder = prefetched_decoder(1024);
1815        expect_data(decoder.try_decode());
1816        expect_data(decoder.try_decode());
1817        expect_finished(decoder.try_decode());
1818        assert!(!decoder.is_at_row_group_boundary());
1819
1820        let err = decoder.into_builder().unwrap_err();
1821        assert!(
1822            format!("{err}").contains("finished"),
1823            "unexpected error: {err}"
1824        );
1825    }
1826
1827    /// `try_next_reader` hands the active reader off to the caller and
1828    /// transitions the decoder back to `ReadingRowGroup` — so the caller
1829    /// can call `into_builder` even while still holding the returned
1830    /// reader. (The handed-off reader has no link back to the decoder's
1831    /// projection/filter; it has its own `ArrayReader` and `ReadPlan`.)
1832    #[test]
1833    fn test_into_builder_allowed_while_iterating_handed_off_reader() {
1834        let mut decoder = prefetched_decoder(1024);
1835
1836        let reader0 = expect_data(decoder.try_next_reader());
1837        // Decoder no longer owns the reader, so it considers itself
1838        // "between row groups".
1839        assert!(decoder.is_at_row_group_boundary());
1840        // Recovering the builder consumes the decoder but leaves `reader0`
1841        // valid: iterating it is independent of the decoder's state.
1842        let _builder = decoder.into_builder().unwrap();
1843        let batches: Vec<_> = reader0.collect::<Result<_, _>>().unwrap();
1844        let batch0 = concat_batches(&TEST_BATCH.schema(), &batches).unwrap();
1845        assert_eq!(batch0, TEST_BATCH.slice(0, 200));
1846    }
1847
1848    /// `into_builder` recovers a builder for the *remaining* row groups and
1849    /// carries the not-yet-consumed offset/limit budget, so a rebuilt
1850    /// decoder resumes where the original left off rather than restarting.
1851    #[test]
1852    fn test_into_builder_resumes_remaining_budget() {
1853        // limit = 250 spans both 200-row row groups: all 200 rows of RG0
1854        // plus the first 50 rows of RG1.
1855        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
1856            .unwrap()
1857            .with_batch_size(1024)
1858            .with_limit(250)
1859            .build()
1860            .unwrap();
1861        prefetch_test_file(&mut decoder);
1862
1863        // RG0 contributes all 200 of its rows.
1864        let reader0 = expect_data(decoder.try_next_reader());
1865        let batches0: Vec<_> = reader0.collect::<Result<_, _>>().unwrap();
1866        let batch0 = concat_batches(&TEST_BATCH.schema(), &batches0).unwrap();
1867        assert_eq!(batch0, TEST_BATCH.slice(0, 200));
1868
1869        // Rebuild without changing anything: the remaining 50-row limit and
1870        // the not-yet-decoded RG1 must carry through (as do the buffers, so
1871        // no data needs re-supplying).
1872        assert!(decoder.is_at_row_group_boundary());
1873        let mut decoder = decoder.into_builder().unwrap().build().unwrap();
1874
1875        let reader1 = expect_data(decoder.try_next_reader());
1876        let batches1: Vec<_> = reader1.collect::<Result<_, _>>().unwrap();
1877        let batch1 = concat_batches(&TEST_BATCH.schema(), &batches1).unwrap();
1878        // Only the first 50 rows of RG1 (200..249) — the rest of the limit.
1879        assert_eq!(batch1, TEST_BATCH.slice(200, 50));
1880        expect_finished(decoder.try_next_reader());
1881    }
1882
1883    /// `into_builder` carries the decoder's buffered bytes across the
1884    /// rebuild: the rebuilt decoder keeps them and does not re-request data
1885    /// it already holds.
1886    #[test]
1887    fn test_into_builder_preserves_buffered_bytes() {
1888        let mut decoder = prefetched_decoder(1024);
1889        assert_eq!(decoder.buffered_bytes(), test_file_len());
1890
1891        // Drain RG0.
1892        let reader0 = expect_data(decoder.try_next_reader());
1893        let _: Vec<_> = reader0.collect::<Result<_, _>>().unwrap();
1894        // RG1's bytes are still staged inside the decoder.
1895        let buffered = decoder.buffered_bytes();
1896        assert!(buffered > 0);
1897
1898        // Rebuilding via into_builder keeps the staged bytes.
1899        let mut decoder = decoder.into_builder().unwrap().build().unwrap();
1900        assert_eq!(decoder.buffered_bytes(), buffered);
1901
1902        // RG1's bytes are already buffered, so it decodes without a
1903        // `NeedsData` round-trip.
1904        let reader1 = expect_data(decoder.try_next_reader());
1905        let batches1: Vec<_> = reader1.collect::<Result<_, _>>().unwrap();
1906        let batch1 = concat_batches(&TEST_BATCH.schema(), &batches1).unwrap();
1907        assert_eq!(batch1, TEST_BATCH.slice(200, 200));
1908        expect_finished(decoder.try_next_reader());
1909    }
1910
1911    /// Drive the decoder incrementally. Start with a narrow projection,
1912    /// drain RG0, then `into_builder` and widen the projection to all three
1913    /// columns. The rebuilt decoder's `NeedsData` for RG1 must request
1914    /// bytes for *all three* columns, not just the originally-projected
1915    /// "a". The expected ranges are hardcoded because `TEST_BATCH` and the
1916    /// writer settings are static; this pins the layout cleanly without a
1917    /// parallel reference decoder.
1918    #[test]
1919    fn test_into_builder_expand_projection_requests_new_bytes() {
1920        let metadata = test_file_parquet_metadata();
1921        let schema_descr = metadata.file_metadata().schema_descr_ptr();
1922
1923        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(metadata)
1924            .unwrap()
1925            .with_batch_size(1024)
1926            .with_projection(ProjectionMask::columns(&schema_descr, ["a"]))
1927            .build()
1928            .unwrap();
1929
1930        // RG0: incrementally satisfy the narrow request — a single
1931        // contiguous range for the "a"-only projection.
1932        let ranges_rg0 = expect_needs_data(decoder.try_next_reader());
1933        assert_eq!(ranges_rg0, vec![4..1860]);
1934        push_ranges_to_decoder(&mut decoder, ranges_rg0);
1935
1936        let reader0 = expect_data(decoder.try_next_reader());
1937        let batches0: Vec<_> = reader0.collect::<Result<_, _>>().unwrap();
1938        let batch0 = concat_batches(&batches0[0].schema(), &batches0).unwrap();
1939        assert_eq!(batch0, TEST_BATCH.slice(0, 200).project(&[0]).unwrap());
1940
1941        // Widen the projection at the boundary.
1942        assert!(decoder.is_at_row_group_boundary());
1943        let mut decoder = decoder
1944            .into_builder()
1945            .unwrap()
1946            .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b", "c"]))
1947            .build()
1948            .unwrap();
1949
1950        // RG1 now requests "a", "b", and "c" column chunks: ~1.8KiB each
1951        // for "a" and "b", ~7.5KiB for the StringView column "c".
1952        let ranges_rg1 = expect_needs_data(decoder.try_next_reader());
1953        assert_eq!(ranges_rg1, vec![11062..12918, 12918..14774, 14774..22230]);
1954        push_ranges_to_decoder(&mut decoder, ranges_rg1);
1955
1956        let reader1 = expect_data(decoder.try_next_reader());
1957        let batches1: Vec<_> = reader1.collect::<Result<_, _>>().unwrap();
1958        let batch1 = concat_batches(&TEST_BATCH.schema(), &batches1).unwrap();
1959        assert_eq!(batch1, TEST_BATCH.slice(200, 200));
1960        expect_finished(decoder.try_next_reader());
1961    }
1962
1963    /// Mirror of [`test_into_builder_expand_projection_requests_new_bytes`]:
1964    /// start with the full projection, drain RG0, then `into_builder` and
1965    /// narrow the projection to just column "a". RG1's `NeedsData` must
1966    /// request only the single "a" column-chunk range, not the three a
1967    /// wide projection would.
1968    #[test]
1969    fn test_into_builder_narrow_projection_requests_fewer_bytes() {
1970        let metadata = test_file_parquet_metadata();
1971        let schema_descr = metadata.file_metadata().schema_descr_ptr();
1972
1973        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(metadata)
1974            .unwrap()
1975            .with_batch_size(1024)
1976            .build()
1977            .unwrap();
1978
1979        // RG0 with the default (full) projection — three column ranges.
1980        let ranges_rg0 = expect_needs_data(decoder.try_next_reader());
1981        assert_eq!(ranges_rg0, vec![4..1860, 1860..3716, 3716..11062]);
1982        push_ranges_to_decoder(&mut decoder, ranges_rg0);
1983
1984        let reader0 = expect_data(decoder.try_next_reader());
1985        let batches0: Vec<_> = reader0.collect::<Result<_, _>>().unwrap();
1986        let batch0 = concat_batches(&TEST_BATCH.schema(), &batches0).unwrap();
1987        assert_eq!(batch0, TEST_BATCH.slice(0, 200));
1988
1989        // Narrow the projection at the boundary.
1990        assert!(decoder.is_at_row_group_boundary());
1991        let mut decoder = decoder
1992            .into_builder()
1993            .unwrap()
1994            .with_projection(ProjectionMask::columns(&schema_descr, ["a"]))
1995            .build()
1996            .unwrap();
1997
1998        // RG1 now requests column "a" only — a single 1856-byte range.
1999        let ranges_rg1 = expect_needs_data(decoder.try_next_reader());
2000        assert_eq!(ranges_rg1, vec![11062..12918]);
2001        push_ranges_to_decoder(&mut decoder, ranges_rg1);
2002
2003        let reader1 = expect_data(decoder.try_next_reader());
2004        let batches1: Vec<_> = reader1.collect::<Result<_, _>>().unwrap();
2005        let batch1 = concat_batches(&batches1[0].schema(), &batches1).unwrap();
2006        assert_eq!(batch1, TEST_BATCH.slice(200, 200).project(&[0]).unwrap());
2007        expect_finished(decoder.try_next_reader());
2008    }
2009
2010    /// Returns a batch with 400 rows, with 3 columns: "a", "b", "c"
2011    ///
2012    /// Note c is a different types (so the data page sizes will be different)
2013    static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
2014        let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
2015        let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
2016        let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| {
2017            if i % 2 == 0 {
2018                format!("string_{i}")
2019            } else {
2020                format!("A string larger than 12 bytes and thus not inlined {i}")
2021            }
2022        })));
2023
2024        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
2025    });
2026
2027    /// Create a parquet file in memory for testing.
2028    ///
2029    /// See [`TEST_BATCH`] for the data in the file.
2030    ///
2031    /// Each column is written in 4 data pages, each with 100 rows, across 2
2032    /// row groups. Each column in each row group has two data pages.
2033    ///
2034    /// The data is split across row groups like this
2035    ///
2036    /// Column |   Values                | Data Page | Row Group
2037    /// -------|------------------------|-----------|-----------
2038    /// a      | 0..99                  | 1         | 0
2039    /// a      | 100..199               | 2         | 0
2040    /// a      | 200..299               | 1         | 1
2041    /// a      | 300..399               | 2         | 1
2042    ///
2043    /// b      | 400..499               | 1         | 0
2044    /// b      | 500..599               | 2         | 0
2045    /// b      | 600..699               | 1         | 1
2046    /// b      | 700..799               | 2         | 1
2047    ///
2048    /// c      | "string_0".."string_99"        | 1         | 0
2049    /// c      | "string_100".."string_199"     | 2         | 0
2050    /// c      | "string_200".."string_299"     | 1         | 1
2051    /// c      | "string_300".."string_399"     | 2         | 1
2052    static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
2053        let input_batch = &TEST_BATCH;
2054        let mut output = Vec::new();
2055
2056        let writer_options = WriterProperties::builder()
2057            .set_max_row_group_row_count(Some(200))
2058            .set_data_page_row_count_limit(100)
2059            .build();
2060        let mut writer =
2061            ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap();
2062
2063        // since the limits are only enforced on batch boundaries, write the input
2064        // batch in chunks of 50
2065        let mut row_remain = input_batch.num_rows();
2066        while row_remain > 0 {
2067            let chunk_size = row_remain.min(50);
2068            let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size);
2069            writer.write(&chunk).unwrap();
2070            row_remain -= chunk_size;
2071        }
2072        writer.close().unwrap();
2073        Bytes::from(output)
2074    });
2075
2076    /// Return the length of [`TEST_FILE_DATA`], in bytes
2077    fn test_file_len() -> u64 {
2078        TEST_FILE_DATA.len() as u64
2079    }
2080
2081    /// Return a range that covers the entire [`TEST_FILE_DATA`]
2082    fn test_file_range() -> Range<u64> {
2083        0..test_file_len()
2084    }
2085
2086    /// Return a slice of the test file data from the given range
2087    pub fn test_file_slice(range: Range<u64>) -> Bytes {
2088        let start: usize = range.start.try_into().unwrap();
2089        let end: usize = range.end.try_into().unwrap();
2090        TEST_FILE_DATA.slice(start..end)
2091    }
2092
2093    /// return the metadata for the test file
2094    pub fn test_file_parquet_metadata() -> Arc<crate::file::metadata::ParquetMetaData> {
2095        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(test_file_len()).unwrap();
2096        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![test_file_range()]);
2097        let metadata = metadata_decoder.try_decode().unwrap();
2098        let DecodeResult::Data(metadata) = metadata else {
2099            panic!("Expected metadata to be decoded successfully");
2100        };
2101        Arc::new(metadata)
2102    }
2103
2104    /// Push the given ranges to the metadata decoder, simulating reading from a file
2105    fn push_ranges_to_metadata_decoder(
2106        metadata_decoder: &mut ParquetMetaDataPushDecoder,
2107        ranges: Vec<Range<u64>>,
2108    ) {
2109        let data = ranges
2110            .iter()
2111            .map(|range| test_file_slice(range.clone()))
2112            .collect::<Vec<_>>();
2113        metadata_decoder.push_ranges(ranges, data).unwrap();
2114    }
2115
2116    /// Push the entire test file into `decoder`.
2117    fn prefetch_test_file(decoder: &mut ParquetPushDecoder) {
2118        decoder
2119            .push_range(test_file_range(), TEST_FILE_DATA.clone())
2120            .unwrap();
2121    }
2122
2123    /// Build a decoder over the test file with the given batch size and
2124    /// prefetch the whole file into it.
2125    fn prefetched_decoder(batch_size: usize) -> ParquetPushDecoder {
2126        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
2127            .unwrap()
2128            .with_batch_size(batch_size)
2129            .build()
2130            .unwrap();
2131        prefetch_test_file(&mut decoder);
2132        decoder
2133    }
2134
2135    fn push_ranges_to_decoder(decoder: &mut ParquetPushDecoder, ranges: Vec<Range<u64>>) {
2136        let data = ranges
2137            .iter()
2138            .map(|range| test_file_slice(range.clone()))
2139            .collect::<Vec<_>>();
2140        decoder.push_ranges(ranges, data).unwrap();
2141    }
2142
2143    /// Expect that the [`DecodeResult`] is a [`DecodeResult::Data`] and return the corresponding element
2144    fn expect_data<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) -> T {
2145        match result.expect("Expected Ok(DecodeResult::Data(T))") {
2146            DecodeResult::Data(data) => data,
2147            result => panic!("Expected DecodeResult::Data, got {result:?}"),
2148        }
2149    }
2150
2151    /// Expect that the [`DecodeResult`] is a [`DecodeResult::NeedsData`] and return the corresponding ranges
2152    fn expect_needs_data<T: Debug>(
2153        result: Result<DecodeResult<T>, ParquetError>,
2154    ) -> Vec<Range<u64>> {
2155        match result.expect("Expected Ok(DecodeResult::NeedsData{ranges})") {
2156            DecodeResult::NeedsData(ranges) => ranges,
2157            result => panic!("Expected DecodeResult::NeedsData, got {result:?}"),
2158        }
2159    }
2160
2161    fn expect_finished<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) {
2162        match result.expect("Expected Ok(DecodeResult::Finished)") {
2163            DecodeResult::Finished => {}
2164            result => panic!("Expected DecodeResult::Finished, got {result:?}"),
2165        }
2166    }
2167}