parquet/arrow/push_decoder/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`ParquetPushDecoder`]: decodes Parquet data with data provided by the
//! caller (rather than from an underlying reader).

mod reader_builder;
mod remaining;

use crate::DecodeResult;
use crate::arrow::arrow_reader::{
    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
};
use crate::errors::ParquetError;
use crate::file::metadata::ParquetMetaData;
use crate::util::push_buffers::PushBuffers;
use arrow_array::RecordBatch;
use bytes::Bytes;
use reader_builder::{RowBudget, RowGroupReaderBuilder};
use remaining::RemainingRowGroups;
use std::ops::Range;
use std::sync::Arc;

/// A builder for [`ParquetPushDecoder`].
///
/// To create a new decoder, use [`ParquetPushDecoderBuilder::try_new_decoder`].
///
/// You can decode the metadata from a Parquet file using either
/// [`ParquetMetaDataReader`] or [`ParquetMetaDataPushDecoder`].
///
/// [`ParquetMetaDataReader`]: crate::file::metadata::ParquetMetaDataReader
/// [`ParquetMetaDataPushDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder
///
/// Note there is no "input" (such as a file or reader) for the push decoder:
/// the caller fetches the byte ranges the decoder requests and provides them
/// via [`ParquetPushDecoder::push_ranges`].
/// # Example
/// ```
/// # use std::ops::Range;
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use arrow_array::record_batch;
/// # use parquet::DecodeResult;
/// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
/// # use parquet::arrow::ArrowWriter;
/// # use parquet::file::metadata::ParquetMetaDataPushDecoder;
/// # let file_bytes = {
/// #   let mut buffer = vec![];
/// #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
/// #   writer.write(&batch).unwrap();
/// #   writer.close().unwrap();
/// #   Bytes::from(buffer)
/// # };
/// # // mimic IO by returning a function that returns the bytes for a given range
/// # let get_range = |range: &Range<u64>| -> Bytes {
/// #    let start = range.start as usize;
/// #    let end = range.end as usize;
/// #    file_bytes.slice(start..end)
/// # };
/// # let file_length = file_bytes.len() as u64;
/// # let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_length).unwrap();
/// # metadata_decoder.push_ranges(vec![0..file_length], vec![file_bytes.clone()]).unwrap();
/// # let DecodeResult::Data(parquet_metadata) = metadata_decoder.try_decode().unwrap() else { panic!("failed to decode metadata") };
/// # let parquet_metadata = Arc::new(parquet_metadata);
/// // The decoded metadata is required to create the decoder
/// let mut decoder =
///     ParquetPushDecoderBuilder::try_new_decoder(parquet_metadata)
///       .unwrap()
///       // Optionally configure the decoder, e.g. batch size
///       .with_batch_size(1024)
///       // Build the decoder
///       .build()
///       .unwrap();
///
/// // In a loop, ask the decoder what it needs next, and provide it with the required data
/// loop {
///     match decoder.try_decode().unwrap() {
///         DecodeResult::NeedsData(ranges) => {
///             // The decoder needs more data. Fetch the data for the given ranges
///             let data = ranges.iter().map(|r| get_range(r)).collect::<Vec<_>>();
///             // Push the data to the decoder
///             decoder.push_ranges(ranges, data).unwrap();
///             // After pushing the data, we can try to decode again on the next iteration
///         }
///         DecodeResult::Data(batch) => {
///             // Successfully decoded a batch of data
///             assert!(batch.num_rows() > 0);
///         }
///         DecodeResult::Finished => {
///             // The decoder has finished decoding; exit the loop
///             break;
///         }
///     }
/// }
/// ```
pub type ParquetPushDecoderBuilder = ArrowReaderBuilder<NoInput>;

/// Type that represents "No input" for the [`ParquetPushDecoderBuilder`]
///
/// There is no "input" for the push decoder by design (the idea is that
/// the caller pushes data to the decoder as needed).
///
/// However, [`ArrowReaderBuilder`] is shared with the sync and async readers,
/// which DO have an `input`. To support reusing the same builder code for
/// all three types of decoders, we define `NoInput` for the push decoder to
/// denote in the type system that there is no input.
#[derive(Debug, Clone, Copy)]
pub struct NoInput;

/// Methods for building a [`ParquetPushDecoder`]. See the base [`ArrowReaderBuilder`] for
/// more options that can be configured.
impl ParquetPushDecoderBuilder {
    /// Create a new `ParquetPushDecoderBuilder` for configuring a Parquet decoder for the given file.
    ///
    /// See [`ParquetMetaDataPushDecoder`] for a decoder that can read the metadata from a Parquet file.
    ///
    /// [`ParquetMetaDataPushDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder
    ///
    /// See example on [`ParquetPushDecoderBuilder`]
    pub fn try_new_decoder(parquet_metadata: Arc<ParquetMetaData>) -> Result<Self, ParquetError> {
        Self::try_new_decoder_with_options(parquet_metadata, ArrowReaderOptions::default())
    }

    /// Create a new `ParquetPushDecoderBuilder` for configuring a Parquet decoder for the given file
    /// with the given reader options.
    ///
    /// This is similar to [`Self::try_new_decoder`] but allows configuring
    /// options such as the Arrow schema
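    ///
    /// For example, a minimal sketch of enabling the page index (assumes the
    /// metadata has already been decoded; `get_metadata` is a hypothetical
    /// stand-in for however you obtain it):
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
    /// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
    /// # use parquet::file::metadata::ParquetMetaData;
    /// # fn get_metadata() -> Arc<ParquetMetaData> { unimplemented!() }
    /// // request that page index information be used when decoding
    /// let options = ArrowReaderOptions::new().with_page_index(true);
    /// let builder = ParquetPushDecoderBuilder::try_new_decoder_with_options(
    ///     get_metadata(),
    ///     options,
    /// )
    /// .unwrap();
    /// ```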
    pub fn try_new_decoder_with_options(
        parquet_metadata: Arc<ParquetMetaData>,
        arrow_reader_options: ArrowReaderOptions,
    ) -> Result<Self, ParquetError> {
        let arrow_reader_metadata =
            ArrowReaderMetadata::try_new(parquet_metadata, arrow_reader_options)?;
        Ok(Self::new_with_metadata(arrow_reader_metadata))
    }

    /// Create a new `ParquetPushDecoderBuilder` given [`ArrowReaderMetadata`].
    ///
    /// See [`ArrowReaderMetadata::try_new`] for how to create the metadata from
    /// the Parquet metadata and reader options.
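    ///
    /// For example (a sketch; `get_metadata` is a hypothetical stand-in for
    /// however you obtain the decoded [`ParquetMetaData`]):
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
    /// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
    /// # use parquet::file::metadata::ParquetMetaData;
    /// # fn get_metadata() -> Arc<ParquetMetaData> { unimplemented!() }
    /// // create the ArrowReaderMetadata once, then reuse it for the builder
    /// let metadata =
    ///     ArrowReaderMetadata::try_new(get_metadata(), ArrowReaderOptions::default()).unwrap();
    /// let builder = ParquetPushDecoderBuilder::new_with_metadata(metadata);
    /// ```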
    pub fn new_with_metadata(arrow_reader_metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(NoInput, arrow_reader_metadata)
    }

    /// Create a [`ParquetPushDecoder`] with the configured options
    pub fn build(self) -> Result<ParquetPushDecoder, ParquetError> {
        let Self {
            input: NoInput,
            metadata: parquet_metadata,
            schema: _,
            fields,
            batch_size,
            row_groups,
            projection,
            filter,
            selection,
            limit,
            offset,
            metrics,
            row_selection_policy,
            max_predicate_cache_size,
        } = self;

        // If no row groups were specified, read all of them
        let row_groups =
            row_groups.unwrap_or_else(|| (0..parquet_metadata.num_row_groups()).collect());
        let has_predicates = filter
            .as_ref()
            .is_some_and(|filter| !filter.predicates.is_empty());

        // Prepare to build RowGroup readers
        let file_len = 0; // not used in push decoder
        let buffers = PushBuffers::new(file_len);
        let row_group_reader_builder = RowGroupReaderBuilder::new(
            batch_size,
            projection,
            Arc::clone(&parquet_metadata),
            fields,
            filter,
            metrics,
            max_predicate_cache_size,
            buffers,
            row_selection_policy,
        );

        // Initialize the decoder with the configured options
        let remaining_row_groups = RemainingRowGroups::new(
            parquet_metadata,
            row_groups,
            selection,
            RowBudget::new(offset, limit),
            has_predicates,
            row_group_reader_builder,
        );

        Ok(ParquetPushDecoder {
            state: ParquetDecoderState::ReadingRowGroup {
                remaining_row_groups: Box::new(remaining_row_groups),
            },
        })
    }
}

/// A push based Parquet Decoder
///
/// See [`ParquetPushDecoderBuilder`] for an example of how to build and use the decoder.
///
/// [`ParquetPushDecoder`] is a low level API for decoding Parquet data without an
/// underlying reader for performing IO, and thus offers fine grained control
/// over how data is fetched and decoded.
///
/// When more data is needed to make progress, instead of reading data directly
/// from a reader, the decoder returns a [`DecodeResult`] indicating what ranges
/// are needed. Once the caller provides the requested ranges via
/// [`Self::push_ranges`], it can try to decode again by calling
/// [`Self::try_decode`].
///
/// The decoder's internal state tracks what has already been decoded and what
/// is needed next.
#[derive(Debug)]
pub struct ParquetPushDecoder {
    /// The inner state.
    ///
    /// This state is consumed on every transition and a new state is produced
    /// so the Rust compiler can ensure that the state is always valid and
    /// transitions are not missed.
    state: ParquetDecoderState,
}

impl ParquetPushDecoder {
    /// Attempt to decode the next batch of data, or return what data is needed
    ///
    /// The decoder communicates what it needs next with a [`DecodeResult`]
    ///
    /// See full example in [`ParquetPushDecoderBuilder`]
    ///
    /// ```no_run
    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
    /// use parquet::DecodeResult;
    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
    /// # fn push_data(decoder: &mut ParquetPushDecoder, ranges: Vec<std::ops::Range<u64>>) { unimplemented!() }
    /// let mut decoder = get_decoder();
    /// loop {
    ///    match decoder.try_decode().unwrap() {
    ///       DecodeResult::NeedsData(ranges) => {
    ///         // The decoder needs more data. Fetch the data for the given ranges,
    ///         // call decoder.push_ranges(ranges, data), and try again
    ///         push_data(&mut decoder, ranges);
    ///       }
    ///       DecodeResult::Data(batch) => {
    ///         // Successfully decoded the next batch of data
    ///         println!("Got batch with {} rows", batch.num_rows());
    ///       }
    ///       DecodeResult::Finished => {
    ///         // The decoder has finished decoding all data
    ///         break;
    ///       }
    ///    }
    /// }
    /// ```
    pub fn try_decode(&mut self) -> Result<DecodeResult<RecordBatch>, ParquetError> {
        let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
        let (new_state, decode_result) = current_state.try_next_batch()?;
        self.state = new_state;
        Ok(decode_result)
    }

    /// Return a [`ParquetRecordBatchReader`] that reads the next set of rows, or
    /// return what data is needed to produce it.
    ///
    /// This API can be used to get a reader for decoding the next set of
    /// RecordBatches (e.g. a row group) while concurrently fetching the data
    /// for the set after it.
    ///
    /// # Example
    /// ```no_run
    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
    /// use parquet::DecodeResult;
    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
    /// # fn push_data(decoder: &mut ParquetPushDecoder, ranges: Vec<std::ops::Range<u64>>) { unimplemented!() }
    /// let mut decoder = get_decoder();
    /// loop {
    ///    match decoder.try_next_reader().unwrap() {
    ///       DecodeResult::NeedsData(ranges) => {
    ///         // The decoder needs more data. Fetch the data for the given ranges,
    ///         // call decoder.push_ranges(ranges, data), and try again
    ///         push_data(&mut decoder, ranges);
    ///       }
    ///       DecodeResult::Data(reader) => {
    ///          // spawn a thread to read the batches in parallel
    ///          // with fetching the next row group / data
    ///          std::thread::spawn(move || {
    ///            for batch in reader {
    ///              let batch = batch.unwrap();
    ///              println!("Got batch with {} rows", batch.num_rows());
    ///            }
    ///         });
    ///       }
    ///       DecodeResult::Finished => {
    ///         // The decoder has finished decoding all data
    ///         break;
    ///       }
    ///    }
    /// }
    /// ```
    pub fn try_next_reader(
        &mut self,
    ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
        let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
        let (new_state, decode_result) = current_state.try_next_reader()?;
        self.state = new_state;
        Ok(decode_result)
    }

    /// Push data into the decoder for processing
    ///
    /// This is a convenience wrapper around [`Self::push_ranges`] for pushing a
    /// single range of data.
    ///
    /// Note this can be the entire file or just a part of it. If it is part of
    /// the file, the range should correspond to a data range requested by the
    /// decoder.
    ///
    /// See example in [`ParquetPushDecoderBuilder`]
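    ///
    /// For example (a sketch; `get_decoder` is a `no_run` stand-in for
    /// obtaining a decoder, and the pushed bytes are placeholders):
    /// ```no_run
    /// # use bytes::Bytes;
    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
    /// let mut decoder = get_decoder();
    /// // Push the bytes for file offsets 0..100 to the decoder
    /// let data = Bytes::from(vec![0u8; 100]); // placeholder contents
    /// decoder.push_range(0..100, data).unwrap();
    /// ```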
    pub fn push_range(&mut self, range: Range<u64>, data: Bytes) -> Result<(), ParquetError> {
        self.push_ranges(vec![range], vec![data])
    }

    /// Push data into the decoder for processing
    ///
    /// This should correspond to the data ranges requested by the decoder
    pub fn push_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
        data: Vec<Bytes>,
    ) -> Result<(), ParquetError> {
        let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
        self.state = current_state.push_data(ranges, data)?;
        Ok(())
    }

    /// Returns the total number of buffered bytes in the decoder
    ///
    /// This is the sum of the sizes of all [`Bytes`] that have been pushed to
    /// the decoder but not yet consumed.
    ///
    /// Note that this does not include the overhead of internal data
    /// structures, and since [`Bytes`] are reference counted, it may not
    /// reflect the actual additional memory in use.
    ///
    /// This can be used to monitor memory usage of the decoder.
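    ///
    /// For example, a sketch of a simple memory check (`MAX_BUFFERED` is a
    /// hypothetical application-defined budget):
    /// ```no_run
    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
    /// const MAX_BUFFERED: u64 = 8 * 1024 * 1024; // hypothetical 8 MiB budget
    /// let decoder = get_decoder();
    /// if decoder.buffered_bytes() > MAX_BUFFERED {
    ///     // apply application-specific backpressure, e.g. pause new fetches
    /// }
    /// ```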
    pub fn buffered_bytes(&self) -> u64 {
        self.state.buffered_bytes()
    }

    /// Clear any staged byte ranges currently buffered for future decode work.
    ///
    /// This clears byte ranges still owned by the decoder's internal
    /// `PushBuffers`. It does not affect any data that has already been handed
    /// off to an active [`ParquetRecordBatchReader`].
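    ///
    /// For example (a sketch, assuming more data was pushed speculatively
    /// than the decoder ended up needing):
    /// ```no_run
    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
    /// let mut decoder = get_decoder();
    /// // ... push data and decode some batches ...
    /// // Release any bytes that are still staged but not yet consumed
    /// decoder.clear_all_ranges();
    /// assert_eq!(decoder.buffered_bytes(), 0);
    /// ```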
    pub fn clear_all_ranges(&mut self) {
        self.state.clear_all_ranges();
    }
}

/// Internal state machine for the [`ParquetPushDecoder`]
#[derive(Debug)]
enum ParquetDecoderState {
    /// Waiting for data needed to decode the next RowGroup
    ReadingRowGroup {
        remaining_row_groups: Box<RemainingRowGroups>,
    },
    /// The decoder is actively decoding a RowGroup
    DecodingRowGroup {
        /// Current active reader
        record_batch_reader: Box<ParquetRecordBatchReader>,
        remaining_row_groups: Box<RemainingRowGroups>,
    },
    /// The decoder has finished processing all data
    Finished,
}

impl ParquetDecoderState {
    /// If actively reading a RowGroup, return the currently active
    /// ParquetRecordBatchReader and advance to the next group.
    fn try_next_reader(
        self,
    ) -> Result<(Self, DecodeResult<ParquetRecordBatchReader>), ParquetError> {
        let mut current_state = self;
        loop {
            let (next_state, decode_result) = current_state.transition()?;
            // if more data is needed to transition, can't proceed further without it
            match decode_result {
                DecodeResult::NeedsData(ranges) => {
                    return Ok((next_state, DecodeResult::NeedsData(ranges)));
                }
                // act next based on state
                DecodeResult::Data(()) | DecodeResult::Finished => {}
            }
            match next_state {
                // not ready to read yet, continue transitioning
                Self::ReadingRowGroup { .. } => current_state = next_state,
                // have a reader ready, so return it and set ourselves to ReadingRowGroup
                Self::DecodingRowGroup {
                    record_batch_reader,
                    remaining_row_groups,
                } => {
                    let result = DecodeResult::Data(*record_batch_reader);
                    let next_state = Self::ReadingRowGroup {
                        remaining_row_groups,
                    };
                    return Ok((next_state, result));
                }
                Self::Finished => {
                    return Ok((Self::Finished, DecodeResult::Finished));
                }
            }
        }
    }

    /// Current state --> next state + output
    ///
    /// This function is called to get the next RecordBatch
    ///
    /// This structure is used to reduce the indentation level of the main
    /// decode loop
    fn try_next_batch(self) -> Result<(Self, DecodeResult<RecordBatch>), ParquetError> {
        let mut current_state = self;
        loop {
            let (new_state, decode_result) = current_state.transition()?;
            // if more data is needed to transition, can't proceed further without it
            match decode_result {
                DecodeResult::NeedsData(ranges) => {
                    return Ok((new_state, DecodeResult::NeedsData(ranges)));
                }
                // act next based on state
                DecodeResult::Data(()) | DecodeResult::Finished => {}
            }
            match new_state {
                // not ready to read yet, continue transitioning
                Self::ReadingRowGroup { .. } => current_state = new_state,
                // have a reader ready, so decode the next batch
                Self::DecodingRowGroup {
                    mut record_batch_reader,
                    remaining_row_groups,
                } => {
                    match record_batch_reader.next() {
                        // Successfully decoded a batch, return it
                        Some(Ok(batch)) => {
                            let result = DecodeResult::Data(batch);
                            let next_state = Self::DecodingRowGroup {
                                record_batch_reader,
                                remaining_row_groups,
                            };
                            return Ok((next_state, result));
                        }
                        // No more batches in this row group, move to the next row group
                        None => {
                            current_state = Self::ReadingRowGroup {
                                remaining_row_groups,
                            }
                        }
                        // some error occurred while decoding, so return that
                        Some(Err(e)) => {
                            // TODO: preserve ArrowError in ParquetError (rather than convert to a string)
                            return Err(ParquetError::ArrowError(e.to_string()));
                        }
                    }
                }
                Self::Finished => {
                    return Ok((Self::Finished, DecodeResult::Finished));
                }
            }
        }
    }

    /// Transition to the next state with a reader (data can be produced), if not end of stream
    ///
    /// This function is called in a loop until the decoder is ready to return
    /// data (has the required pages buffered) or is finished.
    fn transition(self) -> Result<(Self, DecodeResult<()>), ParquetError> {
        // result returned when there is data ready
        let data_ready = DecodeResult::Data(());
        match self {
            Self::ReadingRowGroup {
                mut remaining_row_groups,
            } => {
                match remaining_row_groups.try_next_reader()? {
                    // If we have a next reader, we can transition to decoding it
                    DecodeResult::Data(record_batch_reader) => {
                        // Transition to decoding the row group
                        Ok((
                            Self::DecodingRowGroup {
                                record_batch_reader: Box::new(record_batch_reader),
                                remaining_row_groups,
                            },
                            data_ready,
                        ))
                    }
                    DecodeResult::NeedsData(ranges) => {
                        // If we need more data, we return the ranges needed and stay in
                        // ReadingRowGroup state
                        Ok((
                            Self::ReadingRowGroup {
                                remaining_row_groups,
                            },
                            DecodeResult::NeedsData(ranges),
                        ))
                    }
                    // If there are no more readers, we are finished
                    DecodeResult::Finished => {
                        // No more row groups to read, we are finished
                        Ok((Self::Finished, DecodeResult::Finished))
                    }
                }
            }
            // if we are already in DecodingRowGroup, just return data ready
            Self::DecodingRowGroup { .. } => Ok((self, data_ready)),
            // if finished, just return finished
            Self::Finished => Ok((self, DecodeResult::Finished)),
        }
    }

    /// Push data, and transition state if needed
    ///
    /// This should correspond to the data ranges requested by the decoder
    pub fn push_data(
        self,
        ranges: Vec<Range<u64>>,
        data: Vec<Bytes>,
    ) -> Result<Self, ParquetError> {
        match self {
            ParquetDecoderState::ReadingRowGroup {
                mut remaining_row_groups,
            } => {
                // Push data to the RowGroupReaderBuilder
                remaining_row_groups.push_data(ranges, data);
                Ok(ParquetDecoderState::ReadingRowGroup {
                    remaining_row_groups,
                })
            }
            // it is ok to get data before we asked for it
            ParquetDecoderState::DecodingRowGroup {
                record_batch_reader,
                mut remaining_row_groups,
            } => {
                remaining_row_groups.push_data(ranges, data);
                Ok(ParquetDecoderState::DecodingRowGroup {
                    record_batch_reader,
                    remaining_row_groups,
                })
            }
            ParquetDecoderState::Finished => Err(ParquetError::General(
                "Cannot push data to a finished decoder".to_string(),
            )),
        }
    }

    /// How many bytes are currently buffered in the decoder?
    fn buffered_bytes(&self) -> u64 {
        match self {
            ParquetDecoderState::ReadingRowGroup {
                remaining_row_groups,
            } => remaining_row_groups.buffered_bytes(),
            ParquetDecoderState::DecodingRowGroup {
                record_batch_reader: _,
                remaining_row_groups,
            } => remaining_row_groups.buffered_bytes(),
            ParquetDecoderState::Finished => 0,
        }
    }

    /// Clear any staged ranges currently buffered in the decoder.
    fn clear_all_ranges(&mut self) {
        match self {
            ParquetDecoderState::ReadingRowGroup {
                remaining_row_groups,
            } => remaining_row_groups.clear_all_ranges(),
            ParquetDecoderState::DecodingRowGroup {
                record_batch_reader: _,
                remaining_row_groups,
            } => remaining_row_groups.clear_all_ranges(),
            ParquetDecoderState::Finished => {}
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::DecodeResult;
    use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
    use crate::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
    use crate::arrow::{ArrowWriter, ProjectionMask};
    use crate::errors::ParquetError;
    use crate::file::metadata::ParquetMetaDataPushDecoder;
    use crate::file::properties::WriterProperties;
    use arrow::compute::kernels::cmp::{gt, lt};
    use arrow_array::cast::AsArray;
    use arrow_array::types::Int64Type;
    use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray};
    use arrow_select::concat::concat_batches;
    use bytes::Bytes;
    use std::fmt::Debug;
    use std::ops::Range;
    use std::sync::{Arc, LazyLock};

    /// Test decoder struct size (as they are copied around on each transition, they
    /// should not grow too large)
    #[test]
    fn test_decoder_size() {
        assert_eq!(std::mem::size_of::<ParquetDecoderState>(), 24);
    }

    /// Decode the entire file at once, simulating a scenario where all data is
    /// available in memory
    #[test]
    fn test_decoder_all_data() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .build()
            .unwrap();

        decoder
            .push_range(test_file_range(), TEST_FILE_DATA.clone())
            .unwrap();

        let results = vec![
            // first row group should be decoded without needing more data
            expect_data(decoder.try_decode()),
            // second row group should be decoded without needing more data
            expect_data(decoder.try_decode()),
        ];
        expect_finished(decoder.try_decode());

        let all_output = concat_batches(&TEST_BATCH.schema(), &results).unwrap();
        // Check that the output matches the input batch
        assert_eq!(all_output, *TEST_BATCH);
    }

    /// Decode the entire file incrementally, simulating a scenario where data is
    /// fetched as needed
    #[test]
    fn test_decoder_incremental() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .build()
            .unwrap();

        let mut results = vec![];

        // First row group, expect a single request
        let ranges = expect_needs_data(decoder.try_decode());
        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        push_ranges_to_decoder(&mut decoder, ranges);
        // The decoder should currently only store the data it needs to decode the first row group
        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
        results.push(expect_data(decoder.try_decode()));
        // the decoder should have consumed the data for the first row group and freed it
        assert_eq!(decoder.buffered_bytes(), 0);

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        push_ranges_to_decoder(&mut decoder, ranges);
        // The decoder should currently only store the data it needs to decode the second row group
        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
        results.push(expect_data(decoder.try_decode()));
        // the decoder should have consumed the data for the second row group and freed it
        assert_eq!(decoder.buffered_bytes(), 0);
        expect_finished(decoder.try_decode());

        // Check that the output matches the input batch
        let all_output = concat_batches(&TEST_BATCH.schema(), &results).unwrap();
        assert_eq!(all_output, *TEST_BATCH);
    }

    /// Releasing staged ranges should free speculative buffers without affecting
    /// the active row group reader.
    #[test]
    fn test_decoder_clear_all_ranges() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .with_batch_size(100)
            .build()
            .unwrap();

        decoder
            .push_range(test_file_range(), TEST_FILE_DATA.clone())
            .unwrap();
        assert_eq!(decoder.buffered_bytes(), test_file_len());

        // The current row group reader is built from the prefetched bytes, but
        // the speculative full-file range remains staged in the decoder.
        let batch1 = expect_data(decoder.try_decode());
        assert_eq!(batch1, TEST_BATCH.slice(0, 100));
        assert_eq!(decoder.buffered_bytes(), test_file_len());

        // All of the buffer is released
        decoder.clear_all_ranges();
        assert_eq!(decoder.buffered_bytes(), 0);

        // The active reader still owns the current row group's bytes, so it can
        // continue decoding without consulting PushBuffers.
        let batch2 = expect_data(decoder.try_decode());
        assert_eq!(batch2, TEST_BATCH.slice(100, 100));
        assert_eq!(decoder.buffered_bytes(), 0);

        // Moving to the next row group now requires the decoder to ask for data
        // again because the staged speculative ranges were released.
        let ranges = expect_needs_data(decoder.try_decode());
        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        push_ranges_to_decoder(&mut decoder, ranges);
        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);

        let batch3 = expect_data(decoder.try_decode());
        assert_eq!(batch3, TEST_BATCH.slice(200, 100));
        assert_eq!(decoder.buffered_bytes(), 0);

        let batch4 = expect_data(decoder.try_decode());
        assert_eq!(batch4, TEST_BATCH.slice(300, 100));
        assert_eq!(decoder.buffered_bytes(), 0);

        expect_finished(decoder.try_decode());
    }

    /// Decode the entire file incrementally, simulating partial reads
    #[test]
    fn test_decoder_partial() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .build()
            .unwrap();

        // First row group, expect a single request for all the data needed
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(0, 200);
        assert_eq!(batch1, expected1);

        // Second row group, this time provide the data in two steps
        let ranges = expect_needs_data(decoder.try_decode());
        let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2);
        assert!(!ranges1.is_empty());
        assert!(!ranges2.is_empty());
        // push first half to simulate partial read
        push_ranges_to_decoder(&mut decoder, ranges1.to_vec());

        // still expect more data
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        // pushing empty ranges should be a no-op
        push_ranges_to_decoder(&mut decoder, vec![]);
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 200);
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }

    /// Decode multiple columns "a" and "b", expect that the decoder makes
    /// only a single request per row group
    #[test]
    fn test_decoder_selection_does_one_request() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        let mut decoder = builder
            .with_projection(
                ProjectionMask::columns(&schema_descr, ["a", "b"]), // read "a", "b"
            )
            .build()
            .unwrap();

        // First row group, expect a single request for all data needed to read "a" and "b"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(0, 200).project(&[0, 1]).unwrap();
        assert_eq!(batch1, expected1);

        // Second row group, similarly expect a single request for all data needed to read "a" and "b"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 200).project(&[0, 1]).unwrap();
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }

    /// Decode with a filter that requires multiple requests, but only provide part
    /// of the data needed for the filter at a time, simulating partial reads.
    #[test]
    fn test_decoder_single_filter_partial() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // First filter: "a" > 250  (nothing in Row Group 0, both data pages in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            // claim to use both a and b so we get two range requests for the filter pages
            ProjectionMask::columns(&schema_descr, ["a", "b"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // read only column "a" to test that filter pages are reused
                ProjectionMask::columns(&schema_descr, ["a"]), // read "a"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .build()
            .unwrap();

        // First row group, evaluating filters
        let ranges = expect_needs_data(decoder.try_decode());
        // only provide half the ranges
        let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2);
        assert!(!ranges1.is_empty());
        assert!(!ranges2.is_empty());
        push_ranges_to_decoder(&mut decoder, ranges1.to_vec());
        // still expect more data
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        push_ranges_to_decoder(&mut decoder, ranges2.to_vec());

        // Since no rows in the first row group pass the filters, there are no
        // additional requests to read data pages for "b" here

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 149).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }

    /// Decode with a filter where we also skip one of the RowGroups via a RowSelection
    #[test]
    fn test_decoder_single_filter_and_row_selection() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // First filter: "a" > 250  (nothing in Row Group 0, last data page in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // read only column "b" (the filter reads column "a")
                ProjectionMask::columns(&schema_descr, ["b"]), // read "b"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(200),   // skip first row group
                RowSelector::select(100), // first 100 rows of second row group
                RowSelector::skip(100),
            ]))
            .build()
            .unwrap();

        // expect the first row group to be filtered out (no filter is evaluated due to row selection)

        // First row group, first filter (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 49).project(&[1]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }

    /// Decode with multiple filters that require multiple requests
    #[test]
    fn test_decoder_multi_filters() {
        // Create a decoder for decoding parquet data (note it does not have any IO / readers)
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // Values in column "b" range 400..799
        // First filter: "a" > 175  (last data page in Row Group 0)
        // Second filter: "b" < 625 (last data page in Row Group 0 and first DataPage in RowGroup 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 175
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_175 = Int64Array::new_scalar(175);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_175)
            },
        );

        // b < 625
        let row_filter_b = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["b"]),
            |batch: RecordBatch| {
                let scalar_625 = Int64Array::new_scalar(625);
                let column = batch.column(0).as_primitive::<Int64Type>();
                lt(column, &scalar_625)
            },
        );

        let mut decoder = builder
            .with_projection(
                ProjectionMask::columns(&schema_descr, ["c"]), // read "c"
            )
            .with_row_filter(RowFilter::new(vec![
                Box::new(row_filter_a),
                Box::new(row_filter_b),
            ]))
            .build()
            .unwrap();

        // First row group, first filter (a > 175)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // first row group, second filter (b < 625)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // first row group, data pages for "c"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first batch to be decoded: rows 176..199, column "c"
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(176, 24).project(&[2]).unwrap();
        assert_eq!(batch1, expected1);

        // Second row group, first filter (a > 175)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group, second filter (b < 625)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group, data pages for "c"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the second batch to be decoded: rows 200..224, column "c"
        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 25).project(&[2]).unwrap();
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }

    /// Decode with a filter that uses a column that is also projected, and expect
    /// that the filter pages are reused (don't refetch them)
    #[test]
    fn test_decoder_reuses_filter_pages() {
        // Create a decoder for decoding parquet data (note it does not have any IO / readers)
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // First filter: "a" > 250  (nothing in Row Group 0, last data page in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // read only column "a" to test that filter pages are reused
                ProjectionMask::columns(&schema_descr, ["a"]), // read "a"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .build()
            .unwrap();

        // First row group, first filter (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first row group to be filtered out (no rows match)

        // Second row group, first filter (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect that the second row group is decoded: rows 251..399, column "a"
        // Note that the filter pages for "a" should be reused and no additional data
        // should be requested
        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 149).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }

    #[test]
    fn test_decoder_empty_filters() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // only read column "c", but with empty filters
        let mut decoder = builder
            .with_projection(
                ProjectionMask::columns(&schema_descr, ["c"]), // read "c"
            )
            .with_row_filter(RowFilter::new(vec![
                // empty filters should be ignored
            ]))
            .build()
            .unwrap();

        // First row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first batch to be decoded: rows 0..199, column "c"
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(0, 200).project(&[2]).unwrap();
        assert_eq!(batch1, expected1);

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the second batch to be decoded: rows 200..399, column "c"
        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 200).project(&[2]).unwrap();
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }

    /// When filter pushdown is combined with a `LIMIT`, the predicate must
    /// not be evaluated for rows beyond the `limit`-th match.
    ///
    /// Filter `a > 175` produces 24 matches in row group 0 (rows 176..199).
    /// With `limit = 10`, only the first 10 matches (rows 176..185) should be
    /// emitted, AND the predicate counter should observe that evaluation was
    /// short-circuited.
    #[test]
    fn test_decoder_filter_with_limit_short_circuits_within_row_group() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        let rows_filtered = Arc::new(AtomicUsize::new(0));
        let rows_filtered_for_predicate = Arc::clone(&rows_filtered);

        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            move |batch: RecordBatch| {
                rows_filtered_for_predicate.fetch_add(batch.num_rows(), Ordering::Relaxed);
                let scalar_175 = Int64Array::new_scalar(175);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_175)
            },
        );

        // Use a small batch size so the row group is evaluated across
        // multiple predicate batches; that is the regime where Layer 2's
        // short-circuit saves predicate evaluation work. Matching rows are
        // 176..199 (24 rows); with batch_size = 10 those span batches 17, 18,
        // and 19 (rows 170..199). A limit of 10 should stop filter evaluation
        // in the middle of batch 18.
        let mut decoder = builder
            .with_projection(ProjectionMask::columns(&schema_descr, ["a"]))
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .with_batch_size(10)
            .with_limit(10)
            .build()
            .unwrap();

        // First row group: filter columns fetch (predicate is evaluated here)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // The first 10 matching rows come out: 176..185, column "a"
        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(176, 10).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        // No data for row group 1 should be requested: the limit was satisfied
        // by row group 0 and the `Start` state for row group 1 short-circuits
        // to `Finished`.
        expect_finished(decoder.try_decode());

        // Row 186 is the 11th match; the scan should stop no later than the
        // batch containing it (batch 18 of 10 rows = rows 180..189), so at
        // most 190 rows are evaluated.
        let evaluated = rows_filtered.load(Ordering::Relaxed);
        assert!(
            evaluated <= 190,
            "predicate evaluated {evaluated} rows; expected ≤ 190 (stop within batch containing 11th match)"
        );
    }

    /// Once the limit has been satisfied by a prior row group, subsequent
    /// row groups should be skipped entirely, with no data request for their
    /// filter columns.
    #[test]
    fn test_decoder_filter_with_limit_skips_later_row_groups() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // `a > 175` matches rows 176..199 in row group 0 (24 matches) and
        // 200..399 in row group 1 (200 matches). With limit = 5, all matches
        // should come from row group 0.
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_175 = Int64Array::new_scalar(175);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_175)
            },
        );

        let mut decoder = builder
            .with_projection(ProjectionMask::columns(&schema_descr, ["a"]))
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .with_limit(5)
            .build()
            .unwrap();

        // Row group 0: fetch filter pages
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // First 5 matches: 176..180
        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(176, 5).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        // Row group 1 must NOT request data: the limit is already satisfied,
        // so `Start` in row group 1 short-circuits to `Finished`.
        expect_finished(decoder.try_decode());
    }

    /// The predicate short-circuit must account for `self.offset` as well as
    /// `self.limit`. The post-predicate `with_offset` step skips that many
    /// already-selected rows before `with_limit` counts output rows, so the
    /// predicate must retain at least `offset + limit` matches. Without the
    /// fix, Layer 2 caps at just `limit` and the later `with_offset` consumes
    /// all of them, producing 0 rows instead of `limit`.
    ///
    /// `a > 175` matches rows 176..199 in row group 0 (24 matches). With
    /// `offset = 10, limit = 5`, the expected output is rows 186..190 (the
    /// 11th through 15th matches).
    #[test]
    fn test_decoder_filter_with_offset_and_limit() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_175 = Int64Array::new_scalar(175);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_175)
            },
        );

        let mut decoder = builder
            .with_projection(ProjectionMask::columns(&schema_descr, ["a"]))
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .with_offset(10)
            .with_limit(5)
            .build()
            .unwrap();

        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(186, 5).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }

    /// The limit short-circuit must also be correct when the limited predicate
    /// is the last predicate in a multi-predicate chain.
    ///
    /// `a > 175` first narrows row group 0 to rows 176..199. The final
    /// predicate `b < 625` is then evaluated only over those 24 rows, all of
    /// which match. With `limit = 10`, the final output should still be rows
    /// 176..185, and the second predicate should stop before consuming all 24
    /// selected rows.
    #[test]
    fn test_decoder_multi_filters_with_limit() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        let first_predicate_rows = Arc::new(AtomicUsize::new(0));
        let second_predicate_rows = Arc::new(AtomicUsize::new(0));

        let first_predicate_rows_for_filter = Arc::clone(&first_predicate_rows);
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            move |batch: RecordBatch| {
                first_predicate_rows_for_filter.fetch_add(batch.num_rows(), Ordering::Relaxed);
                let scalar_175 = Int64Array::new_scalar(175);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_175)
            },
        );

        let second_predicate_rows_for_filter = Arc::clone(&second_predicate_rows);
        let row_filter_b = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["b"]),
            move |batch: RecordBatch| {
                second_predicate_rows_for_filter.fetch_add(batch.num_rows(), Ordering::Relaxed);
                let scalar_625 = Int64Array::new_scalar(625);
                let column = batch.column(0).as_primitive::<Int64Type>();
                lt(column, &scalar_625)
            },
        );

        let mut decoder = builder
            .with_projection(ProjectionMask::columns(&schema_descr, ["c"]))
            .with_row_filter(RowFilter::new(vec![
                Box::new(row_filter_a),
                Box::new(row_filter_b),
            ]))
            .with_batch_size(10)
            .with_limit(10)
            .build()
            .unwrap();

        // Row group 0, first predicate
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Row group 0, second predicate
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Final projected data
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(176, 10).project(&[2]).unwrap();
        assert_eq!(batch, expected);

        // The overall limit was satisfied by row group 0.
        expect_finished(decoder.try_decode());

        assert_eq!(first_predicate_rows.load(Ordering::Relaxed), 200);
        assert!(
            second_predicate_rows.load(Ordering::Relaxed) < 24,
            "final predicate should short-circuit before consuming all 24 rows selected by the first predicate"
        );
    }
1327
1328    /// When a row selection already exists, limiting the predicate must still
1329    /// preserve alignment with that prior selection.
1330    ///
1331    /// The explicit selection narrows row group 0 to rows 150..=199. Applying
1332    /// `a > 175` over that selection yields rows 176..=199. With `limit = 10`,
1333    /// the decoder should emit rows 176..=185 and stop without evaluating the
1334    /// remaining selected rows. A selection-composition sketch follows the test.
1335    #[test]
1336    fn test_decoder_filter_with_row_selection_and_limit() {
1337        use std::sync::atomic::{AtomicUsize, Ordering};
1338
1339        let builder =
1340            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
1341        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
1342
1343        let rows_filtered = Arc::new(AtomicUsize::new(0));
1344        let rows_filtered_for_predicate = Arc::clone(&rows_filtered);
1345
1346        let row_filter_a = ArrowPredicateFn::new(
1347            ProjectionMask::columns(&schema_descr, ["a"]),
1348            move |batch: RecordBatch| {
1349                rows_filtered_for_predicate.fetch_add(batch.num_rows(), Ordering::Relaxed);
1350                let scalar_175 = Int64Array::new_scalar(175);
1351                let column = batch.column(0).as_primitive::<Int64Type>();
1352                gt(column, &scalar_175)
1353            },
1354        );
1355
1356        let mut decoder = builder
1357            .with_projection(ProjectionMask::columns(&schema_descr, ["a"]))
1358            .with_row_selection(RowSelection::from(vec![
1359                RowSelector::skip(150),
1360                RowSelector::select(50),
1361            ]))
1362            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
1363            .with_batch_size(10)
1364            .with_limit(10)
1365            .build()
1366            .unwrap();
1367
1368        let ranges = expect_needs_data(decoder.try_decode());
1369        push_ranges_to_decoder(&mut decoder, ranges);
1370
1371        let batch = expect_data(decoder.try_decode());
1372        let expected = TEST_BATCH.slice(176, 10).project(&[0]).unwrap();
1373        assert_eq!(batch, expected);
1374
1375        expect_finished(decoder.try_decode());
1376
1377        assert!(
1378            rows_filtered.load(Ordering::Relaxed) < 50,
1379            "predicate should short-circuit before consuming all 50 rows from the explicit row selection"
1380        );
1381    }
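
    /// A minimal sketch (using only `RowSelection` and `RowSelector`, already
    /// imported for the tests above) of the composition the previous test
    /// exercises: `and_then` applies the predicate's matches *relative to* the
    /// rows kept by the explicit selection, preserving alignment.
    #[test]
    fn selection_composition_sketch() {
        // Explicit selection: keep rows 150..=199 of row group 0 (50 rows)
        let explicit = RowSelection::from(vec![
            RowSelector::skip(150),
            RowSelector::select(50),
        ]);
        // Relative to those 50 rows, `a > 175` keeps the last 24 (rows 176..=199)
        let predicate_matches = RowSelection::from(vec![
            RowSelector::skip(26),
            RowSelector::select(24),
        ]);
        let combined = explicit.and_then(&predicate_matches);
        assert_eq!(combined.row_count(), 24);
    }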
1382
1383    #[test]
1384    fn test_decoder_offset_limit() {
1385        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
1386            .unwrap()
1387            // skip entire first row group (200 rows) and first 25 rows of second row group
1388            .with_offset(225)
1389            // and limit to 20 rows
1390            .with_limit(20)
1391            .build()
1392            .unwrap();
1393
1394        // First row group is skipped entirely (no data is requested for it)
1395
1396        // Second row group
1397        let ranges = expect_needs_data(decoder.try_decode());
1398        push_ranges_to_decoder(&mut decoder, ranges);
1399
1400        // expect the first and only batch to be decoded
1401        let batch1 = expect_data(decoder.try_decode());
1402        let expected1 = TEST_BATCH.slice(225, 20);
1403        assert_eq!(batch1, expected1);
1404
1405        expect_finished(decoder.try_decode());
1406    }
1407
1408    #[test]
1409    fn test_decoder_try_next_reader_offset_limit() {
1410        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
1411            .unwrap()
1412            .with_offset(225)
1413            .with_limit(20)
1414            .build()
1415            .unwrap();
1416
1417        let ranges = expect_needs_data(decoder.try_next_reader());
1418        push_ranges_to_decoder(&mut decoder, ranges);
1419
1420        let reader = expect_data(decoder.try_next_reader());
1421        let batches = reader
1422            .map(|batch| batch.expect("expected decoded batch"))
1423            .collect::<Vec<_>>();
1424        let output = concat_batches(&TEST_BATCH.schema(), &batches).unwrap();
1425        assert_eq!(output, TEST_BATCH.slice(225, 20));
1426
1427        expect_finished(decoder.try_next_reader());
1428    }
1429
1430    #[test]
1431    fn test_decoder_row_group_selection() {
1432        // take only the second row group
1433        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
1434            .unwrap()
1435            .with_row_groups(vec![1])
1436            .build()
1437            .unwrap();
1438
1439        // First row group is not selected, so no data is requested for it
1440
1441        // Second row group
1442        let ranges = expect_needs_data(decoder.try_decode());
1443        push_ranges_to_decoder(&mut decoder, ranges);
1444
1445        // expect the first and only batch to be decoded
1446        let batch1 = expect_data(decoder.try_decode());
1447        let expected1 = TEST_BATCH.slice(200, 200);
1448        assert_eq!(batch1, expected1);
1449
1450        expect_finished(decoder.try_decode());
1451    }
1452
1453    #[test]
1454    fn test_decoder_row_selection() {
1455        // skip the first row group plus 25 rows of the second, then take 20 rows
1456        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
1457            .unwrap()
1458            .with_row_selection(RowSelection::from(vec![
1459                RowSelector::skip(225),  // skip first row group and 25 rows of the second
1460                RowSelector::select(20), // take 20 rows
1461            ]))
1462            .build()
1463            .unwrap();
1464
1465        // First row group is skipped entirely by the row selection
1466
1467        // Second row group
1468        let ranges = expect_needs_data(decoder.try_decode());
1469        push_ranges_to_decoder(&mut decoder, ranges);
1470
1471        // expect the first and only batch to be decoded
1472        let batch1 = expect_data(decoder.try_decode());
1473        let expected1 = TEST_BATCH.slice(225, 20);
1474        assert_eq!(batch1, expected1);
1475
1476        expect_finished(decoder.try_decode());
1477    }
1478
1479    /// A batch with 400 rows and 3 columns: "a", "b", "c"
1480    ///
1481    /// Note that `c` is a different type (so its data page sizes will differ); a sanity sketch follows the definition.
1482    static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
1483        let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
1484        let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
1485        let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| {
1486            if i % 2 == 0 {
1487                format!("string_{i}")
1488            } else {
1489                format!("A string larger than 12 bytes and thus not inlined {i}")
1490            }
1491        })));
1492
1493        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
1494    });
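
    /// A small sanity sketch (relying only on [`TEST_BATCH`] above) of the
    /// shape described in its doc comment: 400 rows, 3 columns, and a `c`
    /// column that alternates short (inlinable) and long string values.
    #[test]
    fn test_batch_shape_sketch() {
        assert_eq!(TEST_BATCH.num_rows(), 400);
        assert_eq!(TEST_BATCH.num_columns(), 3);
        let c = TEST_BATCH.column(2).as_string_view();
        assert_eq!(c.value(0), "string_0"); // even rows: short, inlined
        assert!(c.value(1).len() > 12); // odd rows: too long to inline
    }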
1495
1496    /// Create a parquet file in memory for testing.
1497    ///
1498    /// See [`TEST_BATCH`] for the data in the file.
1499    ///
1500    /// Each column is written as 4 data pages of 100 rows each, split across 2
1501    /// row groups (two data pages per column per row group); a layout sanity sketch follows.
1502    ///
1503    /// The data is split across row groups like this
1504    ///
1505    /// Column | Values                     | Data Page | Row Group
1506    /// -------|----------------------------|-----------|----------
1507    /// a      | 0..99                      | 1         | 0
1508    /// a      | 100..199                   | 2         | 0
1509    /// a      | 200..299                   | 1         | 1
1510    /// a      | 300..399                   | 2         | 1
1511    ///
1512    /// b      | 400..499                   | 1         | 0
1513    /// b      | 500..599                   | 2         | 0
1514    /// b      | 600..699                   | 1         | 1
1515    /// b      | 700..799                   | 2         | 1
1516    ///
1517    /// c      | "string_0".."string_99"    | 1         | 0
1518    /// c      | "string_100".."string_199" | 2         | 0
1519    /// c      | "string_200".."string_299" | 1         | 1
1520    /// c      | "string_300".."string_399" | 2         | 1
1521    static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
1522        let input_batch = &TEST_BATCH;
1523        let mut output = Vec::new();
1524
1525        let writer_options = WriterProperties::builder()
1526            .set_max_row_group_row_count(Some(200))
1527            .set_data_page_row_count_limit(100)
1528            .build();
1529        let mut writer =
1530            ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap();
1531
1532        // since the limits are only enforced on batch boundaries, write the input
1533        // batch in chunks of 50
1534        let mut row_remain = input_batch.num_rows();
1535        while row_remain > 0 {
1536            let chunk_size = row_remain.min(50);
1537            let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size);
1538            writer.write(&chunk).unwrap();
1539            row_remain -= chunk_size;
1540        }
1541        writer.close().unwrap();
1542        Bytes::from(output)
1543    });
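
    /// A sanity sketch (decoding the metadata with the helpers below) checking
    /// that [`TEST_FILE_DATA`] has the layout its doc comment describes:
    /// 2 row groups of 200 rows each.
    #[test]
    fn test_file_layout_sketch() {
        let metadata = test_file_parquet_metadata();
        assert_eq!(metadata.num_row_groups(), 2);
        for row_group in metadata.row_groups() {
            assert_eq!(row_group.num_rows(), 200);
        }
    }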
1544
1545    /// Return the length of [`TEST_FILE_DATA`], in bytes
1546    fn test_file_len() -> u64 {
1547        TEST_FILE_DATA.len() as u64
1548    }
1549
1550    /// Return a range that covers the entire [`TEST_FILE_DATA`]
1551    fn test_file_range() -> Range<u64> {
1552        0..test_file_len()
1553    }
1554
1555    /// Return a slice of the test file data from the given range
1556    pub fn test_file_slice(range: Range<u64>) -> Bytes {
1557        let start: usize = range.start.try_into().unwrap();
1558        let end: usize = range.end.try_into().unwrap();
1559        TEST_FILE_DATA.slice(start..end)
1560    }
1561
1562    /// Return the metadata for the test file
1563    pub fn test_file_parquet_metadata() -> Arc<crate::file::metadata::ParquetMetaData> {
1564        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(test_file_len()).unwrap();
1565        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![test_file_range()]);
1566        let metadata = metadata_decoder.try_decode().unwrap();
1567        let DecodeResult::Data(metadata) = metadata else {
1568            panic!("Expected metadata to be decoded successfully");
1569        };
1570        Arc::new(metadata)
1571    }
1572
1573    /// Push the given ranges to the metadata decoder, simulating reading from a file
1574    fn push_ranges_to_metadata_decoder(
1575        metadata_decoder: &mut ParquetMetaDataPushDecoder,
1576        ranges: Vec<Range<u64>>,
1577    ) {
1578        let data = ranges
1579            .iter()
1580            .map(|range| test_file_slice(range.clone()))
1581            .collect::<Vec<_>>();
1582        metadata_decoder.push_ranges(ranges, data).unwrap();
1583    }
1584
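    /// Push the given ranges to the push decoder, simulating reading from a file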
1585    fn push_ranges_to_decoder(decoder: &mut ParquetPushDecoder, ranges: Vec<Range<u64>>) {
1586        let data = ranges
1587            .iter()
1588            .map(|range| test_file_slice(range.clone()))
1589            .collect::<Vec<_>>();
1590        decoder.push_ranges(ranges, data).unwrap();
1591    }
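
    /// A hedged sketch of the drive loop this module's tests use implicitly
    /// (a convention, not a dedicated API): ask the decoder for the next step,
    /// push any requested byte ranges, and collect batches until `Finished`.
    #[test]
    fn test_drive_loop_sketch() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .build()
            .unwrap();
        let mut batches = vec![];
        loop {
            match decoder.try_decode().unwrap() {
                // The decoder needs more bytes: fetch and push them
                DecodeResult::NeedsData(ranges) => push_ranges_to_decoder(&mut decoder, ranges),
                // A batch was decoded
                DecodeResult::Data(batch) => batches.push(batch),
                // All rows have been produced
                DecodeResult::Finished => break,
            }
        }
        // Decoding the whole file reproduces TEST_BATCH
        let output = concat_batches(&TEST_BATCH.schema(), &batches).unwrap();
        assert_eq!(output, *TEST_BATCH);
    }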
1592
1593    /// Expect that the [`DecodeResult`] is a [`DecodeResult::Data`] and return the corresponding element
1594    fn expect_data<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) -> T {
1595        match result.expect("Expected Ok(DecodeResult::Data(T))") {
1596            DecodeResult::Data(data) => data,
1597            result => panic!("Expected DecodeResult::Data, got {result:?}"),
1598        }
1599    }
1600
1601    /// Expect that the [`DecodeResult`] is a [`DecodeResult::NeedsData`] and return the corresponding ranges
1602    fn expect_needs_data<T: Debug>(
1603        result: Result<DecodeResult<T>, ParquetError>,
1604    ) -> Vec<Range<u64>> {
1605        match result.expect("Expected Ok(DecodeResult::NeedsData{ranges})") {
1606            DecodeResult::NeedsData(ranges) => ranges,
1607            result => panic!("Expected DecodeResult::NeedsData, got {result:?}"),
1608        }
1609    }
1610
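    /// Expect that the [`DecodeResult`] is a [`DecodeResult::Finished`]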
1611    fn expect_finished<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) {
1612        match result.expect("Expected Ok(DecodeResult::Finished)") {
1613            DecodeResult::Finished => {}
1614            result => panic!("Expected DecodeResult::Finished, got {result:?}"),
1615        }
1616    }
1617}