// parquet/arrow/push_decoder/mod.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`ParquetPushDecoder`]: decodes Parquet data with data provided by the
//! caller (rather than from an underlying reader).

mod reader_builder;
mod remaining;

use crate::DecodeResult;
use crate::arrow::arrow_reader::{
    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
};
use crate::errors::ParquetError;
use crate::file::metadata::ParquetMetaData;
use crate::util::push_buffers::PushBuffers;
use arrow_array::RecordBatch;
use bytes::Bytes;
use reader_builder::RowGroupReaderBuilder;
use remaining::RemainingRowGroups;
use std::ops::Range;
use std::sync::Arc;

/// A builder for [`ParquetPushDecoder`].
///
/// To create a new decoder, use [`ParquetPushDecoderBuilder::try_new_decoder`].
///
/// You can decode the metadata from a Parquet file using either
/// [`ParquetMetadataReader`] or [`ParquetMetaDataPushDecoder`].
///
/// [`ParquetMetadataReader`]: crate::file::metadata::ParquetMetaDataReader
/// [`ParquetMetaDataPushDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder
///
/// Note this builder has no "input" (its input type is [`NoInput`]): rather
/// than reading from an underlying source, the caller pushes the required
/// byte ranges into the resulting [`ParquetPushDecoder`] as requested.
///
/// # Example
/// ```
/// # use std::ops::Range;
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use arrow_array::record_batch;
/// # use parquet::DecodeResult;
/// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
/// # use parquet::arrow::ArrowWriter;
/// # use parquet::file::metadata::ParquetMetaDataPushDecoder;
/// # let file_bytes = {
/// #   let mut buffer = vec![];
/// #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
/// #   writer.write(&batch).unwrap();
/// #   writer.close().unwrap();
/// #   Bytes::from(buffer)
/// # };
/// # // mimic IO by returning a function that returns the bytes for a given range
/// # let get_range = |range: &Range<u64>| -> Bytes {
/// #    let start = range.start as usize;
/// #     let end = range.end as usize;
/// #    file_bytes.slice(start..end)
/// # };
/// # let file_length = file_bytes.len() as u64;
/// # let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_length).unwrap();
/// # metadata_decoder.push_ranges(vec![0..file_length], vec![file_bytes.clone()]).unwrap();
/// # let DecodeResult::Data(parquet_metadata) = metadata_decoder.try_decode().unwrap() else { panic!("failed to decode metadata") };
/// # let parquet_metadata = Arc::new(parquet_metadata);
/// // The metadata is required to create the decoder
/// let mut decoder =
///     ParquetPushDecoderBuilder::try_new_decoder(parquet_metadata)
///       .unwrap()
///       // Optionally configure the decoder, e.g. batch size
///       .with_batch_size(1024)
///       // Build the decoder
///       .build()
///       .unwrap();
///
///     // In a loop, ask the decoder what it needs next, and provide it with the required data
///     loop {
///         match decoder.try_decode().unwrap() {
///             DecodeResult::NeedsData(ranges) => {
///                 // The decoder needs more data. Fetch the data for the given ranges
///                 let data = ranges.iter().map(|r| get_range(r)).collect::<Vec<_>>();
///                 // Push the data to the decoder
///                 decoder.push_ranges(ranges, data).unwrap();
///                 // After pushing the data, we can try to decode again on the next iteration
///             }
///             DecodeResult::Data(batch) => {
///                 // Successfully decoded a batch of data
///                 assert!(batch.num_rows() > 0);
///             }
///             DecodeResult::Finished => {
///                 // The decoder has finished decoding; exit the loop
///                 break;
///             }
///         }
///     }
/// ```
pub type ParquetPushDecoderBuilder = ArrowReaderBuilder<NoInput>;

/// Type that represents "No input" for the [`ParquetPushDecoderBuilder`]
///
/// There is no "input" for the push decoder by design (the idea is that
/// the caller pushes data to the decoder as needed).
///
/// However, [`ArrowReaderBuilder`] is shared with the sync and async readers,
/// which DO have an `input`. To support reusing the same builder code for
/// all three types of decoders, we define this `NoInput` type for the push
/// decoder to denote in the type system that there is no input.
#[derive(Debug, Clone, Copy)]
pub struct NoInput;

/// Methods for building a ParquetDecoder. See the base [`ArrowReaderBuilder`] for
/// more options that can be configured.
impl ParquetPushDecoderBuilder {
    /// Create a new `ParquetDecoderBuilder` for configuring a Parquet decoder for the given file.
    ///
    /// See [`ParquetMetadataDecoder`] for a decoder that can read the metadata from a Parquet file.
    ///
    /// [`ParquetMetadataDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder
    ///
    /// See example on [`ParquetPushDecoderBuilder`]
    pub fn try_new_decoder(parquet_metadata: Arc<ParquetMetaData>) -> Result<Self, ParquetError> {
        Self::try_new_decoder_with_options(parquet_metadata, ArrowReaderOptions::default())
    }

