parquet/arrow/push_decoder/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`ParquetPushDecoder`]: decodes Parquet data with data provided by the
//! caller (rather than from an underlying reader).

mod reader_builder;
mod remaining;

use crate::DecodeResult;
use crate::arrow::arrow_reader::{
    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
};
use crate::errors::ParquetError;
use crate::file::metadata::ParquetMetaData;
use crate::util::push_buffers::PushBuffers;
use arrow_array::RecordBatch;
use bytes::Bytes;
use reader_builder::RowGroupReaderBuilder;
use remaining::RemainingRowGroups;
use std::ops::Range;
use std::sync::Arc;

/// A builder for [`ParquetPushDecoder`].
///
/// To create a new decoder, use [`ParquetPushDecoderBuilder::try_new_decoder`] and pass
/// the file length and metadata of the Parquet file to decode.
///
/// You can decode the metadata from a Parquet file using either
/// [`ParquetMetadataReader`] or [`ParquetMetaDataPushDecoder`].
///
/// [`ParquetMetadataReader`]: crate::file::metadata::ParquetMetaDataReader
/// [`ParquetMetaDataPushDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder
///
/// Note the "input" type is `u64` which represents the length of the Parquet file
/// being decoded. This is needed to initialize the internal buffers that track
/// what data has been provided to the decoder.
///
/// # Example
/// ```
/// # use std::ops::Range;
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use arrow_array::record_batch;
/// # use parquet::DecodeResult;
/// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
/// # use parquet::arrow::ArrowWriter;
/// # use parquet::file::metadata::ParquetMetaDataPushDecoder;
/// # let file_bytes = {
/// #   let mut buffer = vec![];
/// #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
/// #   writer.write(&batch).unwrap();
/// #   writer.close().unwrap();
/// #   Bytes::from(buffer)
/// # };
/// # // mimic IO by returning a function that returns the bytes for a given range
/// # let get_range = |range: &Range<u64>| -> Bytes {
/// #    let start = range.start as usize;
/// #    let end = range.end as usize;
/// #    file_bytes.slice(start..end)
/// # };
/// # let file_length = file_bytes.len() as u64;
/// # let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_length).unwrap();
/// # metadata_decoder.push_ranges(vec![0..file_length], vec![file_bytes.clone()]).unwrap();
/// # let DecodeResult::Data(parquet_metadata) = metadata_decoder.try_decode().unwrap() else { panic!("failed to decode metadata") };
/// # let parquet_metadata = Arc::new(parquet_metadata);
/// // The file length and metadata are required to create the decoder
/// let mut decoder =
///     ParquetPushDecoderBuilder::try_new_decoder(file_length, parquet_metadata)
///         .unwrap()
///         // Optionally configure the decoder, e.g. batch size
///         .with_batch_size(1024)
///         // Build the decoder
///         .build()
///         .unwrap();
///
/// // In a loop, ask the decoder what it needs next, and provide it with the required data
/// loop {
///     match decoder.try_decode().unwrap() {
///         DecodeResult::NeedsData(ranges) => {
///             // The decoder needs more data. Fetch the data for the given ranges
///             let data = ranges.iter().map(|r| get_range(r)).collect::<Vec<_>>();
///             // Push the data to the decoder
///             decoder.push_ranges(ranges, data).unwrap();
///             // After pushing the data, we can try to decode again on the next iteration
///         }
///         DecodeResult::Data(batch) => {
///             // Successfully decoded a batch of data
///             assert!(batch.num_rows() > 0);
///         }
///         DecodeResult::Finished => {
///             // The decoder has finished decoding; exit the loop
///             break;
///         }
///     }
/// }
/// ```
pub type ParquetPushDecoderBuilder = ArrowReaderBuilder<u64>;

/// Methods for building a [`ParquetPushDecoder`]. See the base [`ArrowReaderBuilder`] for
/// more options that can be configured.
impl ParquetPushDecoderBuilder {
    /// Create a new `ParquetPushDecoderBuilder` for configuring a Parquet decoder for the given file.
    ///
    /// See [`ParquetMetadataDecoder`] for a decoder that can read the metadata from a Parquet file.
    ///
    /// [`ParquetMetadataDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder
    ///
    /// See the example on [`ParquetPushDecoderBuilder`].
    pub fn try_new_decoder(
        file_len: u64,
        parquet_metadata: Arc<ParquetMetaData>,
    ) -> Result<Self, ParquetError> {
        Self::try_new_decoder_with_options(
            file_len,
            parquet_metadata,
            ArrowReaderOptions::default(),
        )
    }

    /// Create a new `ParquetPushDecoderBuilder` for configuring a Parquet decoder for the given
    /// file with the given reader options.
    ///
    /// This is similar to [`Self::try_new_decoder`] but allows configuring
    /// options such as the Arrow schema.
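    ///
    /// For example, a minimal sketch (the helpers producing the file length and
    /// metadata are placeholders for whatever IO the caller performs):
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
    /// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
    /// # use parquet::file::metadata::ParquetMetaData;
    /// # fn get_file_len() -> u64 { unimplemented!() }
    /// # fn get_metadata() -> Arc<ParquetMetaData> { unimplemented!() }
    /// // e.g. ignore any embedded Arrow schema and infer one from the Parquet schema
    /// let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
    /// let builder = ParquetPushDecoderBuilder::try_new_decoder_with_options(
    ///     get_file_len(),
    ///     get_metadata(),
    ///     options,
    /// )
    /// .unwrap();
    /// ```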
    pub fn try_new_decoder_with_options(
        file_len: u64,
        parquet_metadata: Arc<ParquetMetaData>,
        arrow_reader_options: ArrowReaderOptions,
    ) -> Result<Self, ParquetError> {
        let arrow_reader_metadata =
            ArrowReaderMetadata::try_new(parquet_metadata, arrow_reader_options)?;
        Ok(Self::new_with_metadata(file_len, arrow_reader_metadata))
    }

    /// Create a new `ParquetPushDecoderBuilder` given [`ArrowReaderMetadata`].
    ///
    /// See [`ArrowReaderMetadata::try_new`] for how to create the metadata from
    /// the Parquet metadata and reader options.
    pub fn new_with_metadata(file_len: u64, arrow_reader_metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(file_len, arrow_reader_metadata)
    }

    /// Create a [`ParquetPushDecoder`] with the configured options
    pub fn build(self) -> Result<ParquetPushDecoder, ParquetError> {
        let Self {
            input: file_len,
            metadata: parquet_metadata,
            schema: _,
            fields,
            batch_size,
            row_groups,
            projection,
            filter,
            selection,
            limit,
            offset,
            metrics,
            max_predicate_cache_size,
        } = self;

        // If no row groups were specified, read all of them
        let row_groups =
            row_groups.unwrap_or_else(|| (0..parquet_metadata.num_row_groups()).collect());

        // Prepare to build RowGroup readers
        let buffers = PushBuffers::new(file_len);
        let row_group_reader_builder = RowGroupReaderBuilder::new(
            batch_size,
            projection,
            Arc::clone(&parquet_metadata),
            fields,
            filter,
            limit,
            offset,
            metrics,
            max_predicate_cache_size,
            buffers,
        );

        // Initialize the decoder with the configured options
        let remaining_row_groups = RemainingRowGroups::new(
            parquet_metadata,
            row_groups,
            selection,
            row_group_reader_builder,
        );

        Ok(ParquetPushDecoder {
            state: ParquetDecoderState::ReadingRowGroup {
                remaining_row_groups: Box::new(remaining_row_groups),
            },
        })
    }
}

/// A push based Parquet Decoder
///
/// See [`ParquetPushDecoderBuilder`] for an example of how to build and use the decoder.
///
/// [`ParquetPushDecoder`] is a low-level API for decoding Parquet data without an
/// underlying reader for performing IO, and thus offers fine-grained control
/// over how data is fetched and decoded.
///
/// When more data is needed to make progress, instead of reading data directly
/// from a reader, the decoder returns a [`DecodeResult`] indicating what ranges
/// are needed. Once the caller provides the requested ranges via
/// [`Self::push_ranges`], they can try to decode again by calling
/// [`Self::try_decode`].
///
/// The decoder's internal state tracks what has already been decoded and what
/// is needed next.
#[derive(Debug)]
pub struct ParquetPushDecoder {
    /// The inner state.
    ///
    /// This state is consumed on every transition and a new state is produced
    /// so the Rust compiler can ensure that the state is always valid and
    /// transitions are not missed.
    state: ParquetDecoderState,
}

impl ParquetPushDecoder {
    /// Attempt to decode the next batch of data, or return what data is needed
    ///
    /// The decoder communicates its next state with a [`DecodeResult`]
    ///
    /// See the full example in [`ParquetPushDecoderBuilder`]
    ///
    /// ```no_run
    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
    /// use parquet::DecodeResult;
    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
    /// # fn push_data(decoder: &mut ParquetPushDecoder, ranges: Vec<std::ops::Range<u64>>) { unimplemented!() }
    /// let mut decoder = get_decoder();
    /// loop {
    ///    match decoder.try_decode().unwrap() {
    ///       DecodeResult::NeedsData(ranges) => {
    ///         // The decoder needs more data. Fetch the data for the given ranges,
    ///         // call decoder.push_ranges(ranges, data), and then try to decode again
    ///         push_data(&mut decoder, ranges);
    ///       }
    ///       DecodeResult::Data(batch) => {
    ///         // Successfully decoded the next batch of data
    ///         println!("Got batch with {} rows", batch.num_rows());
    ///       }
    ///       DecodeResult::Finished => {
    ///         // The decoder has finished decoding all data
    ///         break;
    ///       }
    ///    }
    /// }
    /// ```
    pub fn try_decode(&mut self) -> Result<DecodeResult<RecordBatch>, ParquetError> {
        let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
        let (new_state, decode_result) = current_state.try_transition()?;
        self.state = new_state;
        Ok(decode_result)
    }

    /// Push data into the decoder for processing
    ///
    /// This is a convenience wrapper around [`Self::push_ranges`] for pushing a
    /// single range of data.
    ///
    /// Note this can be the entire file or just a part of it. If it is part of the
    /// file, the range should correspond to a data range requested by the decoder.
    ///
    /// See the example in [`ParquetPushDecoderBuilder`]
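    ///
    /// For example, a minimal sketch that pushes the entire file at once (the
    /// helpers that produce the decoder and the file bytes are placeholders):
    ///
    /// ```no_run
    /// # use bytes::Bytes;
    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
    /// # fn read_entire_file() -> Bytes { unimplemented!() }
    /// let mut decoder = get_decoder();
    /// let file_bytes = read_entire_file();
    /// let file_len = file_bytes.len() as u64;
    /// // Push the whole file as a single range; the decoder can then decode
    /// // all row groups without asking for more data
    /// decoder.push_range(0..file_len, file_bytes).unwrap();
    /// ```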
    pub fn push_range(&mut self, range: Range<u64>, data: Bytes) -> Result<(), ParquetError> {
        self.push_ranges(vec![range], vec![data])
    }

    /// Push data into the decoder for processing
    ///
    /// This should correspond to the data ranges requested by the decoder
    pub fn push_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
        data: Vec<Bytes>,
    ) -> Result<(), ParquetError> {
        let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
        self.state = current_state.push_data(ranges, data)?;
        Ok(())
    }

    /// Returns the total number of buffered bytes in the decoder
    ///
    /// This is the sum of the sizes of all [`Bytes`] that have been pushed to the
    /// decoder but not yet consumed.
    ///
    /// Note that this does not include any overhead of the internal data
    /// structures, and since [`Bytes`] are reference counted, it may not
    /// reflect the actual additional memory usage.
    ///
    /// This can be used to monitor the memory usage of the decoder.
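    ///
    /// For example, a sketch of a simple memory check (the 1 MiB budget and the
    /// helper producing the decoder are hypothetical):
    ///
    /// ```no_run
    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
    /// let decoder = get_decoder();
    /// // Hypothetical budget: warn if more than 1 MiB is buffered but not yet decoded
    /// if decoder.buffered_bytes() > 1024 * 1024 {
    ///     println!("decoder is buffering more than 1 MiB");
    /// }
    /// ```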
    pub fn buffered_bytes(&self) -> u64 {
        self.state.buffered_bytes()
    }
}

/// Internal state machine for the [`ParquetPushDecoder`]
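///
/// The possible transitions, as implemented in [`Self::try_transition`]:
///
/// ```text
/// ReadingRowGroup  --(row group data available)--> DecodingRowGroup
/// ReadingRowGroup  --(no more row groups)--------> Finished
/// DecodingRowGroup --(batches exhausted)---------> ReadingRowGroup
/// ```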
#[derive(Debug)]
enum ParquetDecoderState {
    /// Waiting for data needed to decode the next RowGroup
    ReadingRowGroup {
        remaining_row_groups: Box<RemainingRowGroups>,
    },
    /// The decoder is actively decoding a RowGroup
    DecodingRowGroup {
        /// Current active reader
        record_batch_reader: Box<ParquetRecordBatchReader>,
        remaining_row_groups: Box<RemainingRowGroups>,
    },
    /// The decoder has finished processing all data
    Finished,
}

impl ParquetDecoderState {
    /// Current state --> next state + output
    ///
    /// This function is called to check if the decoder can produce any
    /// RecordBatches; [`Self::push_data`] is called when new data is available.
    ///
    /// # Notes
    ///
    /// This structure is used to reduce the indentation level of the main
    /// decode loop in [`ParquetPushDecoder::try_decode`]
    fn try_transition(self) -> Result<(Self, DecodeResult<RecordBatch>), ParquetError> {
        match self {
            Self::ReadingRowGroup {
                mut remaining_row_groups,
            } => {
                match remaining_row_groups.try_next_reader()? {
                    // If we have a next reader, we can transition to decoding it
                    DecodeResult::Data(record_batch_reader) => {
                        // Transition to decoding the row group
                        Self::DecodingRowGroup {
                            record_batch_reader: Box::new(record_batch_reader),
                            remaining_row_groups,
                        }
                        .try_transition()
                    }
                    DecodeResult::NeedsData(ranges) => {
                        // If we need more data, return the ranges needed and stay in the
                        // ReadingRowGroup state
                        Ok((
                            Self::ReadingRowGroup {
                                remaining_row_groups,
                            },
                            DecodeResult::NeedsData(ranges),
                        ))
                    }
                    DecodeResult::Finished => {
                        // No more row groups to read, we are finished
                        Ok((Self::Finished, DecodeResult::Finished))
                    }
                }
            }
            Self::DecodingRowGroup {
                mut record_batch_reader,
                remaining_row_groups,
            } => {
                // Decode the next record batch
                match record_batch_reader.next() {
                    Some(Ok(batch)) => {
                        // Successfully decoded a batch, return it
                        Ok((
                            Self::DecodingRowGroup {
                                record_batch_reader,
                                remaining_row_groups,
                            },
                            DecodeResult::Data(batch),
                        ))
                    }
                    None => {
                        // No more batches in this row group, move to the next row group
                        // or finish if there are no more row groups
                        Self::ReadingRowGroup {
                            remaining_row_groups,
                        }
                        .try_transition()
                    }
                    Some(Err(e)) => Err(ParquetError::from(e)), // an error occurred while decoding
                }
            }
            Self::Finished => Ok((Self::Finished, DecodeResult::Finished)),
        }
    }

    /// Push data, and transition state if needed
    ///
    /// This should correspond to the data ranges requested by the decoder
    pub fn push_data(
        self,
        ranges: Vec<Range<u64>>,
        data: Vec<Bytes>,
    ) -> Result<Self, ParquetError> {
        match self {
            ParquetDecoderState::ReadingRowGroup {
                mut remaining_row_groups,
            } => {
                // Push data to the RowGroupReaderBuilder
                remaining_row_groups.push_data(ranges, data);
                Ok(ParquetDecoderState::ReadingRowGroup {
                    remaining_row_groups,
                })
            }
            // It is OK to receive data before it was requested
            ParquetDecoderState::DecodingRowGroup {
                record_batch_reader,
                mut remaining_row_groups,
            } => {
                remaining_row_groups.push_data(ranges, data);
                Ok(ParquetDecoderState::DecodingRowGroup {
                    record_batch_reader,
                    remaining_row_groups,
                })
            }
            ParquetDecoderState::Finished => Err(ParquetError::General(
                "Cannot push data to a finished decoder".to_string(),
            )),
        }
    }

    /// How many bytes are currently buffered in the decoder?
    fn buffered_bytes(&self) -> u64 {
        match self {
            ParquetDecoderState::ReadingRowGroup {
                remaining_row_groups,
            } => remaining_row_groups.buffered_bytes(),
            ParquetDecoderState::DecodingRowGroup {
                record_batch_reader: _,
                remaining_row_groups,
            } => remaining_row_groups.buffered_bytes(),
            ParquetDecoderState::Finished => 0,
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::DecodeResult;
    use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
    use crate::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
    use crate::arrow::{ArrowWriter, ProjectionMask};
    use crate::errors::ParquetError;
    use crate::file::metadata::ParquetMetaDataPushDecoder;
    use crate::file::properties::WriterProperties;
    use arrow::compute::kernels::cmp::{gt, lt};
    use arrow_array::cast::AsArray;
    use arrow_array::types::Int64Type;
    use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray};
    use arrow_select::concat::concat_batches;
    use bytes::Bytes;
    use std::fmt::Debug;
    use std::ops::Range;
    use std::sync::{Arc, LazyLock};

    /// Test the decoder state size (states are moved on each transition, so they
    /// should not grow too large)
    #[test]
    fn test_decoder_size() {
        assert_eq!(std::mem::size_of::<ParquetDecoderState>(), 24);
    }

    /// Decode the entire file at once, simulating a scenario where all data is
    /// available in memory
    #[test]
    fn test_decoder_all_data() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
            test_file_len(),
            test_file_parquet_metadata(),
        )
        .unwrap()
        .build()
        .unwrap();

        decoder
            .push_range(test_file_range(), TEST_FILE_DATA.clone())
            .unwrap();

        let results = vec![
            // first row group should be decoded without needing more data
            expect_data(decoder.try_decode()),
            // second row group should be decoded without needing more data
            expect_data(decoder.try_decode()),
        ];
        expect_finished(decoder.try_decode());

        let all_output = concat_batches(&TEST_BATCH.schema(), &results).unwrap();
        // Check that the output matches the input batch
        assert_eq!(all_output, *TEST_BATCH);
    }

    /// Decode the entire file incrementally, simulating a scenario where data is
    /// fetched as needed
    #[test]
    fn test_decoder_incremental() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
            test_file_len(),
            test_file_parquet_metadata(),
        )
        .unwrap()
        .build()
        .unwrap();

        let mut results = vec![];

        // First row group, expect a single request
        let ranges = expect_needs_data(decoder.try_decode());
        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        push_ranges_to_decoder(&mut decoder, ranges);
        // The decoder should currently only store the data it needs to decode the first row group
        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
        results.push(expect_data(decoder.try_decode()));
        // the decoder should have consumed the data for the first row group and freed it
        assert_eq!(decoder.buffered_bytes(), 0);

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        push_ranges_to_decoder(&mut decoder, ranges);
        // The decoder should currently only store the data it needs to decode the second row group
        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
        results.push(expect_data(decoder.try_decode()));
        // the decoder should have consumed the data for the second row group and freed it
        assert_eq!(decoder.buffered_bytes(), 0);
        expect_finished(decoder.try_decode());

        // Check that the output matches the input batch
        let all_output = concat_batches(&TEST_BATCH.schema(), &results).unwrap();
        assert_eq!(all_output, *TEST_BATCH);
    }

    /// Decode the entire file incrementally, simulating partial reads
    #[test]
    fn test_decoder_partial() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
            test_file_len(),
            test_file_parquet_metadata(),
        )
        .unwrap()
        .build()
        .unwrap();

        // First row group, expect a single request for all data needed to read the row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(0, 200);
        assert_eq!(batch1, expected1);

        // Second row group, this time provide the data in two steps
        let ranges = expect_needs_data(decoder.try_decode());
        let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2);
        assert!(!ranges1.is_empty());
        assert!(!ranges2.is_empty());
        // push the first half to simulate a partial read
        push_ranges_to_decoder(&mut decoder, ranges1.to_vec());

        // still expect more data
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        // pushing empty ranges should be a no-op
        push_ranges_to_decoder(&mut decoder, vec![]);
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 200);
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }

    /// Decode multiple columns "a" and "b", and expect that the decoder issues
    /// only a single request per row group
    #[test]
    fn test_decoder_selection_does_one_request() {
        let builder = ParquetPushDecoderBuilder::try_new_decoder(
            test_file_len(),
            test_file_parquet_metadata(),
        )
        .unwrap();

        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        let mut decoder = builder
            .with_projection(
                ProjectionMask::columns(&schema_descr, ["a", "b"]), // read "a", "b"
            )
            .build()
            .unwrap();

        // First row group, expect a single request for all data needed to read "a" and "b"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(0, 200).project(&[0, 1]).unwrap();
        assert_eq!(batch1, expected1);

        // Second row group, similarly expect a single request for all data needed to read "a" and "b"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 200).project(&[0, 1]).unwrap();
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }

    /// Decode with a filter that requires multiple requests, but only provide part
    /// of the data needed for the filter at a time, simulating partial reads.
    #[test]
    fn test_decoder_single_filter_partial() {
        let builder = ParquetPushDecoderBuilder::try_new_decoder(
            test_file_len(),
            test_file_parquet_metadata(),
        )
        .unwrap();

        // Values in column "a" range 0..399
        // First filter: "a" > 250  (nothing in Row Group 0, both data pages in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            // claim to use both a and b so we get two range requests for the filter pages
            ProjectionMask::columns(&schema_descr, ["a", "b"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // read only column "a" to test that filter pages are reused
                ProjectionMask::columns(&schema_descr, ["a"]), // read "a"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .build()
            .unwrap();

        // First row group, evaluating filters
        let ranges = expect_needs_data(decoder.try_decode());
        // only provide half the ranges
        let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2);
        assert!(!ranges1.is_empty());
        assert!(!ranges2.is_empty());
        push_ranges_to_decoder(&mut decoder, ranges1.to_vec());
        // still expect more data
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        push_ranges_to_decoder(&mut decoder, ranges2.to_vec());

        // Since no rows in the first row group pass the filters, there are no
        // additional requests to read data pages for "b" here

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 149).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }

    /// Decode with a filter where we also skip one of the RowGroups via a RowSelection
    #[test]
    fn test_decoder_single_filter_and_row_selection() {
        let builder = ParquetPushDecoderBuilder::try_new_decoder(
            test_file_len(),
            test_file_parquet_metadata(),
        )
        .unwrap();

        // Values in column "a" range 0..399
        // First filter: "a" > 250  (nothing in Row Group 0, last data page in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // read only column "b"
                ProjectionMask::columns(&schema_descr, ["b"]), // read "b"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(200),   // skip first row group
                RowSelector::select(100), // first 100 rows of second row group
                RowSelector::skip(100),
            ]))
            .build()
            .unwrap();

        // expect the first row group to be filtered out (no filter is evaluated due to row selection)

        // First row group, first filter (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 49).project(&[1]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }

    /// Decode with multiple filters that require multiple requests
    #[test]
    fn test_decoder_multi_filters() {
        // Create a decoder for decoding parquet data (note it does not have any IO / readers)
        let builder = ParquetPushDecoderBuilder::try_new_decoder(
            test_file_len(),
            test_file_parquet_metadata(),
        )
        .unwrap();

        // Values in column "a" range 0..399
        // Values in column "b" range 400..799
        // First filter: "a" > 175  (last data page in Row Group 0)
        // Second filter: "b" < 625 (last data page in Row Group 0 and first DataPage in RowGroup 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 175
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_175 = Int64Array::new_scalar(175);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_175)
            },
        );

        // b < 625
        let row_filter_b = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["b"]),
            |batch: RecordBatch| {
                let scalar_625 = Int64Array::new_scalar(625);
                let column = batch.column(0).as_primitive::<Int64Type>();
                lt(column, &scalar_625)
            },
        );

        let mut decoder = builder
            .with_projection(
                ProjectionMask::columns(&schema_descr, ["c"]), // read "c"
            )
            .with_row_filter(RowFilter::new(vec![
                Box::new(row_filter_a),
                Box::new(row_filter_b),
            ]))
            .build()
            .unwrap();

        // First row group, first filter (a > 175)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // First row group, second filter (b < 625)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // First row group, data pages for "c"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first batch to be decoded: rows 176..199, column "c"
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(176, 24).project(&[2]).unwrap();
        assert_eq!(batch1, expected1);

        // Second row group, first filter (a > 175)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group, second filter (b < 625)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group, data pages for "c"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the second batch to be decoded: rows 200..224, column "c"
        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 25).project(&[2]).unwrap();
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }

    /// Decode with a filter that uses a column that is also projected, and expect
    /// that the filter pages are reused (not refetched)
    #[test]
    fn test_decoder_reuses_filter_pages() {
        // Create a decoder for decoding parquet data (note it does not have any IO / readers)
        let builder = ParquetPushDecoderBuilder::try_new_decoder(
            test_file_len(),
            test_file_parquet_metadata(),
        )
        .unwrap();

        // Values in column "a" range 0..399
        // First filter: "a" > 250  (nothing in Row Group 0, both data pages in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // read only column "a" to test that filter pages are reused
                ProjectionMask::columns(&schema_descr, ["a"]), // read "a"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .build()
            .unwrap();

        // First row group, first filter (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first row group to be filtered out (no rows match)

        // Second row group, first filter (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect that the second row group is decoded: rows 251..399, column "a"
        // Note that the filter pages for "a" should be reused and no additional data
        // should be requested
        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 149).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }

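    /// Decode with an empty list of filters, which should behave the same as
    /// having no filters at all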
    #[test]
    fn test_decoder_empty_filters() {
        let builder = ParquetPushDecoderBuilder::try_new_decoder(
            test_file_len(),
            test_file_parquet_metadata(),
        )
        .unwrap();
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // only read column "c", but with empty filters
        let mut decoder = builder
            .with_projection(
                ProjectionMask::columns(&schema_descr, ["c"]), // read "c"
            )
            .with_row_filter(RowFilter::new(vec![
                // empty filters should be ignored
            ]))
            .build()
            .unwrap();

        // First row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first batch to be decoded: rows 0..199, column "c"
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(0, 200).project(&[2]).unwrap();
        assert_eq!(batch1, expected1);

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the second batch to be decoded: rows 200..399, column "c"
        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 200).project(&[2]).unwrap();
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }

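    /// Decode with an offset and limit that skip the entire first row group
    /// and part of the second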
    #[test]
    fn test_decoder_offset_limit() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
            test_file_len(),
            test_file_parquet_metadata(),
        )
        .unwrap()
        // skip the entire first row group (200 rows) and the first 25 rows of the second
        .with_offset(225)
        // and limit to 20 rows
        .with_limit(20)
        .build()
        .unwrap();

        // First row group should be skipped

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first and only batch to be decoded
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(225, 20);
        assert_eq!(batch1, expected1);

        expect_finished(decoder.try_decode());
    }

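    /// Decode only the second row group via an explicit row group selection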
    #[test]
    fn test_decoder_row_group_selection() {
        // take only the second row group
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
            test_file_len(),
            test_file_parquet_metadata(),
        )
        .unwrap()
        .with_row_groups(vec![1])
        .build()
        .unwrap();

        // First row group should be skipped

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first and only batch to be decoded
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(200, 200);
        assert_eq!(batch1, expected1);

        expect_finished(decoder.try_decode());
    }

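    /// Decode with a RowSelection that skips the entire first row group and
    /// part of the second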
    #[test]
    fn test_decoder_row_selection() {
        // skip the first row group and part of the second via a RowSelection
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
            test_file_len(),
            test_file_parquet_metadata(),
        )
        .unwrap()
        .with_row_selection(RowSelection::from(vec![
            RowSelector::skip(225),  // skip first row group and 25 rows of the second
            RowSelector::select(20), // take 20 rows
        ]))
        .build()
        .unwrap();

        // First row group should be skipped

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first and only batch to be decoded
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(225, 20);
        assert_eq!(batch1, expected1);

        expect_finished(decoder.try_decode());
    }

    /// Returns a batch with 400 rows, with 3 columns: "a", "b", "c"
    ///
    /// Note "c" is a different type (so the data page sizes will be different)
    static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
        let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
        let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
        let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| {
            if i % 2 == 0 {
                format!("string_{i}")
            } else {
                format!("A string larger than 12 bytes and thus not inlined {i}")
            }
        })));

        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
    });

    /// Create a parquet file in memory for testing.
    ///
    /// See [`TEST_BATCH`] for the data in the file.
    ///
    /// Each column is written as 4 data pages of 100 rows each, across 2 row
    /// groups (two data pages per column per row group).
    ///
    /// The data is split across row groups like this:
    ///
    /// Column | Values                     | Data Page | Row Group
    /// -------|----------------------------|-----------|-----------
    /// a      | 0..99                      | 1         | 0
    /// a      | 100..199                   | 2         | 0
    /// a      | 200..299                   | 1         | 1
    /// a      | 300..399                   | 2         | 1
    ///
    /// b      | 400..499                   | 1         | 0
    /// b      | 500..599                   | 2         | 0
    /// b      | 600..699                   | 1         | 1
    /// b      | 700..799                   | 2         | 1
    ///
    /// c      | "string_0".."string_99"    | 1         | 0
    /// c      | "string_100".."string_199" | 2         | 0
    /// c      | "string_200".."string_299" | 1         | 1
    /// c      | "string_300".."string_399" | 2         | 1
    static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
        let input_batch = &TEST_BATCH;
        let mut output = Vec::new();

        let writer_options = WriterProperties::builder()
            .set_max_row_group_size(200)
            .set_data_page_row_count_limit(100)
            .build();
        let mut writer =
            ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap();

        // since the limits are only enforced on batch boundaries, write the input
        // batch in chunks of 50
        let mut row_remain = input_batch.num_rows();
        while row_remain > 0 {
            let chunk_size = row_remain.min(50);
            let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size);
            writer.write(&chunk).unwrap();
            row_remain -= chunk_size;
        }
        writer.close().unwrap();
        Bytes::from(output)
    });

    /// Return the length of [`TEST_FILE_DATA`], in bytes
    fn test_file_len() -> u64 {
        TEST_FILE_DATA.len() as u64
    }

    /// Return a range that covers the entire [`TEST_FILE_DATA`]
    fn test_file_range() -> Range<u64> {
        0..test_file_len()
    }

    /// Return a slice of the test file data from the given range
    pub fn test_file_slice(range: Range<u64>) -> Bytes {
        let start: usize = range.start.try_into().unwrap();
        let end: usize = range.end.try_into().unwrap();
        TEST_FILE_DATA.slice(start..end)
    }

    /// Return the metadata for the test file
    pub fn test_file_parquet_metadata() -> Arc<crate::file::metadata::ParquetMetaData> {
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(test_file_len()).unwrap();
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![test_file_range()]);
        let metadata = metadata_decoder.try_decode().unwrap();
        let DecodeResult::Data(metadata) = metadata else {
            panic!("Expected metadata to be decoded successfully");
        };
        Arc::new(metadata)
    }

    /// Push the given ranges to the metadata decoder, simulating reading from a file
    fn push_ranges_to_metadata_decoder(
        metadata_decoder: &mut ParquetMetaDataPushDecoder,
        ranges: Vec<Range<u64>>,
    ) {
        let data = ranges
            .iter()
            .map(|range| test_file_slice(range.clone()))
            .collect::<Vec<_>>();
        metadata_decoder.push_ranges(ranges, data).unwrap();
    }

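    /// Push the given ranges to the push decoder, simulating reading from a file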
    fn push_ranges_to_decoder(decoder: &mut ParquetPushDecoder, ranges: Vec<Range<u64>>) {
        let data = ranges
            .iter()
            .map(|range| test_file_slice(range.clone()))
            .collect::<Vec<_>>();
        decoder.push_ranges(ranges, data).unwrap();
    }

    /// Expect that the [`DecodeResult`] is a [`DecodeResult::Data`] and return the corresponding element
    fn expect_data<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) -> T {
        match result.expect("Expected Ok(DecodeResult::Data(T))") {
            DecodeResult::Data(data) => data,
            result => panic!("Expected DecodeResult::Data, got {result:?}"),
        }
    }

    /// Expect that the [`DecodeResult`] is a [`DecodeResult::NeedsData`] and return the corresponding ranges
    fn expect_needs_data<T: Debug>(
        result: Result<DecodeResult<T>, ParquetError>,
    ) -> Vec<Range<u64>> {
        match result.expect("Expected Ok(DecodeResult::NeedsData{ranges})") {
            DecodeResult::NeedsData(ranges) => ranges,
            result => panic!("Expected DecodeResult::NeedsData, got {result:?}"),
        }
    }

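    /// Expect that the [`DecodeResult`] is [`DecodeResult::Finished`]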
    fn expect_finished<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) {
        match result.expect("Expected Ok(DecodeResult::Finished)") {
            DecodeResult::Finished => {}
            result => panic!("Expected DecodeResult::Finished, got {result:?}"),
        }
    }
}