parquet/file/metadata/push_decoder.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::DecodeResult;
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::FOOTER_SIZE;
use crate::file::metadata::parser::{MetadataParser, parse_column_index, parse_offset_index};
use crate::file::metadata::{FooterTail, PageIndexPolicy, ParquetMetaData};
use crate::file::page_index::index_reader::acc_range;
use crate::file::reader::ChunkReader;
use bytes::Bytes;
use std::ops::Range;

/// A push decoder for [`ParquetMetaData`].
///
/// This structure implements a push API for decoding Parquet metadata, which
/// decouples IO from the metadata decoding logic (sometimes referred to as
/// [Sans-IO]).
///
/// See [`ParquetMetaDataReader`] for a pull-based API that incorporates IO and
/// is simpler to use for basic use cases. This decoder is best when you want
/// to customize your IO operations, for example to minimize the bytes read,
/// prefetch data, or use async IO.
///
/// [Sans-IO]: https://sans-io.readthedocs.io
/// [`ParquetMetaDataReader`]: crate::file::metadata::ParquetMetaDataReader
///
/// # Example
///
/// The most basic usage is to feed the decoder the byte ranges it asks for, as
/// shown below. This minimizes the number of bytes read, but requires the most
/// IO operations: one to read the footer, one to read the metadata, and
/// possibly more if page indexes are requested.
///
#[cfg_attr(
    feature = "arrow",
    doc = r##"
```rust
# use std::ops::Range;
# use bytes::Bytes;
# use arrow_array::record_batch;
# use parquet::DecodeResult;
# use parquet::arrow::ArrowWriter;
# use parquet::errors::ParquetError;
# use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
#
# fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
# let file_bytes = {
#   let mut buffer = Vec::new();
#   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
#   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
#   writer.write(&batch).unwrap();
#   writer.close().unwrap();
#   Bytes::from(buffer)
# };
# // mimic IO by returning a function that returns the bytes for a given range
# let get_range = |range: &Range<u64>| -> Bytes {
#    let start = range.start as usize;
#    let end = range.end as usize;
#    file_bytes.slice(start..end)
# };
#
# let file_len = file_bytes.len() as u64;
// The `ParquetMetaDataPushDecoder` needs to know the file length.
let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
// Try to decode the metadata. If more data is needed, the decoder will tell you which ranges.
loop {
    match decoder.try_decode() {
       Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
       Ok(DecodeResult::NeedsData(ranges)) => {
          // The decoder needs more data
          //
          // In this example, we call a function that returns the bytes for each given range.
          // In a real application, you would likely read the data from a file or network.
          let data = ranges.iter().map(|range| get_range(range)).collect();
          // Push the data into the decoder and try to decode again on the next iteration.
          decoder.push_ranges(ranges, data).unwrap();
       }
       Ok(DecodeResult::Finished) => { unreachable!("returned metadata in previous match arm") }
       Err(e) => return Err(e),
    }
}
# }
```
"##
)]
///
/// # Example with "prefetching"
///
/// By default, the [`ParquetMetaDataPushDecoder`] will request only the exact
/// byte ranges it needs. This minimizes the number of bytes read, but it
/// requires at least two IO operations to read the metadata: one to read the
/// footer and one to read the metadata itself.
///
/// If the file has a "Page Index" (see [`Self::with_page_index_policy`]),
/// three IO operations are required to read the metadata, as the page index
/// is not part of the normal metadata footer.
///
/// To reduce the number of IO operations in systems with high per-operation
/// overhead (e.g. cloud storage), you can "prefetch" the data and push it
/// into the decoder before calling [`Self::try_decode`]. If you do not push
/// enough bytes, the decoder will return the ranges that are still needed.
///
/// This approach can also be used when you already have the entire file in
/// memory for other reasons.
#[cfg_attr(
    feature = "arrow",
    doc = r##"
```rust
# use std::ops::Range;
# use bytes::Bytes;
# use arrow_array::record_batch;
# use parquet::DecodeResult;
# use parquet::arrow::ArrowWriter;
# use parquet::errors::ParquetError;
# use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
#
# fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
# let file_bytes = {
#   let mut buffer = Vec::new();
#   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
#   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
#   writer.write(&batch).unwrap();
#   writer.close().unwrap();
#   Bytes::from(buffer)
# };
#
let file_len = file_bytes.len() as u64;
// For this example we "prefetch" all the bytes, which we already have in
// memory, but in a real application you would likely read a chunk from the
// end, for example the last 1MB.
let prefetched_bytes = file_bytes.clone();
let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
// push the prefetched bytes into the decoder
decoder.push_ranges(vec![0..file_len], vec![prefetched_bytes]).unwrap();
// The decoder will now be able to decode the metadata. Note that in a real
// application, unless you can guarantee the pushed data is enough to decode
// the metadata, you still need to call `try_decode` in a loop until it
// returns `DecodeResult::Data`, as shown in the previous example.
match decoder.try_decode() {
    Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
    other => { panic!("expected DecodeResult::Data, got: {other:?}") }
}
# }
```
"##
)]
///
/// # Example using [`AsyncRead`]
///
/// [`ParquetMetaDataPushDecoder`] is designed to work with any data source that can
/// provide byte ranges, including async IO sources. However, it does not
/// implement async IO itself. To use async IO, you write an async wrapper
/// around it that reads the required byte ranges and pushes them into the
/// decoder.
#[cfg_attr(
    feature = "arrow",
    doc = r##"
```rust
# use std::ops::Range;
# use bytes::Bytes;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
# use arrow_array::record_batch;
# use parquet::DecodeResult;
# use parquet::arrow::ArrowWriter;
# use parquet::errors::ParquetError;
# use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
#
// This function decodes Parquet metadata from anything that implements
// [`AsyncRead`] and [`AsyncSeek`], such as a `tokio::fs::File`
async fn decode_metadata(
  file_len: u64,
  mut async_source: impl AsyncRead + AsyncSeek + Unpin
) -> Result<ParquetMetaData, ParquetError> {
  // We need a ParquetMetaDataPushDecoder to decode the metadata.
  let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
  loop {
    match decoder.try_decode() {
       Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
       Ok(DecodeResult::NeedsData(ranges)) => {
          // The decoder needs more data
          //
          // In this example we use the AsyncRead and AsyncSeek traits to read the
          // required ranges from the async source.
          let mut data = Vec::with_capacity(ranges.len());
          for range in &ranges {
            let mut buffer = vec![0; (range.end - range.start) as usize];
            async_source.seek(std::io::SeekFrom::Start(range.start)).await?;
            async_source.read_exact(&mut buffer).await?;
            data.push(Bytes::from(buffer));
          }
          // Push the data into the decoder and try to decode again on the next iteration.
          decoder.push_ranges(ranges, data).unwrap();
       }
       Ok(DecodeResult::Finished) => { unreachable!("returned metadata in previous match arm") }
       Err(e) => return Err(e),
    }
  }
}
```
"##
)]
/// [`AsyncRead`]: tokio::io::AsyncRead
#[derive(Debug)]
pub struct ParquetMetaDataPushDecoder {
    /// Decoding state
    state: DecodeState,
    /// Policy for loading the ColumnIndex (part of the PageIndex)
    column_index_policy: PageIndexPolicy,
    /// Policy for loading the OffsetIndex (part of the PageIndex)
    offset_index_policy: PageIndexPolicy,
    /// Underlying buffers
    buffers: crate::util::push_buffers::PushBuffers,
    /// Metadata parser (handles footer decryption when the `encryption`
    /// feature is enabled)
    metadata_parser: MetadataParser,
}

impl ParquetMetaDataPushDecoder {
    /// Create a new `ParquetMetaDataPushDecoder` with the given file length.
    ///
    /// By default, this will read the column and offset indexes if they are
    /// present. See [`ParquetMetaDataPushDecoder::with_page_index_policy`]
    /// for more detail.
    ///
    /// See examples on [`ParquetMetaDataPushDecoder`].
    pub fn try_new(file_len: u64) -> Result<Self> {
        if file_len < 8 {
            return Err(ParquetError::General(format!(
                "Parquet files are at least 8 bytes long, but file length is {file_len}"
            )));
        }

        Ok(Self {
            state: DecodeState::ReadingFooter,
            column_index_policy: PageIndexPolicy::Optional,
            offset_index_policy: PageIndexPolicy::Optional,
            buffers: crate::util::push_buffers::PushBuffers::new(file_len),
            metadata_parser: MetadataParser::new(),
        })
    }

    /// Begin decoding from the given footer tail.
    pub(crate) fn try_new_with_footer_tail(file_len: u64, footer_tail: FooterTail) -> Result<Self> {
        let mut new_self = Self::try_new(file_len)?;
        new_self.state = DecodeState::ReadingMetadata(footer_tail);
        Ok(new_self)
    }

    /// Create a decoder with the given `ParquetMetaData` already known.
    ///
    /// This can be used to parse and populate the page index structures
    /// after the metadata has already been decoded.
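    ///
    /// A minimal sketch of loading the page indexes after the fact (assuming
    /// `metadata` was previously decoded with the page indexes skipped, and
    /// `get_range` is a hypothetical function that performs your IO):
    ///
    /// ```rust,ignore
    /// let mut decoder = ParquetMetaDataPushDecoder::try_new_with_metadata(file_len, metadata)?;
    /// let metadata = loop {
    ///     match decoder.try_decode()? {
    ///         DecodeResult::Data(metadata) => break metadata,
    ///         DecodeResult::NeedsData(ranges) => {
    ///             let data: Vec<_> = ranges.iter().map(|r| get_range(r)).collect();
    ///             decoder.push_ranges(ranges, data)?;
    ///         }
    ///         DecodeResult::Finished => unreachable!(),
    ///     }
    /// };
    /// ```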
    pub fn try_new_with_metadata(file_len: u64, metadata: ParquetMetaData) -> Result<Self> {
        let mut new_self = Self::try_new(file_len)?;
        new_self.state = DecodeState::ReadingPageIndex(Box::new(metadata));
        Ok(new_self)
    }

    /// Enable or disable reading the page index structures described in
    /// "[Parquet page index] Layout to Support Page Skipping".
    ///
    /// Defaults to [`PageIndexPolicy::Optional`].
    ///
    /// This requires:
    /// 1. The Parquet file to have been written with page indexes
    /// 2. Additional data to be pushed into the decoder (as the page indexes
    ///    are not part of the thrift footer)
    ///
    /// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
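    ///
    /// A minimal sketch of requiring both indexes (`file_len` is assumed to
    /// be known):
    ///
    /// ```rust,ignore
    /// let decoder = ParquetMetaDataPushDecoder::try_new(file_len)?
    ///     .with_page_index_policy(PageIndexPolicy::Required);
    /// ```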
    pub fn with_page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self {
        self.column_index_policy = page_index_policy;
        self.offset_index_policy = page_index_policy;
        self
    }

    /// Set the policy for reading the ColumnIndex (part of the PageIndex)
    pub fn with_column_index_policy(mut self, column_index_policy: PageIndexPolicy) -> Self {
        self.column_index_policy = column_index_policy;
        self
    }

    /// Set the policy for reading the OffsetIndex (part of the PageIndex)
    pub fn with_offset_index_policy(mut self, offset_index_policy: PageIndexPolicy) -> Self {
        self.offset_index_policy = offset_index_policy;
        self
    }

    /// Provide decryption properties for decoding encrypted Parquet files
    #[cfg(feature = "encryption")]
    pub(crate) fn with_file_decryption_properties(
        mut self,
        file_decryption_properties: Option<std::sync::Arc<FileDecryptionProperties>>,
    ) -> Self {
        self.metadata_parser = self
            .metadata_parser
            .with_file_decryption_properties(file_decryption_properties);
        self
    }

    /// Push the data into the decoder's buffer.
    ///
    /// The decoder does not immediately attempt to decode the metadata
    /// after pushing data. Instead, it accumulates the pushed data until you
    /// call [`Self::try_decode`].
    ///
    /// # Determining required data
    ///
    /// To determine which ranges are required to decode the metadata, you can
    /// either:
    ///
    /// 1. Call [`Self::try_decode`] first to get the exact ranges required (see
    ///    the example on [`Self`])
    ///
    /// 2. Speculatively push any data that you have available, which may
    ///    include more than the footer data or the requested bytes.
    ///
    /// Speculatively pushing data is how "prefetching" works; see the
    /// example on [`Self`].
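    ///
    /// A short sketch of the call itself (assuming `bytes` holds the last
    /// 1024 bytes of a file of length `file_len`):
    ///
    /// ```rust,ignore
    /// decoder.push_ranges(vec![file_len - 1024..file_len], vec![bytes])?;
    /// ```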
    pub fn push_ranges(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) -> Result<()> {
        if matches!(&self.state, DecodeState::Finished) {
            return Err(general_err!(
                "ParquetMetaDataPushDecoder: cannot push data after decoding is finished"
            ));
        }
        self.buffers.push_ranges(ranges, buffers);
        Ok(())
    }

    /// Pushes a single range of data into the decoder's buffer.
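    ///
    /// This is the single-range variant of [`Self::push_ranges`]. A sketch,
    /// assuming `footer_bytes` holds the last 8 bytes of the file:
    ///
    /// ```rust,ignore
    /// decoder.push_range(file_len - 8..file_len, footer_bytes)?;
    /// ```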
    pub fn push_range(&mut self, range: Range<u64>, buffer: Bytes) -> Result<()> {
        if matches!(&self.state, DecodeState::Finished) {
            return Err(general_err!(
                "ParquetMetaDataPushDecoder: cannot push data after decoding is finished"
            ));
        }
        self.buffers.push_range(range, buffer);
        Ok(())
    }

    /// Try to decode the metadata from the pushed data, returning
    /// [`DecodeResult::Data`] with the decoded metadata,
    /// [`DecodeResult::NeedsData`] with the byte ranges still required, or
    /// [`DecodeResult::Finished`] if the metadata was already returned.
    pub fn try_decode(&mut self) -> Result<DecodeResult<ParquetMetaData>> {
        let file_len = self.buffers.file_len();
        let footer_len = FOOTER_SIZE as u64;
        loop {
            match std::mem::replace(&mut self.state, DecodeState::Intermediate) {
                DecodeState::ReadingFooter => {
                    // need the last 8 bytes of the file to decode the footer
                    let footer_start = file_len.saturating_sub(footer_len);
                    let footer_range = footer_start..file_len;

                    if !self.buffers.has_range(&footer_range) {
                        self.state = DecodeState::ReadingFooter;
                        return Ok(needs_range(footer_range));
                    }
                    let footer_bytes = self.get_bytes(&footer_range)?;
                    let footer_tail = FooterTail::try_from(footer_bytes.as_ref())?;

                    self.state = DecodeState::ReadingMetadata(footer_tail);
                    continue;
                }

                DecodeState::ReadingMetadata(footer_tail) => {
                    let metadata_len: u64 = footer_tail.metadata_length() as u64;
                    // guard against corrupt footers that claim more metadata
                    // bytes than the file could hold (avoids u64 underflow)
                    let metadata_start = file_len
                        .checked_sub(footer_len + metadata_len)
                        .ok_or_else(|| {
                            general_err!(
                                "Parquet metadata length {metadata_len} is too large for file length {file_len}"
                            )
                        })?;
                    let metadata_end = metadata_start + metadata_len;
                    let metadata_range = metadata_start..metadata_end;

                    if !self.buffers.has_range(&metadata_range) {
                        self.state = DecodeState::ReadingMetadata(footer_tail);
                        return Ok(needs_range(metadata_range));
                    }

                    let metadata = self.metadata_parser.decode_metadata(
                        &self.get_bytes(&metadata_range)?,
                        footer_tail.is_encrypted_footer(),
                    )?;
                    // Note: ReadingPageIndex first checks if page indexes are needed
                    // and is a no-op if not
                    self.state = DecodeState::ReadingPageIndex(Box::new(metadata));
                    continue;
                }

                DecodeState::ReadingPageIndex(mut metadata) => {
                    // First determine if any page indexes are needed based on
                    // the specified policies
                    let range = range_for_page_index(
                        &metadata,
                        self.column_index_policy,
                        self.offset_index_policy,
                    );

                    let Some(page_index_range) = range else {
                        self.state = DecodeState::Finished;
                        return Ok(DecodeResult::Data(*metadata));
                    };

                    if !self.buffers.has_range(&page_index_range) {
                        self.state = DecodeState::ReadingPageIndex(metadata);
                        return Ok(needs_range(page_index_range));
                    }

                    let buffer = self.get_bytes(&page_index_range)?;
                    let offset = page_index_range.start;
                    parse_column_index(&mut metadata, self.column_index_policy, &buffer, offset)?;
                    parse_offset_index(&mut metadata, self.offset_index_policy, &buffer, offset)?;
                    self.state = DecodeState::Finished;
                    return Ok(DecodeResult::Data(*metadata));
                }

                DecodeState::Finished => return Ok(DecodeResult::Finished),
                DecodeState::Intermediate => {
                    return Err(general_err!(
                        "ParquetMetaDataPushDecoder: internal error, invalid state"
                    ));
                }
            }
        }
    }

    /// Returns the bytes for the given range from the internal buffer
    fn get_bytes(&self, range: &Range<u64>) -> Result<Bytes> {
        let start = range.start;
        let raw_len = range.end - range.start;
        let len: usize = raw_len.try_into().map_err(|_| {
            ParquetError::General(format!(
                "ParquetMetaDataPushDecoder: Range length too large to fit in usize: {raw_len}"
            ))
        })?;
        self.buffers.get_bytes(start, len)
    }
}

/// Returns a [`DecodeResult`] that indicates the given range is needed
fn needs_range(range: Range<u64>) -> DecodeResult<ParquetMetaData> {
    DecodeResult::NeedsData(vec![range])
}

/// Decoding state machine
#[derive(Debug)]
enum DecodeState {
    /// Reading the last 8 bytes of the file
    ReadingFooter,
    /// Reading the metadata thrift structure
    ReadingMetadata(FooterTail),
    /// Actively reading the page index
    ReadingPageIndex(Box<ParquetMetaData>),
    /// Decoding is complete
    Finished,
    /// State left during the `try_decode` method so something valid is present.
    /// This state should never be observed.
    Intermediate,
}

/// Returns the byte range needed to read the column/offset indexes, based on
/// the specified policies
///
/// Returns `None` if no page indexes are needed
pub fn range_for_page_index(
    metadata: &ParquetMetaData,
    column_index_policy: PageIndexPolicy,
    offset_index_policy: PageIndexPolicy,
) -> Option<Range<u64>> {
    let mut range = None;
    for c in metadata.row_groups().iter().flat_map(|r| r.columns()) {
        if column_index_policy != PageIndexPolicy::Skip {
            range = acc_range(range, c.column_index_range());
        }
        if offset_index_policy != PageIndexPolicy::Skip {
            range = acc_range(range, c.offset_index_range());
        }
    }
    range
}

// These tests use the arrow writer to create a parquet file in memory,
// so they require the `arrow` feature
#[cfg(all(test, feature = "arrow"))]
mod tests {
    use super::*;
    use crate::arrow::ArrowWriter;
    use crate::file::properties::WriterProperties;
    use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray};
    use bytes::Bytes;
    use std::fmt::Debug;
    use std::ops::Range;
    use std::sync::{Arc, LazyLock};

    /// It is possible to decode the metadata when the entire file is pushed
    /// up front, before the decoder asks for any ranges
    #[test]
    fn test_metadata_decoder_all_data() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // Push the entire file data into the metadata decoder
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![test_file_range()]);

        // should be able to decode the metadata without needing more data
        let metadata = expect_data(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// It is possible to feed a suffix of the file into the metadata decoder
    /// before being asked. This avoids multiple IO requests
    #[test]
    fn test_metadata_decoder_prefetch_success() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // simulate pre-fetching the last 2k bytes of the file without asking the decoder
        let prefetch_range = (file_len - 2 * 1024)..file_len;
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![prefetch_range]);

        // expect the decoder has enough data to decode the metadata
        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());
        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// It is possible to pre-fetch some, but not all, of the necessary data
    #[test]
    fn test_metadata_decoder_prefetch_retry() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // simulate pre-fetching the last 1500 bytes of the file.
        // this is enough to read the footer thrift metadata, but not the offset indexes
        let prefetch_range = (file_len - 1500)..file_len;
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![prefetch_range]);

        // expect another request is needed to read the offset indexes (note
        // try_decode only returns NeedsData once, whereas without any prefetching it would
        // return NeedsData three times)
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // expect the decoder has enough data to decode the metadata
        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// Decode the metadata incrementally, simulating a scenario where exactly
    /// the data needed is read at each step
    #[test]
    fn test_metadata_decoder_incremental() {
        let file_len = TEST_FILE_DATA.len() as u64;
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // expect the first request to read the footer
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges[0], test_file_len() - 8..test_file_len());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // expect the second request to read the metadata
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // expect the third request to read the page indexes
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // expect the metadata to be decoded with no further requests
        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }
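
    /// A sketch of the documented error paths: creating a decoder for a file
    /// shorter than the 8 byte footer fails, and pushing data after decoding
    /// has finished is rejected
    #[test]
    fn test_metadata_decoder_errors() {
        // files smaller than the 8 byte footer are rejected at construction
        let err = ParquetMetaDataPushDecoder::try_new(7).unwrap_err();
        assert!(err.to_string().contains("at least 8 bytes"));

        // once decoding has finished, further pushes are rejected
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![test_file_range()]);
        let _metadata = expect_data(metadata_decoder.try_decode());
        let err = metadata_decoder
            .push_range(0..1, test_file_slice(0..1))
            .unwrap_err();
        assert!(err.to_string().contains("cannot push data"));
    }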

    /// Decode the metadata incrementally, but without reading the page indexes
    /// (so only two requests)
    #[test]
    fn test_metadata_decoder_incremental_no_page_index() {
        let file_len = TEST_FILE_DATA.len() as u64;
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len)
            .unwrap()
            .with_page_index_policy(PageIndexPolicy::Skip);
        // expect the first request to read the footer
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges[0], test_file_len() - 8..test_file_len());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // expect the second request to read the metadata
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // expect NO third request for the page indexes; the metadata is
        // returned directly
        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_none()); // we did not read the column index
        assert!(metadata.offset_index().is_none()); // or the offset index
    }
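
    /// A sketch of the two-step flow: decode the metadata without page
    /// indexes, then use [`ParquetMetaDataPushDecoder::try_new_with_metadata`]
    /// to load the page indexes afterwards
    #[test]
    fn test_metadata_decoder_page_index_later() {
        let file_len = test_file_len();
        // Step 1: decode the metadata, skipping the page indexes
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len)
            .unwrap()
            .with_page_index_policy(PageIndexPolicy::Skip);
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![test_file_range()]);
        let metadata = expect_data(metadata_decoder.try_decode());
        assert!(metadata.column_index().is_none());

        // Step 2: resume from the decoded metadata and read just the page indexes
        let mut metadata_decoder =
            ParquetMetaDataPushDecoder::try_new_with_metadata(file_len, metadata).unwrap();
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);
        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }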

    static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
        // Input batch has 400 rows, with 3 columns: "a", "b", "c"
        // Note c is a different type (so the data page sizes will be different)
        let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
        let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
        let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| {
            if i % 2 == 0 {
                format!("string_{i}")
            } else {
                format!("A string larger than 12 bytes and thus not inlined {i}")
            }
        })));

        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
    });

    /// Parquet file in memory for testing, built from [`TEST_BATCH`] and
    /// written as two row groups of 200 rows each
    static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
        let input_batch = &TEST_BATCH;
        let mut output = Vec::new();

        let writer_options = WriterProperties::builder()
            .set_max_row_group_size(200)
            .set_data_page_row_count_limit(100)
            .build();
        let mut writer =
            ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap();

        // since the limits are only enforced on batch boundaries, write the input
        // batch in chunks of 50
        let mut row_remain = input_batch.num_rows();
        while row_remain > 0 {
            let chunk_size = row_remain.min(50);
            let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size);
            writer.write(&chunk).unwrap();
            row_remain -= chunk_size;
        }
        writer.close().unwrap();
        Bytes::from(output)
    });

    /// Return the length of the test file in bytes
    fn test_file_len() -> u64 {
        TEST_FILE_DATA.len() as u64
    }

    /// Return the range of the entire test file
    fn test_file_range() -> Range<u64> {
        0..test_file_len()
    }

    /// Return a slice of the test file data from the given range
    fn test_file_slice(range: Range<u64>) -> Bytes {
        let start: usize = range.start.try_into().unwrap();
        let end: usize = range.end.try_into().unwrap();
        TEST_FILE_DATA.slice(start..end)
    }

    /// Push the given ranges to the metadata decoder, simulating reading from a file
    fn push_ranges_to_metadata_decoder(
        metadata_decoder: &mut ParquetMetaDataPushDecoder,
        ranges: Vec<Range<u64>>,
    ) {
        let data = ranges
            .iter()
            .map(|range| test_file_slice(range.clone()))
            .collect::<Vec<_>>();
        metadata_decoder.push_ranges(ranges, data).unwrap();
    }

    /// Expect that the [`DecodeResult`] is a [`DecodeResult::Data`] and return the data
    fn expect_data<T: Debug>(result: Result<DecodeResult<T>>) -> T {
        match result.expect("Expected Ok(DecodeResult::Data(T))") {
            DecodeResult::Data(data) => data,
            result => panic!("Expected DecodeResult::Data, got {result:?}"),
        }
    }

    /// Expect that the [`DecodeResult`] is a [`DecodeResult::NeedsData`] and return the ranges
    fn expect_needs_data<T: Debug>(result: Result<DecodeResult<T>>) -> Vec<Range<u64>> {
        match result.expect("Expected Ok(DecodeResult::NeedsData{ranges})") {
            DecodeResult::NeedsData(ranges) => ranges,
            result => panic!("Expected DecodeResult::NeedsData, got {result:?}"),
        }
    }

    /// Expect that the [`DecodeResult`] is [`DecodeResult::Finished`]
    fn expect_finished<T: Debug>(result: Result<DecodeResult<T>>) {
        match result.expect("Expected Ok(DecodeResult::Finished)") {
            DecodeResult::Finished => {}
            result => panic!("Expected DecodeResult::Finished, got {result:?}"),
        }
    }
}