    /// Create a new `ParquetDecoderBuilder` for configuring a Parquet decoder for the given file
    /// with the given reader options.
    ///
    /// This is similar to [`Self::try_new_decoder`] but allows configuring
    /// options such as the Arrow schema
    pub fn try_new_decoder_with_options(
        parquet_metadata: Arc<ParquetMetaData>,
        arrow_reader_options: ArrowReaderOptions,
    ) -> Result<Self, ParquetError> {
        let arrow_reader_metadata =
            ArrowReaderMetadata::try_new(parquet_metadata, arrow_reader_options)?;
        Ok(Self::new_with_metadata(arrow_reader_metadata))
    }

    /// Create a new `ParquetDecoderBuilder` given [`ArrowReaderMetadata`].
    ///
    /// See [`ArrowReaderMetadata::try_new`] for how to create the metadata from
    /// the Parquet metadata and reader options.
    pub fn new_with_metadata(arrow_reader_metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(NoInput, arrow_reader_metadata)
    }

    /// Create a [`ParquetPushDecoder`] with the configured options
    pub fn build(self) -> Result<ParquetPushDecoder, ParquetError> {
        // Exhaustively destructure `self` so the compiler flags any newly
        // added builder field that is not handled here.
        let Self {
            input: NoInput,
            metadata: parquet_metadata,
            schema: _,
            fields,
            batch_size,
            row_groups,
            projection,
            filter,
            selection,
            limit,
            offset,
            metrics,
            row_selection_policy,
            max_predicate_cache_size,
        } = self;

        // If no row groups were specified, read all of them
        let row_groups =
            row_groups.unwrap_or_else(|| (0..parquet_metadata.num_row_groups()).collect());

        // Prepare to build RowGroup readers
        let file_len = 0; // not used in push decoder
        let buffers = PushBuffers::new(file_len);
        let row_group_reader_builder = RowGroupReaderBuilder::new(
            batch_size,
            projection,
            Arc::clone(&parquet_metadata),
            fields,
            filter,
            limit,
            offset,
            metrics,
            max_predicate_cache_size,
            buffers,
            row_selection_policy,
        );

        // Initialize the decoder with the configured options
        let remaining_row_groups = RemainingRowGroups::new(
            parquet_metadata,
            row_groups,
            selection,
            row_group_reader_builder,
        );

        // The decoder starts out waiting for the data of the first row group
        Ok(ParquetPushDecoder {
            state: ParquetDecoderState::ReadingRowGroup {
                remaining_row_groups: Box::new(remaining_row_groups),
            },
        })
    }
}

/// A push based Parquet Decoder
///
/// See [`ParquetPushDecoderBuilder`] for an example of how to build and use the decoder.
///
/// [`ParquetPushDecoder`] is a low level API for decoding Parquet data without an
/// underlying reader for performing IO, and thus offers fine grained control
/// over how data is fetched and decoded.
///
/// When more data is needed to make progress, instead of reading data directly
/// from a reader, the decoder returns [`DecodeResult`] indicating what ranges
/// are needed. Once the caller provides the requested ranges via
/// [`Self::push_ranges`], the caller tries to decode again by calling
/// [`Self::try_decode`].
///
/// The decoder's internal state tracks what has been already decoded and what
/// is needed next.
#[derive(Debug)]
pub struct ParquetPushDecoder {
    /// The inner state.
    ///
    /// This state is consumed on every transition and a new state is produced
    /// so the Rust compiler can ensure that the state is always valid and
    /// transitions are not missed.
    state: ParquetDecoderState,
}

impl ParquetPushDecoder {
    /// Attempt to decode the next batch of data, or return what data is needed
    ///
    /// The decoder communicates the next state with a [`DecodeResult`]
    ///
    /// See full example in [`ParquetPushDecoderBuilder`]
    ///
    /// ```no_run
    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
    /// use parquet::DecodeResult;
    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
    /// # fn push_data(decoder: &mut ParquetPushDecoder, ranges: Vec<std::ops::Range<u64>>) { unimplemented!() }
    /// let mut decoder = get_decoder();
    /// loop {
    ///    match decoder.try_decode().unwrap() {
    ///       DecodeResult::NeedsData(ranges) => {
    ///         // The decoder needs more data. Fetch the data for the given ranges
    ///         // call decoder.push_ranges(ranges, data) and call again
    ///         push_data(&mut decoder, ranges);
    ///       }
    ///       DecodeResult::Data(batch) => {
    ///         // Successfully decoded the next batch of data
    ///         println!("Got batch with {} rows", batch.num_rows());
    ///       }
    ///       DecodeResult::Finished => {
    ///         // The decoder has finished decoding all data
    ///         break;
    ///       }
    ///    }
    /// }
    ///```
    pub fn try_decode(&mut self) -> Result<DecodeResult<RecordBatch>, ParquetError> {
        // Temporarily park the state as `Finished` while the consuming
        // transition runs. NOTE: if `try_next_batch` returns an error the
        // state is not restored, so the decoder remains `Finished`.
        let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
        let (new_state, decode_result) = current_state.try_next_batch()?;
        self.state = new_state;
        Ok(decode_result)
    }

