parquet/arrow/push_decoder/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`ParquetPushDecoder`]: decodes Parquet data from bytes provided by the
//! caller (rather than read from an underlying reader).

mod reader_builder;
mod remaining;

use crate::DecodeResult;
use crate::arrow::arrow_reader::{
    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
};
use crate::errors::ParquetError;
use crate::file::metadata::ParquetMetaData;
use crate::util::push_buffers::PushBuffers;
use arrow_array::RecordBatch;
use bytes::Bytes;
use reader_builder::RowGroupReaderBuilder;
use remaining::RemainingRowGroups;
use std::ops::Range;
use std::sync::Arc;

/// A builder for [`ParquetPushDecoder`].
///
/// To create a new decoder, use [`ParquetPushDecoderBuilder::try_new_decoder`].
///
/// You can decode the metadata from a Parquet file using either
/// [`ParquetMetaDataReader`] or [`ParquetMetaDataPushDecoder`].
///
/// [`ParquetMetaDataReader`]: crate::file::metadata::ParquetMetaDataReader
/// [`ParquetMetaDataPushDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder
///
/// Note there is no "input" for this builder: rather than reading from
/// somewhere, the resulting decoder is handed the file's bytes as they become
/// available via [`ParquetPushDecoder::push_ranges`] (see [`NoInput`]).
///
/// # Example
/// ```
/// # use std::ops::Range;
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use arrow_array::record_batch;
/// # use parquet::DecodeResult;
/// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
/// # use parquet::arrow::ArrowWriter;
/// # use parquet::file::metadata::ParquetMetaDataPushDecoder;
/// # let file_bytes = {
/// #   let mut buffer = vec![];
/// #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
/// #   writer.write(&batch).unwrap();
/// #   writer.close().unwrap();
/// #   Bytes::from(buffer)
/// # };
/// # // mimic IO by returning a function that returns the bytes for a given range
/// # let get_range = |range: &Range<u64>| -> Bytes {
/// #    let start = range.start as usize;
/// #    let end = range.end as usize;
/// #    file_bytes.slice(start..end)
/// # };
/// # let file_length = file_bytes.len() as u64;
/// # let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_length).unwrap();
/// # metadata_decoder.push_ranges(vec![0..file_length], vec![file_bytes.clone()]).unwrap();
/// # let DecodeResult::Data(parquet_metadata) = metadata_decoder.try_decode().unwrap() else { panic!("failed to decode metadata") };
/// # let parquet_metadata = Arc::new(parquet_metadata);
/// // The metadata is required to create the decoder
/// let mut decoder =
///     ParquetPushDecoderBuilder::try_new_decoder(parquet_metadata)
///       .unwrap()
///       // Optionally configure the decoder, e.g. batch size
///       .with_batch_size(1024)
///       // Build the decoder
///       .build()
///       .unwrap();
///
/// // In a loop, ask the decoder what it needs next, and provide it with the required data
/// loop {
///     match decoder.try_decode().unwrap() {
///         DecodeResult::NeedsData(ranges) => {
///             // The decoder needs more data. Fetch the data for the given ranges
///             let data = ranges.iter().map(|r| get_range(r)).collect::<Vec<_>>();
///             // Push the data to the decoder
///             decoder.push_ranges(ranges, data).unwrap();
///             // After pushing the data, we can try to decode again on the next iteration
///         }
///         DecodeResult::Data(batch) => {
///             // Successfully decoded a batch of data
///             assert!(batch.num_rows() > 0);
///         }
///         DecodeResult::Finished => {
///             // The decoder has finished decoding; exit the loop
///             break;
///         }
///     }
/// }
/// ```
pub type ParquetPushDecoderBuilder = ArrowReaderBuilder<NoInput>;

/// Type that represents "no input" for the [`ParquetPushDecoderBuilder`]
///
/// There is no "input" for the push decoder by design (the idea is that
/// the caller pushes data to the decoder as needed).
///
/// However, [`ArrowReaderBuilder`] is shared with the sync and async readers,
/// which DO have an `input`. To support reusing the same builder code for
/// all three types of decoders, we define this `NoInput` marker to denote in
/// the type system that the push decoder has no input.
#[derive(Debug, Clone, Copy)]
pub struct NoInput;

/// Methods for building a [`ParquetPushDecoder`]. See the base [`ArrowReaderBuilder`] for
/// more options that can be configured.
impl ParquetPushDecoderBuilder {
    /// Create a new `ParquetPushDecoderBuilder` for configuring a Parquet decoder for the given file.
    ///
    /// See [`ParquetMetaDataPushDecoder`] for decoding the metadata from a Parquet file.
    ///
    /// [`ParquetMetaDataPushDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder
    ///
    /// See the example on [`ParquetPushDecoderBuilder`]
    pub fn try_new_decoder(parquet_metadata: Arc<ParquetMetaData>) -> Result<Self, ParquetError> {
        Self::try_new_decoder_with_options(parquet_metadata, ArrowReaderOptions::default())
    }

    /// Create a new `ParquetPushDecoderBuilder` for configuring a Parquet decoder for the given file
    /// with the given reader options.
    ///
    /// This is similar to [`Self::try_new_decoder`] but allows configuring
    /// options such as the Arrow schema
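    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `get_metadata` is a (hypothetical) helper
    /// that returns previously decoded metadata (for example, via
    /// `ParquetMetaDataPushDecoder`):
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
    /// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
    /// # use parquet::file::metadata::ParquetMetaData;
    /// # fn get_metadata() -> Arc<ParquetMetaData> { unimplemented!() }
    /// // enable reading the page index, if present
    /// let options = ArrowReaderOptions::new().with_page_index(true);
    /// let decoder =
    ///     ParquetPushDecoderBuilder::try_new_decoder_with_options(get_metadata(), options)
    ///         .unwrap()
    ///         .build()
    ///         .unwrap();
    /// ```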
    pub fn try_new_decoder_with_options(
        parquet_metadata: Arc<ParquetMetaData>,
        arrow_reader_options: ArrowReaderOptions,
    ) -> Result<Self, ParquetError> {
        let arrow_reader_metadata =
            ArrowReaderMetadata::try_new(parquet_metadata, arrow_reader_options)?;
        Ok(Self::new_with_metadata(arrow_reader_metadata))
    }

    /// Create a new `ParquetPushDecoderBuilder` given [`ArrowReaderMetadata`].
    ///
    /// See [`ArrowReaderMetadata::try_new`] for how to create the metadata from
    /// the Parquet metadata and reader options.
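    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `get_metadata` is a (hypothetical) helper
    /// that returns previously decoded metadata. Note the same
    /// [`ArrowReaderMetadata`] can be cloned to build several decoders:
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
    /// # use parquet::file::metadata::ParquetMetaData;
    /// # fn get_metadata() -> Arc<ParquetMetaData> { unimplemented!() }
    /// let metadata =
    ///     ArrowReaderMetadata::try_new(get_metadata(), ArrowReaderOptions::default()).unwrap();
    /// // clone `metadata` here so it could be reused for another decoder
    /// let decoder = ParquetPushDecoderBuilder::new_with_metadata(metadata.clone())
    ///     .build()
    ///     .unwrap();
    /// ```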
    pub fn new_with_metadata(arrow_reader_metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(NoInput, arrow_reader_metadata)
    }

    /// Create a [`ParquetPushDecoder`] with the configured options
    pub fn build(self) -> Result<ParquetPushDecoder, ParquetError> {
        let Self {
            input: NoInput,
            metadata: parquet_metadata,
            schema: _,
            fields,
            batch_size,
            row_groups,
            projection,
            filter,
            selection,
            limit,
            offset,
            metrics,
            row_selection_policy,
            max_predicate_cache_size,
        } = self;

        // If no row groups were specified, read all of them
        let row_groups =
            row_groups.unwrap_or_else(|| (0..parquet_metadata.num_row_groups()).collect());

        // Prepare to build RowGroup readers
        let file_len = 0; // not used in push decoder
        let buffers = PushBuffers::new(file_len);
        let row_group_reader_builder = RowGroupReaderBuilder::new(
            batch_size,
            projection,
            Arc::clone(&parquet_metadata),
            fields,
            filter,
            limit,
            offset,
            metrics,
            max_predicate_cache_size,
            buffers,
            row_selection_policy,
        );

        // Initialize the decoder with the configured options
        let remaining_row_groups = RemainingRowGroups::new(
            parquet_metadata,
            row_groups,
            selection,
            row_group_reader_builder,
        );

        Ok(ParquetPushDecoder {
            state: ParquetDecoderState::ReadingRowGroup {
                remaining_row_groups: Box::new(remaining_row_groups),
            },
        })
    }
}

/// A push based Parquet decoder
///
/// See [`ParquetPushDecoderBuilder`] for an example of how to build and use the decoder.
///
/// [`ParquetPushDecoder`] is a low level API for decoding Parquet data without an
/// underlying reader for performing IO, and thus offers fine grained control
/// over how data is fetched and decoded.
///
/// When more data is needed to make progress, instead of reading data directly
/// from a reader, the decoder returns a [`DecodeResult`] indicating what ranges
/// are needed. Once the caller provides the requested ranges via
/// [`Self::push_ranges`], it can try to decode again by calling
/// [`Self::try_decode`].
///
/// The decoder's internal state tracks what has already been decoded and what
/// is needed next.
#[derive(Debug)]
pub struct ParquetPushDecoder {
    /// The inner state.
    ///
    /// This state is consumed on every transition and a new state is produced
    /// so the Rust compiler can ensure that the state is always valid and
    /// transitions are not missed.
    state: ParquetDecoderState,
}

impl ParquetPushDecoder {
    /// Attempt to decode the next batch of data, or return what data is needed
    ///
    /// The decoder communicates the next state with a [`DecodeResult`]
    ///
    /// See the full example in [`ParquetPushDecoderBuilder`]
    ///
    /// ```no_run
    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
    /// use parquet::DecodeResult;
    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
    /// # fn push_data(decoder: &mut ParquetPushDecoder, ranges: Vec<std::ops::Range<u64>>) { unimplemented!() }
    /// let mut decoder = get_decoder();
    /// loop {
    ///    match decoder.try_decode().unwrap() {
    ///       DecodeResult::NeedsData(ranges) => {
    ///         // The decoder needs more data. Fetch the data for the given ranges,
    ///         // call decoder.push_ranges(ranges, data) and call again
    ///         push_data(&mut decoder, ranges);
    ///       }
    ///       DecodeResult::Data(batch) => {
    ///         // Successfully decoded the next batch of data
    ///         println!("Got batch with {} rows", batch.num_rows());
    ///       }
    ///       DecodeResult::Finished => {
    ///         // The decoder has finished decoding all data
    ///         break;
    ///       }
    ///    }
    /// }
    /// ```
    pub fn try_decode(&mut self) -> Result<DecodeResult<RecordBatch>, ParquetError> {
        let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
        let (new_state, decode_result) = current_state.try_next_batch()?;
        self.state = new_state;
        Ok(decode_result)
    }

    /// Return a [`ParquetRecordBatchReader`] that reads the next set of rows, or
    /// return what data is needed to produce it.
    ///
    /// This API can be used to obtain a reader that decodes the next set of
    /// RecordBatches (e.g. a row group) while the caller proceeds to fetch the
    /// data for the following set.
    ///
    /// # Example
    /// ```no_run
    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
    /// use parquet::DecodeResult;
    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
    /// # fn push_data(decoder: &mut ParquetPushDecoder, ranges: Vec<std::ops::Range<u64>>) { unimplemented!() }
    /// let mut decoder = get_decoder();
    /// loop {
    ///    match decoder.try_next_reader().unwrap() {
    ///       DecodeResult::NeedsData(ranges) => {
    ///         // The decoder needs more data. Fetch the data for the given ranges,
    ///         // call decoder.push_ranges(ranges, data) and call again
    ///         push_data(&mut decoder, ranges);
    ///       }
    ///       DecodeResult::Data(reader) => {
    ///          // spawn a thread to read the batches in parallel
    ///          // with fetching the next row group / data
    ///          std::thread::spawn(move || {
    ///            for batch in reader {
    ///              let batch = batch.unwrap();
    ///              println!("Got batch with {} rows", batch.num_rows());
    ///            }
    ///         });
    ///       }
    ///       DecodeResult::Finished => {
    ///         // The decoder has finished decoding all data
    ///         break;
    ///       }
    ///    }
    /// }
    /// ```
    pub fn try_next_reader(
        &mut self,
    ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
        let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
        let (new_state, decode_result) = current_state.try_next_reader()?;
        self.state = new_state;
        Ok(decode_result)
    }

    /// Push data into the decoder for processing
    ///
    /// This is a convenience wrapper around [`Self::push_ranges`] for pushing a
    /// single range of data.
    ///
    /// Note this can be the entire file or just a part of it. If it is part of
    /// the file, the range should correspond to a data range requested by the
    /// decoder.
    ///
    /// See the example in [`ParquetPushDecoderBuilder`]
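    ///
    /// For example, if the entire file is already in memory it can be pushed
    /// up front as a single range. A minimal sketch, assuming (hypothetical)
    /// helpers `get_decoder` and `get_file_bytes`:
    ///
    /// ```no_run
    /// # use bytes::Bytes;
    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
    /// # fn get_file_bytes() -> Bytes { unimplemented!() }
    /// let mut decoder = get_decoder();
    /// let file_bytes = get_file_bytes();
    /// let file_len = file_bytes.len() as u64;
    /// // push the whole file at once; the decoder will not request more data
    /// decoder.push_range(0..file_len, file_bytes).unwrap();
    /// ```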
    pub fn push_range(&mut self, range: Range<u64>, data: Bytes) -> Result<(), ParquetError> {
        self.push_ranges(vec![range], vec![data])
    }

    /// Push data into the decoder for processing
    ///
    /// This should correspond to the data ranges requested by the decoder
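    ///
    /// A minimal sketch, assuming `get_decoder` and `fetch` are (hypothetical)
    /// helpers, where `fetch` reads the given byte range from storage:
    ///
    /// ```no_run
    /// # use std::ops::Range;
    /// # use bytes::Bytes;
    /// # use parquet::DecodeResult;
    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
    /// # fn fetch(range: &Range<u64>) -> Bytes { unimplemented!() }
    /// let mut decoder = get_decoder();
    /// if let DecodeResult::NeedsData(ranges) = decoder.try_decode().unwrap() {
    ///     // fetch each requested range and push them all back at once
    ///     let data: Vec<Bytes> = ranges.iter().map(fetch).collect();
    ///     decoder.push_ranges(ranges, data).unwrap();
    /// }
    /// ```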
    pub fn push_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
        data: Vec<Bytes>,
    ) -> Result<(), ParquetError> {
        let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
        self.state = current_state.push_data(ranges, data)?;
        Ok(())
    }

    /// Returns the total number of buffered bytes in the decoder
    ///
    /// This is the sum of the sizes of all [`Bytes`] that have been pushed to
    /// the decoder but not yet consumed.
    ///
    /// Note that this does not include the overhead of internal data
    /// structures, and since [`Bytes`] is reference counted memory, it may not
    /// reflect the process's actual memory usage.
    ///
    /// This can be used to monitor the memory usage of the decoder.
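    ///
    /// For example, to observe memory being released as batches are decoded
    /// (a minimal sketch, assuming a (hypothetical) `get_decoder` helper that
    /// returns a decoder which has already been given all required data):
    ///
    /// ```no_run
    /// # use parquet::DecodeResult;
    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
    /// let mut decoder = get_decoder();
    /// while let DecodeResult::Data(batch) = decoder.try_decode().unwrap() {
    ///     println!(
    ///         "decoded {} rows, {} bytes still buffered",
    ///         batch.num_rows(),
    ///         decoder.buffered_bytes()
    ///     );
    /// }
    /// ```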
    pub fn buffered_bytes(&self) -> u64 {
        self.state.buffered_bytes()
    }
}

/// Internal state machine for the [`ParquetPushDecoder`]
#[derive(Debug)]
enum ParquetDecoderState {
    /// Waiting for data needed to decode the next RowGroup
    ReadingRowGroup {
        remaining_row_groups: Box<RemainingRowGroups>,
    },
    /// The decoder is actively decoding a RowGroup
    DecodingRowGroup {
        /// Current active reader
        record_batch_reader: Box<ParquetRecordBatchReader>,
        remaining_row_groups: Box<RemainingRowGroups>,
    },
    /// The decoder has finished processing all data
    Finished,
}

impl ParquetDecoderState {
    /// Transition until a [`ParquetRecordBatchReader`] for the next row group
    /// is ready, then return it and advance to the next group.
    fn try_next_reader(
        self,
    ) -> Result<(Self, DecodeResult<ParquetRecordBatchReader>), ParquetError> {
        let mut current_state = self;
        loop {
            let (next_state, decode_result) = current_state.transition()?;
            // if more data is needed to transition, can't proceed further without it
            match decode_result {
                DecodeResult::NeedsData(ranges) => {
                    return Ok((next_state, DecodeResult::NeedsData(ranges)));
                }
                // act next based on state
                DecodeResult::Data(()) | DecodeResult::Finished => {}
            }
            match next_state {
                // not ready to read yet, continue transitioning
                Self::ReadingRowGroup { .. } => current_state = next_state,
                // have a reader ready, so return it and set ourselves to ReadingRowGroup
                Self::DecodingRowGroup {
                    record_batch_reader,
                    remaining_row_groups,
                } => {
                    let result = DecodeResult::Data(*record_batch_reader);
                    let next_state = Self::ReadingRowGroup {
                        remaining_row_groups,
                    };
                    return Ok((next_state, result));
                }
                Self::Finished => {
                    return Ok((Self::Finished, DecodeResult::Finished));
                }
            }
        }
    }

    /// Current state --> next state + output
    ///
    /// This function is called to get the next RecordBatch.
    ///
    /// The logic is written as a loop over [`Self::transition`] to reduce the
    /// indentation level.
    fn try_next_batch(self) -> Result<(Self, DecodeResult<RecordBatch>), ParquetError> {
        let mut current_state = self;
        loop {
            let (new_state, decode_result) = current_state.transition()?;
            // if more data is needed to transition, can't proceed further without it
            match decode_result {
                DecodeResult::NeedsData(ranges) => {
                    return Ok((new_state, DecodeResult::NeedsData(ranges)));
                }
                // act next based on state
                DecodeResult::Data(()) | DecodeResult::Finished => {}
            }
            match new_state {
                // not ready to read yet, continue transitioning
                Self::ReadingRowGroup { .. } => current_state = new_state,
                // have a reader ready, so decode the next batch
                Self::DecodingRowGroup {
                    mut record_batch_reader,
                    remaining_row_groups,
                } => {
                    match record_batch_reader.next() {
                        // Successfully decoded a batch, return it
                        Some(Ok(batch)) => {
                            let result = DecodeResult::Data(batch);
                            let next_state = Self::DecodingRowGroup {
                                record_batch_reader,
                                remaining_row_groups,
                            };
                            return Ok((next_state, result));
                        }
                        // No more batches in this row group, move to the next row group
                        None => {
                            current_state = Self::ReadingRowGroup {
                                remaining_row_groups,
                            }
                        }
                        // some error occurred while decoding, so return that
                        Some(Err(e)) => {
                            // TODO: preserve ArrowError in ParquetError (rather than convert to a string)
                            return Err(ParquetError::ArrowError(e.to_string()));
                        }
                    }
                }
                Self::Finished => {
                    return Ok((Self::Finished, DecodeResult::Finished));
                }
            }
        }
    }

    /// Transition to the next state with a reader (data can be produced), if not at end of stream
    ///
    /// This function is called in a loop until the decoder is ready to return
    /// data (has the required pages buffered) or is finished.
    fn transition(self) -> Result<(Self, DecodeResult<()>), ParquetError> {
        // result returned when there is data ready
        let data_ready = DecodeResult::Data(());
        match self {
            Self::ReadingRowGroup {
                mut remaining_row_groups,
            } => {
                match remaining_row_groups.try_next_reader()? {
                    // If we have a next reader, we can transition to decoding it
                    DecodeResult::Data(record_batch_reader) => {
                        // Transition to decoding the row group
                        Ok((
                            Self::DecodingRowGroup {
                                record_batch_reader: Box::new(record_batch_reader),
                                remaining_row_groups,
                            },
                            data_ready,
                        ))
                    }
                    DecodeResult::NeedsData(ranges) => {
                        // If we need more data, we return the ranges needed and stay in the
                        // ReadingRowGroup state
                        Ok((
                            Self::ReadingRowGroup {
                                remaining_row_groups,
                            },
                            DecodeResult::NeedsData(ranges),
                        ))
                    }
                    // If there are no more readers, we are finished
                    DecodeResult::Finished => {
                        // No more row groups to read, we are finished
                        Ok((Self::Finished, DecodeResult::Finished))
                    }
                }
            }
            // if we are already in DecodingRowGroup, just return data ready
            Self::DecodingRowGroup { .. } => Ok((self, data_ready)),
            // if finished, just return finished
            Self::Finished => Ok((self, DecodeResult::Finished)),
        }
    }

    /// Push data, and transition state if needed
    ///
    /// This should correspond to the data ranges requested by the decoder
    pub fn push_data(
        self,
        ranges: Vec<Range<u64>>,
        data: Vec<Bytes>,
    ) -> Result<Self, ParquetError> {
        match self {
            ParquetDecoderState::ReadingRowGroup {
                mut remaining_row_groups,
            } => {
                // Push data to the RowGroupReaderBuilder
                remaining_row_groups.push_data(ranges, data);
                Ok(ParquetDecoderState::ReadingRowGroup {
                    remaining_row_groups,
                })
            }
            // it is ok to get data before we asked for it
            ParquetDecoderState::DecodingRowGroup {
                record_batch_reader,
                mut remaining_row_groups,
            } => {
                remaining_row_groups.push_data(ranges, data);
                Ok(ParquetDecoderState::DecodingRowGroup {
                    record_batch_reader,
                    remaining_row_groups,
                })
            }
            ParquetDecoderState::Finished => Err(ParquetError::General(
                "Cannot push data to a finished decoder".to_string(),
            )),
        }
    }

    /// How many bytes are currently buffered in the decoder?
    fn buffered_bytes(&self) -> u64 {
        match self {
            ParquetDecoderState::ReadingRowGroup {
                remaining_row_groups,
            } => remaining_row_groups.buffered_bytes(),
            ParquetDecoderState::DecodingRowGroup {
                record_batch_reader: _,
                remaining_row_groups,
            } => remaining_row_groups.buffered_bytes(),
            ParquetDecoderState::Finished => 0,
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::DecodeResult;
    use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
    use crate::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
    use crate::arrow::{ArrowWriter, ProjectionMask};
    use crate::errors::ParquetError;
    use crate::file::metadata::ParquetMetaDataPushDecoder;
    use crate::file::properties::WriterProperties;
    use arrow::compute::kernels::cmp::{gt, lt};
    use arrow_array::cast::AsArray;
    use arrow_array::types::Int64Type;
    use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray};
    use arrow_select::concat::concat_batches;
    use bytes::Bytes;
    use std::fmt::Debug;
    use std::ops::Range;
    use std::sync::{Arc, LazyLock};

    /// Test the size of the decoder state (as it is copied around on each
    /// transition, it should not grow too large)
    #[test]
    fn test_decoder_size() {
        assert_eq!(std::mem::size_of::<ParquetDecoderState>(), 24);
    }

    /// Decode the entire file at once, simulating a scenario where all data is
    /// available in memory
    #[test]
    fn test_decoder_all_data() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .build()
            .unwrap();

        decoder
            .push_range(test_file_range(), TEST_FILE_DATA.clone())
            .unwrap();

        let results = vec![
            // first row group should be decoded without needing more data
            expect_data(decoder.try_decode()),
            // second row group should be decoded without needing more data
            expect_data(decoder.try_decode()),
        ];
        expect_finished(decoder.try_decode());

        let all_output = concat_batches(&TEST_BATCH.schema(), &results).unwrap();
        // Check that the output matches the input batch
        assert_eq!(all_output, *TEST_BATCH);
    }

    /// Decode the entire file incrementally, simulating a scenario where data is
    /// fetched as needed
    #[test]
    fn test_decoder_incremental() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .build()
            .unwrap();

        let mut results = vec![];

        // First row group, expect a single request
        let ranges = expect_needs_data(decoder.try_decode());
        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        push_ranges_to_decoder(&mut decoder, ranges);
        // The decoder should currently only store the data it needs to decode the first row group
        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
        results.push(expect_data(decoder.try_decode()));
        // the decoder should have consumed the data for the first row group and freed it
        assert_eq!(decoder.buffered_bytes(), 0);

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        push_ranges_to_decoder(&mut decoder, ranges);
        // The decoder should currently only store the data it needs to decode the second row group
        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
        results.push(expect_data(decoder.try_decode()));
        // the decoder should have consumed the data for the second row group and freed it
        assert_eq!(decoder.buffered_bytes(), 0);
        expect_finished(decoder.try_decode());

        // Check that the output matches the input batch
        let all_output = concat_batches(&TEST_BATCH.schema(), &results).unwrap();
        assert_eq!(all_output, *TEST_BATCH);
    }

    /// Decode the entire file incrementally, simulating partial reads
    #[test]
    fn test_decoder_partial() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .build()
            .unwrap();

        // First row group, expect a single request for all the needed data
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(0, 200);
        assert_eq!(batch1, expected1);

        // Second row group, this time provide the data in two steps
        let ranges = expect_needs_data(decoder.try_decode());
        let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2);
        assert!(!ranges1.is_empty());
        assert!(!ranges2.is_empty());
        // push first half to simulate partial read
        push_ranges_to_decoder(&mut decoder, ranges1.to_vec());

        // still expect more data
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        // pushing empty ranges should be a no-op
        push_ranges_to_decoder(&mut decoder, vec![]);
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 200);
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }

    /// Decode multiple columns "a" and "b", expecting that the decoder issues
    /// only a single request per row group
    #[test]
    fn test_decoder_selection_does_one_request() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        let mut decoder = builder
            .with_projection(
                ProjectionMask::columns(&schema_descr, ["a", "b"]), // read "a", "b"
            )
            .build()
            .unwrap();

        // First row group, expect a single request for all data needed to read "a" and "b"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(0, 200).project(&[0, 1]).unwrap();
        assert_eq!(batch1, expected1);

        // Second row group, similarly expect a single request for all data needed to read "a" and "b"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 200).project(&[0, 1]).unwrap();
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }

    /// Decode with a filter that requires multiple requests, but only provide part
    /// of the data needed for the filter at a time, simulating partial reads.
    #[test]
    fn test_decoder_single_filter_partial() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // First filter: "a" > 250  (nothing in Row Group 0, both data pages in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            // claim to use both a and b so we get two range requests for the filter pages
            ProjectionMask::columns(&schema_descr, ["a", "b"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // read only column "a" to test that filter pages are reused
                ProjectionMask::columns(&schema_descr, ["a"]), // read "a"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .build()
            .unwrap();

        // First row group, evaluating filters
        let ranges = expect_needs_data(decoder.try_decode());
        // only provide half the ranges
        let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2);
        assert!(!ranges1.is_empty());
        assert!(!ranges2.is_empty());
        push_ranges_to_decoder(&mut decoder, ranges1.to_vec());
        // still expect more data
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        push_ranges_to_decoder(&mut decoder, ranges2.to_vec());

        // Since no rows in the first row group pass the filters, there are no
        // additional requests to read data pages for "b" here

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 149).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }

    /// Decode with a filter where we also skip one of the RowGroups via a RowSelection
    #[test]
    fn test_decoder_single_filter_and_row_selection() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // First filter: "a" > 250  (nothing in Row Group 0, last data page in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // read only column "b" (the filter reads column "a")
                ProjectionMask::columns(&schema_descr, ["b"]), // read "b"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(200),   // skip first row group
                RowSelector::select(100), // first 100 rows of second row group
                RowSelector::skip(100),
            ]))
            .build()
            .unwrap();

        // expect the first row group to be filtered out (no filter is evaluated due to row selection)

        // First row group, first filter (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 49).project(&[1]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }

    /// Decode with multiple filters that require multiple requests
    #[test]
    fn test_decoder_multi_filters() {
        // Create a decoder for decoding parquet data (note it does not have any IO / readers)
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // Values in column "b" range 400..799
        // First filter: "a" > 175  (last data page in Row Group 0)
        // Second filter: "b" < 625 (last data page in Row Group 0 and first DataPage in RowGroup 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 175
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_175 = Int64Array::new_scalar(175);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_175)
            },
        );

        // b < 625
        let row_filter_b = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["b"]),
            |batch: RecordBatch| {
                let scalar_625 = Int64Array::new_scalar(625);
                let column = batch.column(0).as_primitive::<Int64Type>();
                lt(column, &scalar_625)
            },
        );

        let mut decoder = builder
            .with_projection(
                ProjectionMask::columns(&schema_descr, ["c"]), // read "c"
            )
            .with_row_filter(RowFilter::new(vec![
                Box::new(row_filter_a),
                Box::new(row_filter_b),
            ]))
            .build()
            .unwrap();

        // First row group, first filter (a > 175)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // First row group, second filter (b < 625)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // First row group, data pages for "c"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first batch to be decoded: rows 176..199, column "c"
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(176, 24).project(&[2]).unwrap();
        assert_eq!(batch1, expected1);

        // Second row group, first filter (a > 175)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group, second filter (b < 625)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group, data pages for "c"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the second batch to be decoded: rows 200..224, column "c"
        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 25).project(&[2]).unwrap();
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }

    /// Decode with a filter that uses a column that is also projected, and expect
    /// that the filter pages are reused (not refetched)
    #[test]
    fn test_decoder_reuses_filter_pages() {
        // Create a decoder for decoding parquet data (note it does not have any IO / readers)
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // First filter: "a" > 250  (nothing in Row Group 0, last data page in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // read only column "a" to test that filter pages are reused
                ProjectionMask::columns(&schema_descr, ["a"]), // read "a"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .build()
            .unwrap();

        // First row group, first filter (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first row group to be filtered out (no rows match)

        // Second row group, first filter (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect that the second row group is decoded: rows 251..399, column "a"
        // Note that the filter pages for "a" should be reused and no additional data
        // should be requested
        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 149).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }

    #[test]
    fn test_decoder_empty_filters() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // only read column "c", but with empty filters
        let mut decoder = builder
            .with_projection(
                ProjectionMask::columns(&schema_descr, ["c"]), // read "c"
            )
            .with_row_filter(RowFilter::new(vec![
                // empty filters should be ignored
            ]))
            .build()
            .unwrap();

        // First row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first batch to be decoded: rows 0..199, column "c"
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(0, 200).project(&[2]).unwrap();
        assert_eq!(batch1, expected1);

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the second batch to be decoded: rows 200..399, column "c"
        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 200).project(&[2]).unwrap();
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }

    #[test]
    fn test_decoder_offset_limit() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            // skip entire first row group (200 rows) and first 25 rows of second row group
            .with_offset(225)
            // and limit to 20 rows
            .with_limit(20)
            .build()
            .unwrap();

        // First row group should be skipped

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first and only batch to be decoded
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(225, 20);
        assert_eq!(batch1, expected1);

        expect_finished(decoder.try_decode());
    }

    #[test]
    fn test_decoder_row_group_selection() {
        // take only the second row group
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .with_row_groups(vec![1])
            .build()
            .unwrap();

        // First row group should be skipped

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first and only batch to be decoded
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(200, 200);
        assert_eq!(batch1, expected1);

        expect_finished(decoder.try_decode());
    }

    #[test]
    fn test_decoder_row_selection() {
        // skip the first row group and part of the second via a RowSelection
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(225),  // skip first row group and 25 rows of second
                RowSelector::select(20), // take 20 rows
            ]))
            .build()
            .unwrap();

        // First row group should be skipped

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first and only batch to be decoded
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(225, 20);
        assert_eq!(batch1, expected1);

        expect_finished(decoder.try_decode());
    }

    /// Returns a batch with 400 rows, with 3 columns: "a", "b", "c"
    ///
    /// Note c is a different type (so the data page sizes will be different)
    static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
        let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
        let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
        let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| {
            if i % 2 == 0 {
                format!("string_{i}")
            } else {
                format!("A string larger than 12 bytes and thus not inlined {i}")
            }
        })));

        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
    });

    /// Create a parquet file in memory for testing.
    ///
    /// See [`TEST_BATCH`] for the data in the file.
    ///
    /// Each column is written in 4 data pages, each with 100 rows, across 2
    /// row groups. Each column in each row group has two data pages.
    ///
    /// The data is split across row groups like this
    ///
    /// Column | Values                     | Data Page | Row Group
    /// -------|----------------------------|-----------|-----------
    /// a      | 0..99                      | 1         | 0
    /// a      | 100..199                   | 2         | 0
    /// a      | 200..299                   | 1         | 1
    /// a      | 300..399                   | 2         | 1
    ///
    /// b      | 400..499                   | 1         | 0
    /// b      | 500..599                   | 2         | 0
    /// b      | 600..699                   | 1         | 1
    /// b      | 700..799                   | 2         | 1
    ///
    /// c      | "string_0".."string_99"    | 1         | 0
    /// c      | "string_100".."string_199" | 2         | 0
    /// c      | "string_200".."string_299" | 1         | 1
    /// c      | "string_300".."string_399" | 2         | 1
    static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
        let input_batch = &TEST_BATCH;
        let mut output = Vec::new();

        let writer_options = WriterProperties::builder()
            .set_max_row_group_size(200)
            .set_data_page_row_count_limit(100)
            .build();
        let mut writer =
            ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap();

        // since the limits are only enforced on batch boundaries, write the input
        // batch in chunks of 50
        let mut row_remain = input_batch.num_rows();
        while row_remain > 0 {
            let chunk_size = row_remain.min(50);
            let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size);
            writer.write(&chunk).unwrap();
            row_remain -= chunk_size;
        }
        writer.close().unwrap();
        Bytes::from(output)
    });

    /// Return the length of [`TEST_FILE_DATA`], in bytes
    fn test_file_len() -> u64 {
        TEST_FILE_DATA.len() as u64
    }

    /// Return a range that covers the entire [`TEST_FILE_DATA`]
    fn test_file_range() -> Range<u64> {
        0..test_file_len()
    }

    /// Return a slice of the test file data from the given range
    pub fn test_file_slice(range: Range<u64>) -> Bytes {
        let start: usize = range.start.try_into().unwrap();
        let end: usize = range.end.try_into().unwrap();
        TEST_FILE_DATA.slice(start..end)
    }

    /// Return the metadata for the test file
    pub fn test_file_parquet_metadata() -> Arc<crate::file::metadata::ParquetMetaData> {
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(test_file_len()).unwrap();
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![test_file_range()]);
        let metadata = metadata_decoder.try_decode().unwrap();
        let DecodeResult::Data(metadata) = metadata else {
            panic!("Expected metadata to be decoded successfully");
        };
        Arc::new(metadata)
    }

    /// Push the given ranges to the metadata decoder, simulating reading from a file
    fn push_ranges_to_metadata_decoder(
        metadata_decoder: &mut ParquetMetaDataPushDecoder,
        ranges: Vec<Range<u64>>,
    ) {
        let data = ranges
            .iter()
            .map(|range| test_file_slice(range.clone()))
            .collect::<Vec<_>>();
        metadata_decoder.push_ranges(ranges, data).unwrap();
    }

    /// Push the given ranges of [`TEST_FILE_DATA`] to the decoder, simulating IO
    fn push_ranges_to_decoder(decoder: &mut ParquetPushDecoder, ranges: Vec<Range<u64>>) {
        let data = ranges
            .iter()
            .map(|range| test_file_slice(range.clone()))
            .collect::<Vec<_>>();
        decoder.push_ranges(ranges, data).unwrap();
    }

    /// Expect that the [`DecodeResult`] is a [`DecodeResult::Data`] and return the corresponding element
    fn expect_data<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) -> T {
        match result.expect("Expected Ok(DecodeResult::Data(T))") {
            DecodeResult::Data(data) => data,
            result => panic!("Expected DecodeResult::Data, got {result:?}"),
        }
    }

    /// Expect that the [`DecodeResult`] is a [`DecodeResult::NeedsData`] and return the corresponding ranges
    fn expect_needs_data<T: Debug>(
        result: Result<DecodeResult<T>, ParquetError>,
    ) -> Vec<Range<u64>> {
        match result.expect("Expected Ok(DecodeResult::NeedsData{ranges})") {
            DecodeResult::NeedsData(ranges) => ranges,
            result => panic!("Expected DecodeResult::NeedsData, got {result:?}"),
        }
    }

    /// Expect that the [`DecodeResult`] is [`DecodeResult::Finished`]
    fn expect_finished<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) {
        match result.expect("Expected Ok(DecodeResult::Finished)") {
            DecodeResult::Finished => {}
            result => panic!("Expected DecodeResult::Finished, got {result:?}"),
        }
    }
}
1236}