    /// Return a [`ParquetRecordBatchReader`] that reads the next set of rows, or
    /// return what data is needed to produce it.
    ///
    /// This API can be used to get a reader for decoding the next set of
    /// RecordBatches while proceeding to begin fetching data for the set (e.g.
    /// row group)
    ///
    /// Example
    /// ```no_run
    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
    /// use parquet::DecodeResult;
    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
    /// # fn push_data(decoder: &mut ParquetPushDecoder, ranges: Vec<std::ops::Range<u64>>) { unimplemented!() }
    /// let mut decoder = get_decoder();
    /// loop {
    ///    match decoder.try_next_reader().unwrap() {
    ///       DecodeResult::NeedsData(ranges) => {
    ///         // The decoder needs more data. Fetch the data for the given ranges
    ///         // call decoder.push_ranges(ranges, data) and call again
    ///         push_data(&mut decoder, ranges);
    ///       }
    ///       DecodeResult::Data(reader) => {
    ///          // spawn a thread to read the batches in parallel
    ///          // with fetching the next row group / data
    ///          std::thread::spawn(move || {
    ///            for batch in reader {
    ///              let batch = batch.unwrap();
    ///              println!("Got batch with {} rows", batch.num_rows());
    ///            }
    ///         });
    ///       }
    ///       DecodeResult::Finished => {
    ///         // The decoder has finished decoding all data
    ///         break;
    ///       }
    ///    }
    /// }
    ///```
    pub fn try_next_reader(
        &mut self,
    ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
        // Same placeholder pattern as `try_decode`: an error leaves the
        // decoder in the `Finished` state.
        let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
        let (new_state, decode_result) = current_state.try_next_reader()?;
        self.state = new_state;
        Ok(decode_result)
    }

    /// Push data into the decoder for processing
    ///
    /// This is a convenience wrapper around [`Self::push_ranges`] for pushing a
    /// single range of data.
    ///
    /// Note this can be the entire file or just a part of it. If it is part of the file,
    /// the ranges should correspond to the data ranges requested by the decoder.
    ///
    /// See example in [`ParquetPushDecoderBuilder`]
    pub fn push_range(&mut self, range: Range<u64>, data: Bytes) -> Result<(), ParquetError> {
        self.push_ranges(vec![range], vec![data])
    }

    /// Push data into the decoder for processing
    ///
    /// This should correspond to the data ranges requested by the decoder
    pub fn push_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
        data: Vec<Bytes>,
    ) -> Result<(), ParquetError> {
        // `push_data` consumes the state, so swap in `Finished` temporarily;
        // on error the decoder remains `Finished`.
        let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
        self.state = current_state.push_data(ranges, data)?;
        Ok(())
    }

    /// Returns the total number of buffered bytes in the decoder
    ///
    /// This is the sum of the size of all [`Bytes`] that has been pushed to the
    /// decoder but not yet consumed.
    ///
    /// Note that this does not include any overhead of the internal data
    /// structures and that since [`Bytes`] are ref counted memory, this may not
    /// reflect additional memory usage.
    ///
    /// This can be used to monitor memory usage of the decoder.
    pub fn buffered_bytes(&self) -> u64 {
        self.state.buffered_bytes()
    }

    /// Clear any staged byte ranges currently buffered for future decode work.
    ///
    /// This clears byte ranges still owned by the decoder's internal
    /// `PushBuffers`. It does not affect any data that has already been handed
    /// off to an active [`ParquetRecordBatchReader`].
    pub fn clear_all_ranges(&mut self) {
        self.state.clear_all_ranges();
    }
}

/// Internal state machine for the [`ParquetPushDecoder`]
#[derive(Debug)]
enum ParquetDecoderState {
    /// Waiting for data needed to decode the next RowGroup
    ReadingRowGroup {
        remaining_row_groups: Box<RemainingRowGroups>,
    },
    /// The decoder is actively decoding a RowGroup
    DecodingRowGroup {
        /// Current active reader
        record_batch_reader: Box<ParquetRecordBatchReader>,
        remaining_row_groups: Box<RemainingRowGroups>,
    },
    /// The decoder has finished processing all data.
    ///
    /// Note this variant also serves as the temporary placeholder that
    /// [`ParquetPushDecoder`] swaps in (via `std::mem::replace`) while a
    /// consuming transition runs.
    Finished,
}

impl ParquetDecoderState {
    /// If actively reading a RowGroup, return the currently active
    /// ParquetRecordBatchReader and advance to the next group.
    fn try_next_reader(
        self,
    ) -> Result<(Self, DecodeResult<ParquetRecordBatchReader>), ParquetError> {
        let mut current_state = self;
        loop {
            let (next_state, decode_result) = current_state.transition()?;
            // if more data is needed to transition, can't proceed further without it
            match decode_result {
                DecodeResult::NeedsData(ranges) => {
                    return Ok((next_state, DecodeResult::NeedsData(ranges)));
                }
                // act next based on state
                DecodeResult::Data(()) | DecodeResult::Finished => {}
            }
            match next_state {
                // not ready to read yet, continue transitioning
                Self::ReadingRowGroup { .. } => current_state = next_state,
                // have a reader ready, so return it and set ourself to ReadingRowGroup
                // (the reader is handed to the caller, not kept in the state)
                Self::DecodingRowGroup {
                    record_batch_reader,
                    remaining_row_groups,
                } => {
                    let result = DecodeResult::Data(*record_batch_reader);
                    let next_state = Self::ReadingRowGroup {
                        remaining_row_groups,
                    };
                    return Ok((next_state, result));
                }
                Self::Finished => {
                    return Ok((Self::Finished, DecodeResult::Finished));
                }
            }
        }
    }

    /// Current state --> next state + output
    ///
    /// This function is called to get the next RecordBatch
    ///
    /// This loop structure is used to reduce the indentation level of the
    /// state handling (see [`ParquetPushDecoder::try_decode`], which drives it)
    fn try_next_batch(self) -> Result<(Self, DecodeResult<RecordBatch>), ParquetError> {
        let mut current_state = self;
        loop {
            let (new_state, decode_result) = current_state.transition()?;
            // if more data is needed to transition, can't proceed further without it
            match decode_result {
                DecodeResult::NeedsData(ranges) => {
                    return Ok((new_state, DecodeResult::NeedsData(ranges)));
                }
                // act next based on state
                DecodeResult::Data(()) | DecodeResult::Finished => {}
            }
            match new_state {
                // not ready to read yet, continue transitioning
                Self::ReadingRowGroup { .. } => current_state = new_state,
                // have a reader ready, so decode the next batch
                Self::DecodingRowGroup {
                    mut record_batch_reader,
                    remaining_row_groups,
                } => {
                    match record_batch_reader.next() {
                        // Successfully decoded a batch, return it
                        Some(Ok(batch)) => {
                            let result = DecodeResult::Data(batch);
                            let next_state = Self::DecodingRowGroup {
                                record_batch_reader,
                                remaining_row_groups,
                            };
                            return Ok((next_state, result));
                        }
                        // No more batches in this row group, move to the next row group
                        None => {
                            current_state = Self::ReadingRowGroup {
                                remaining_row_groups,
                            }
                        }
                        // some error occurred while decoding, so return that
                        Some(Err(e)) => {
                            // TODO: preserve ArrowError in ParquetError (rather than convert to a string)
                            return Err(ParquetError::ArrowError(e.to_string()));
                        }
                    }
                }
                Self::Finished => {
                    return Ok((Self::Finished, DecodeResult::Finished));
                }
            }
        }
    }

    /// Transition to the next state with a reader (data can be produced), if not end of stream
    ///
    /// This function is called in a loop until the decoder is ready to return
    /// data (has the required pages buffered) or is finished.
    fn transition(self) -> Result<(Self, DecodeResult<()>), ParquetError> {
        // result returned when there is data ready
        let data_ready = DecodeResult::Data(());
        match self {
            Self::ReadingRowGroup {
                mut remaining_row_groups,
            } => {
                match remaining_row_groups.try_next_reader()? {
                    // If we have a next reader, we can transition to decoding it
                    DecodeResult::Data(record_batch_reader) => {
                        // Transition to decoding the row group
                        Ok((
                            Self::DecodingRowGroup {
                                record_batch_reader: Box::new(record_batch_reader),
                                remaining_row_groups,
                            },
                            data_ready,
                        ))
                    }
                    DecodeResult::NeedsData(ranges) => {
                        // If we need more data, we return the ranges needed and stay in Reading
                        // RowGroup state
                        Ok((
                            Self::ReadingRowGroup {
                                remaining_row_groups,
                            },
                            DecodeResult::NeedsData(ranges),
                        ))
                    }
                    // If there are no more readers, we are finished
                    DecodeResult::Finished => {
                        // No more row groups to read, we are finished
                        Ok((Self::Finished, DecodeResult::Finished))
                    }
                }
            }
            // if we are already in DecodingRowGroup, just return data ready
            Self::DecodingRowGroup { .. } => Ok((self, data_ready)),
            // if finished, just return finished
            Self::Finished => Ok((self, DecodeResult::Finished)),
        }
    }

    /// Push data, and transition state if needed
    ///
    /// This should correspond to the data ranges requested by the decoder
    pub fn push_data(
        self,
        ranges: Vec<Range<u64>>,
        data: Vec<Bytes>,
    ) -> Result<Self, ParquetError> {
        match self {
            ParquetDecoderState::ReadingRowGroup {
                mut remaining_row_groups,
            } => {
                // Push data to the RowGroupReaderBuilder
                remaining_row_groups.push_data(ranges, data);
                Ok(ParquetDecoderState::ReadingRowGroup {
                    remaining_row_groups,
                })
            }
            // it is ok to get data before we asked for it
            ParquetDecoderState::DecodingRowGroup {
                record_batch_reader,
                mut remaining_row_groups,
            } => {
                remaining_row_groups.push_data(ranges, data);
                Ok(ParquetDecoderState::DecodingRowGroup {
                    record_batch_reader,
                    remaining_row_groups,
                })
            }
            ParquetDecoderState::Finished => Err(ParquetError::General(
                "Cannot push data to a finished decoder".to_string(),
            )),
        }
    }

    /// How many bytes are currently buffered in the decoder?
    fn buffered_bytes(&self) -> u64 {
        match self {
            ParquetDecoderState::ReadingRowGroup {
                remaining_row_groups,
            } => remaining_row_groups.buffered_bytes(),
            ParquetDecoderState::DecodingRowGroup {
                record_batch_reader: _,
                remaining_row_groups,
            } => remaining_row_groups.buffered_bytes(),
            ParquetDecoderState::Finished => 0,
        }
    }

    /// Clear any staged ranges currently buffered in the decoder.
    fn clear_all_ranges(&mut self) {
        match self {
            ParquetDecoderState::ReadingRowGroup {
                remaining_row_groups,
            } => remaining_row_groups.clear_all_ranges(),
            ParquetDecoderState::DecodingRowGroup {
                record_batch_reader: _,
                remaining_row_groups,
            } => remaining_row_groups.clear_all_ranges(),
            ParquetDecoderState::Finished => {}
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::DecodeResult;
    use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
    use crate::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
    use crate::arrow::{ArrowWriter, ProjectionMask};
    use crate::errors::ParquetError;
    use crate::file::metadata::ParquetMetaDataPushDecoder;
    use crate::file::properties::WriterProperties;
    use arrow::compute::kernels::cmp::{gt, lt};
    use arrow_array::cast::AsArray;
    use arrow_array::types::Int64Type;
    use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray};
    use arrow_select::concat::concat_batches;
    use bytes::Bytes;
    use std::fmt::Debug;
    use std::ops::Range;
    use std::sync::{Arc, LazyLock};

    /// Test decoder struct size (as they are copied around on each transition, they
    /// should not grow too large)
    #[test]
    fn test_decoder_size() {
        // Both non-empty variants hold `Box`ed fields, which keeps the enum small
        assert_eq!(std::mem::size_of::<ParquetDecoderState>(), 24);
    }

628    /// Decode the entire file at once, simulating a scenario where all data is
629    /// available in memory
630    #[test]
631    fn test_decoder_all_data() {
632        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
633            .unwrap()
634            .build()
635            .unwrap();
636
637        decoder
638            .push_range(test_file_range(), TEST_FILE_DATA.clone())
639            .unwrap();
640
641        let results = vec![
642            // first row group should be decoded without needing more data
643            expect_data(decoder.try_decode()),
644            // second row group should be decoded without needing more data
645            expect_data(decoder.try_decode()),
646        ];
647        expect_finished(decoder.try_decode());
648
649        let all_output = concat_batches(&TEST_BATCH.schema(), &results).unwrap();
650        // Check that the output matches the input batch
651        assert_eq!(all_output, *TEST_BATCH);
652    }
653
654    /// Decode the entire file incrementally, simulating a scenario where data is
655    /// fetched as needed
656    #[test]
657    fn test_decoder_incremental() {
658        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
659            .unwrap()
660            .build()
661            .unwrap();
662
663        let mut results = vec![];
664
665        // First row group, expect a single request
666        let ranges = expect_needs_data(decoder.try_decode());
667        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
668        push_ranges_to_decoder(&mut decoder, ranges);
669        // The decoder should currently only store the data it needs to decode the first row group
670        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
671        results.push(expect_data(decoder.try_decode()));
672        // the decoder should have consumed the data for the first row group and freed it
673        assert_eq!(decoder.buffered_bytes(), 0);
674
675        // Second row group,
676        let ranges = expect_needs_data(decoder.try_decode());
677        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
678        push_ranges_to_decoder(&mut decoder, ranges);
679        // The decoder should currently only store the data it needs to decode the second row group
680        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
681        results.push(expect_data(decoder.try_decode()));
682        // the decoder should have consumed the data for the second row group and freed it
683        assert_eq!(decoder.buffered_bytes(), 0);
684        expect_finished(decoder.try_decode());
685
686        // Check that the output matches the input batch
687        let all_output = concat_batches(&TEST_BATCH.schema(), &results).unwrap();
688        assert_eq!(all_output, *TEST_BATCH);
689    }
690
    /// Releasing staged ranges should free speculative buffers without affecting
    /// the active row group reader.
    #[test]
    fn test_decoder_clear_all_ranges() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .with_batch_size(100)
            .build()
            .unwrap();

        decoder
            .push_range(test_file_range(), TEST_FILE_DATA.clone())
            .unwrap();
        assert_eq!(decoder.buffered_bytes(), test_file_len());

        // The current row group reader is built from the prefetched bytes, but
        // the speculative full-file range remains staged in the decoder.
        let batch1 = expect_data(decoder.try_decode());
        assert_eq!(batch1, TEST_BATCH.slice(0, 100));
        assert_eq!(decoder.buffered_bytes(), test_file_len());

        // All of the buffer is released
        decoder.clear_all_ranges();
        assert_eq!(decoder.buffered_bytes(), 0);

        // The active reader still owns the current row group's bytes, so it can
        // continue decoding without consulting PushBuffers.
        let batch2 = expect_data(decoder.try_decode());
        assert_eq!(batch2, TEST_BATCH.slice(100, 100));
        assert_eq!(decoder.buffered_bytes(), 0);

        // Moving to the next row group now requires the decoder to ask for data
        // again because the staged speculative ranges were released.
        let ranges = expect_needs_data(decoder.try_decode());
        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        push_ranges_to_decoder(&mut decoder, ranges);
        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);

        // The remaining batches decode from the freshly pushed data, which is
        // consumed (freed) as each batch is produced.
        let batch3 = expect_data(decoder.try_decode());
        assert_eq!(batch3, TEST_BATCH.slice(200, 100));
        assert_eq!(decoder.buffered_bytes(), 0);

        let batch4 = expect_data(decoder.try_decode());
        assert_eq!(batch4, TEST_BATCH.slice(300, 100));
        assert_eq!(decoder.buffered_bytes(), 0);

        expect_finished(decoder.try_decode());
    }

    /// Decode the entire file incrementally, simulating partial reads
    #[test]
    fn test_decoder_partial() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .build()
            .unwrap();

        // First row group: expect a single request for all needed data,
        // followed by one decoded batch
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(0, 200);
        assert_eq!(batch1, expected1);

        // Second row group, this time provide the data in two steps
        let ranges = expect_needs_data(decoder.try_decode());
        let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2);
        assert!(!ranges1.is_empty());
        assert!(!ranges2.is_empty());
        // push first half to simulate partial read
        push_ranges_to_decoder(&mut decoder, ranges1.to_vec());

        // still expect more data: only the not-yet-pushed ranges are requested
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        // pushing empty ranges should be a no-op (same request comes back)
        push_ranges_to_decoder(&mut decoder, vec![]);
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 200);
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }
779
780    /// Decode multiple columns "a" and "b", expect that the decoder requests
781    /// only a single request per row group
782    #[test]
783    fn test_decoder_selection_does_one_request() {
784        let builder =
785            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
786
787        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
788
789        let mut decoder = builder
790            .with_projection(
791                ProjectionMask::columns(&schema_descr, ["a", "b"]), // read "a", "b"
792            )
793            .build()
794            .unwrap();
795
796        // First row group, expect a single request for all data needed to read "a" and "b"
797        let ranges = expect_needs_data(decoder.try_decode());
798        push_ranges_to_decoder(&mut decoder, ranges);
799
800        let batch1 = expect_data(decoder.try_decode());
801        let expected1 = TEST_BATCH.slice(0, 200).project(&[0, 1]).unwrap();
802        assert_eq!(batch1, expected1);
803
804        // Second row group, similarly expect a single request for all data needed to read "a" and "b"
805        let ranges = expect_needs_data(decoder.try_decode());
806        push_ranges_to_decoder(&mut decoder, ranges);
807
808        let batch2 = expect_data(decoder.try_decode());
809        let expected2 = TEST_BATCH.slice(200, 200).project(&[0, 1]).unwrap();
810        assert_eq!(batch2, expected2);
811
812        expect_finished(decoder.try_decode());
813    }
814
    /// Decode with a filter that requires multiple requests, but only provide part
    /// of the data needed for the filter at a time simulating partial reads.
    #[test]
    fn test_decoder_single_filter_partial() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // First filter: "a" > 250  (nothing in Row Group 0, both data pages in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            // claim to use both a and b so we get two ranges requests for the filter pages
            ProjectionMask::columns(&schema_descr, ["a", "b"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // read only column "a" to test that filter pages are reused
                ProjectionMask::columns(&schema_descr, ["a"]), // read "a"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .build()
            .unwrap();

        // First row group, evaluating filters
        let ranges = expect_needs_data(decoder.try_decode());
        // only provide half the ranges, simulating a partial read
        let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2);
        assert!(!ranges1.is_empty());
        assert!(!ranges2.is_empty());
        push_ranges_to_decoder(&mut decoder, ranges1.to_vec());
        // still expect more data; asking twice without pushing anything must
        // return the same remaining ranges each time (the request is stable)
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        push_ranges_to_decoder(&mut decoder, ranges2.to_vec());

        // Since no rows in the first row group pass the filter, there are no
        // additional requests to read data pages for "a" here

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // rows 251..399 pass the filter; only projected column "a" is returned
        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 149).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }
873
    /// Decode with a filter where we also skip one of the RowGroups via a RowSelection
    #[test]
    fn test_decoder_single_filter_and_row_selection() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // First filter: "a" > 250  (nothing in Row Group 0, last data page in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // project a column ("b") different from the filter column ("a")
                ProjectionMask::columns(&schema_descr, ["b"]), // read "b"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(200),   // skip first row group
                RowSelector::select(100), // first 100 rows of second row group
                RowSelector::skip(100),
            ]))
            .build()
            .unwrap();

        // The first row group is skipped entirely by the row selection, so no
        // filter is even evaluated for it

        // Second row group, filter pages for "a" (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group, data pages for "b"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Of the selected rows 200..299, only 251..299 pass a > 250
        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 49).project(&[1]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }
924
    /// Decode with multiple filters that require multiple requests
    #[test]
    fn test_decoder_multi_filters() {
        // Create a decoder for decoding parquet data (note it does not have any IO / readers)
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // Values in column "b" range 400..799
        // First filter: "a" > 175  (last data page in Row Group 0)
        // Second filter: "b" < 625 (last data page in Row Group 0 and first DataPage in RowGroup 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 175
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_175 = Int64Array::new_scalar(175);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_175)
            },
        );

        // b < 625
        let row_filter_b = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["b"]),
            |batch: RecordBatch| {
                let scalar_625 = Int64Array::new_scalar(625);
                let column = batch.column(0).as_primitive::<Int64Type>();
                lt(column, &scalar_625)
            },
        );

        let mut decoder = builder
            .with_projection(
                ProjectionMask::columns(&schema_descr, ["c"]), // read "c"
            )
            .with_row_filter(RowFilter::new(vec![
                Box::new(row_filter_a),
                Box::new(row_filter_b),
            ]))
            .build()
            .unwrap();

        // Each row group takes three requests: one per filter, then one for
        // the projected data pages of "c"

        // First row group, first filter (a > 175)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // first row group, second filter (b < 625)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // first row group, data pages for "c"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first batch to be decoded: rows 176..199 pass both filters, column "c"
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(176, 24).project(&[2]).unwrap();
        assert_eq!(batch1, expected1);

        // Second row group, first filter (a > 175)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group, second filter (b < 625)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group, data pages for "c"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the second batch to be decoded: rows 200..224 pass both filters, column "c"
        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 25).project(&[2]).unwrap();
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }
1005
    /// Decode with a filter that uses a column that is also projected, and expect
    /// that the filter pages are reused (don't refetch them)
    #[test]
    fn test_decoder_reuses_filter_pages() {
        // Create a decoder for decoding parquet data (note it does not have any IO / readers)
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // First filter: "a" > 250  (nothing in Row Group 0, last data page in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // read only column "a" to test that filter pages are reused
                ProjectionMask::columns(&schema_descr, ["a"]), // read "a"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .build()
            .unwrap();

        // First row group, first filter (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first row group to be filtered out (no rows match)

        // Second row group, first filter (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect that the second row group is decoded: rows 251..399, column "a"
        // Note that the filter pages for "a" should be reused and no additional data
        // should be requested
        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 149).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }
1056
1057    #[test]
1058    fn test_decoder_empty_filters() {
1059        let builder =
1060            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
1061        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
1062
1063        // only read column "c", but with empty filters
1064        let mut decoder = builder
1065            .with_projection(
1066                ProjectionMask::columns(&schema_descr, ["c"]), // read "c"
1067            )
1068            .with_row_filter(RowFilter::new(vec![
1069                // empty filters should be ignored
1070            ]))
1071            .build()
1072            .unwrap();
1073
1074        // First row group
1075        let ranges = expect_needs_data(decoder.try_decode());
1076        push_ranges_to_decoder(&mut decoder, ranges);
1077
1078        // expect the first batch to be decoded: rows 0..199, column "c"
1079        let batch1 = expect_data(decoder.try_decode());
1080        let expected1 = TEST_BATCH.slice(0, 200).project(&[2]).unwrap();
1081        assert_eq!(batch1, expected1);
1082
1083        // Second row group,
1084        let ranges = expect_needs_data(decoder.try_decode());
1085        push_ranges_to_decoder(&mut decoder, ranges);
1086
1087        // expect the second batch to be decoded: rows 200..399, column "c"
1088        let batch2 = expect_data(decoder.try_decode());
1089        let expected2 = TEST_BATCH.slice(200, 200).project(&[2]).unwrap();
1090
1091        assert_eq!(batch2, expected2);
1092
1093        expect_finished(decoder.try_decode());
1094    }
1095
1096    #[test]
1097    fn test_decoder_offset_limit() {
1098        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
1099            .unwrap()
1100            // skip entire first row group (200 rows) and first 25 rows of second row group
1101            .with_offset(225)
1102            // and limit to 20 rows
1103            .with_limit(20)
1104            .build()
1105            .unwrap();
1106
1107        // First row group should be skipped,
1108
1109        // Second row group
1110        let ranges = expect_needs_data(decoder.try_decode());
1111        push_ranges_to_decoder(&mut decoder, ranges);
1112
1113        // expect the first and only batch to be decoded
1114        let batch1 = expect_data(decoder.try_decode());
1115        let expected1 = TEST_BATCH.slice(225, 20);
1116        assert_eq!(batch1, expected1);
1117
1118        expect_finished(decoder.try_decode());
1119    }
1120
1121    #[test]
1122    fn test_decoder_row_group_selection() {
1123        // take only the second row group
1124        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
1125            .unwrap()
1126            .with_row_groups(vec![1])
1127            .build()
1128            .unwrap();
1129
1130        // First row group should be skipped,
1131
1132        // Second row group
1133        let ranges = expect_needs_data(decoder.try_decode());
1134        push_ranges_to_decoder(&mut decoder, ranges);
1135
1136        // expect the first and only batch to be decoded
1137        let batch1 = expect_data(decoder.try_decode());
1138        let expected1 = TEST_BATCH.slice(200, 200);
1139        assert_eq!(batch1, expected1);
1140
1141        expect_finished(decoder.try_decode());
1142    }
1143
    /// Decode with a RowSelection that skips the whole first row group and
    /// part of the second
    #[test]
    fn test_decoder_row_selection() {
        // skip rows 0..224, then select rows 225..244
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(225),  // skip first row group and 25 rows of second
                RowSelector::select(20), // take 20 rows
            ]))
            .build()
            .unwrap();

        // First row group should be skipped,

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first and only batch to be decoded
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(225, 20);
        assert_eq!(batch1, expected1);

        expect_finished(decoder.try_decode());
    }
1169
    /// Returns a batch with 400 rows, with 3 columns: "a", "b", "c"
    ///
    /// Note "c" is a different type (strings, so its data page sizes will
    /// differ from the Int64 columns "a" and "b")
    static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
        let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
        let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
        // Alternate short (inlined) and long (non-inlined) values so the
        // StringView column exercises both representations
        let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| {
            if i % 2 == 0 {
                format!("string_{i}")
            } else {
                format!("A string larger than 12 bytes and thus not inlined {i}")
            }
        })));

        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
    });
1186
    /// Create a parquet file in memory for testing.
    ///
    /// See [`TEST_BATCH`] for the data in the file.
    ///
    /// Each column is written in 4 data pages, each with 100 rows, across 2
    /// row groups. Each column in each row group has two data pages.
    ///
    /// The data is split across row groups like this
    ///
    /// Column |   Values                | Data Page | Row Group
    /// -------|------------------------|-----------|-----------
    /// a      | 0..99                  | 1         | 0
    /// a      | 100..199               | 2         | 0
    /// a      | 200..299               | 1         | 1
    /// a      | 300..399               | 2         | 1
    ///
    /// b      | 400..499               | 1         | 0
    /// b      | 500..599               | 2         | 0
    /// b      | 600..699               | 1         | 1
    /// b      | 700..799               | 2         | 1
    ///
    /// c      | "string_0".."string_99"        | 1         | 0
    /// c      | "string_100".."string_199"     | 2         | 0
    /// c      | "string_200".."string_299"     | 1         | 1
    /// c      | "string_300".."string_399"     | 2         | 1
    static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
        let input_batch = &TEST_BATCH;
        let mut output = Vec::new();

        // 200 rows per row group and 100 rows per data page yields the layout
        // described in the table above
        let writer_options = WriterProperties::builder()
            .set_max_row_group_row_count(Some(200))
            .set_data_page_row_count_limit(100)
            .build();
        let mut writer =
            ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap();

        // since the limits are only enforced on batch boundaries, write the input
        // batch in chunks of 50
        let mut row_remain = input_batch.num_rows();
        while row_remain > 0 {
            let chunk_size = row_remain.min(50);
            let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size);
            writer.write(&chunk).unwrap();
            row_remain -= chunk_size;
        }
        writer.close().unwrap();
        Bytes::from(output)
    });
1235
1236    /// Return the length of [`TEST_FILE_DATA`], in bytes
1237    fn test_file_len() -> u64 {
1238        TEST_FILE_DATA.len() as u64
1239    }
1240
1241    /// Return a range that covers the entire [`TEST_FILE_DATA`]
1242    fn test_file_range() -> Range<u64> {
1243        0..test_file_len()
1244    }
1245
1246    /// Return a slice of the test file data from the given range
1247    pub fn test_file_slice(range: Range<u64>) -> Bytes {
1248        let start: usize = range.start.try_into().unwrap();
1249        let end: usize = range.end.try_into().unwrap();
1250        TEST_FILE_DATA.slice(start..end)
1251    }
1252
1253    /// return the metadata for the test file
1254    pub fn test_file_parquet_metadata() -> Arc<crate::file::metadata::ParquetMetaData> {
1255        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(test_file_len()).unwrap();
1256        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![test_file_range()]);
1257        let metadata = metadata_decoder.try_decode().unwrap();
1258        let DecodeResult::Data(metadata) = metadata else {
1259            panic!("Expected metadata to be decoded successfully");
1260        };
1261        Arc::new(metadata)
1262    }
1263
1264    /// Push the given ranges to the metadata decoder, simulating reading from a file
1265    fn push_ranges_to_metadata_decoder(
1266        metadata_decoder: &mut ParquetMetaDataPushDecoder,
1267        ranges: Vec<Range<u64>>,
1268    ) {
1269        let data = ranges
1270            .iter()
1271            .map(|range| test_file_slice(range.clone()))
1272            .collect::<Vec<_>>();
1273        metadata_decoder.push_ranges(ranges, data).unwrap();
1274    }
1275
1276    fn push_ranges_to_decoder(decoder: &mut ParquetPushDecoder, ranges: Vec<Range<u64>>) {
1277        let data = ranges
1278            .iter()
1279            .map(|range| test_file_slice(range.clone()))
1280            .collect::<Vec<_>>();
1281        decoder.push_ranges(ranges, data).unwrap();
1282    }
1283
1284    /// Expect that the [`DecodeResult`] is a [`DecodeResult::Data`] and return the corresponding element
1285    fn expect_data<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) -> T {
1286        match result.expect("Expected Ok(DecodeResult::Data(T))") {
1287            DecodeResult::Data(data) => data,
1288            result => panic!("Expected DecodeResult::Data, got {result:?}"),
1289        }
1290    }
1291
1292    /// Expect that the [`DecodeResult`] is a [`DecodeResult::NeedsData`] and return the corresponding ranges
1293    fn expect_needs_data<T: Debug>(
1294        result: Result<DecodeResult<T>, ParquetError>,
1295    ) -> Vec<Range<u64>> {
1296        match result.expect("Expected Ok(DecodeResult::NeedsData{ranges})") {
1297            DecodeResult::NeedsData(ranges) => ranges,
1298            result => panic!("Expected DecodeResult::NeedsData, got {result:?}"),
1299        }
1300    }
1301
1302    fn expect_finished<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) {
1303        match result.expect("Expected Ok(DecodeResult::Finished)") {
1304            DecodeResult::Finished => {}
1305            result => panic!("Expected DecodeResult::Finished, got {result:?}"),
1306        }
1307    }
1308}