parquet/file/metadata/
push_decoder.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use crate::DecodeResult;
19#[cfg(feature = "encryption")]
20use crate::encryption::decrypt::FileDecryptionProperties;
21use crate::errors::{ParquetError, Result};
22use crate::file::FOOTER_SIZE;
23use crate::file::metadata::parser::{MetadataParser, parse_column_index, parse_offset_index};
24use crate::file::metadata::{FooterTail, PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions};
25use crate::file::page_index::index_reader::acc_range;
26use crate::file::reader::ChunkReader;
27use bytes::Bytes;
28use std::ops::Range;
29use std::sync::Arc;
30
/// A push decoder for [`ParquetMetaData`].
///
/// This structure implements a push API for decoding Parquet metadata, which
/// decouples IO from the metadata decoding logic (sometimes referred to as
/// [Sans-IO]).
///
/// See [`ParquetMetaDataReader`] for a pull-based API that incorporates IO and
/// is simpler to use for basic use cases. This decoder is best for customizing
/// your IO operations to minimize bytes read, prefetch data, or use async IO.
///
/// [Sans-IO]: https://sans-io.readthedocs.io
/// [`ParquetMetaDataReader`]: crate::file::metadata::ParquetMetaDataReader
///
/// # Example
///
/// The most basic usage is to feed the decoder with the necessary byte ranges
/// as requested as shown below. This minimizes the number of bytes read, but
/// requires the most IO operations - one to read the footer and then one
/// to read the metadata, and possibly more if page indexes are requested.
///
#[cfg_attr(
    feature = "arrow",
    doc = r##"
```rust
# use std::ops::Range;
# use bytes::Bytes;
# use arrow_array::record_batch;
# use parquet::DecodeResult;
# use parquet::arrow::ArrowWriter;
# use parquet::errors::ParquetError;
# use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
#
# fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
# let file_bytes = {
#   let mut buffer = vec![0];
#   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
#   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
#   writer.write(&batch).unwrap();
#   writer.close().unwrap();
#   Bytes::from(buffer)
# };
# // mimic IO by returning a function that returns the bytes for a given range
# let get_range = |range: &Range<u64>| -> Bytes {
#    let start = range.start as usize;
#     let end = range.end as usize;
#    file_bytes.slice(start..end)
# };
#
# let file_len = file_bytes.len() as u64;
// The `ParquetMetaDataPushDecoder` needs to know the file length.
let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
// try to decode the metadata. If more data is needed, the decoder will tell you what ranges
loop {
    match decoder.try_decode() {
       Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
       Ok(DecodeResult::NeedsData(ranges)) => {
          // The decoder needs more data
          //
          // In this example, we call a function that returns the bytes for each given range.
          // In a real application, you would likely read the data from a file or network.
          let data = ranges.iter().map(|range| get_range(range)).collect();
          // Push the data into the decoder and try to decode again on the next iteration.
          decoder.push_ranges(ranges, data).unwrap();
       }
       Ok(DecodeResult::Finished) => { unreachable!("returned metadata in previous match arm") }
       Err(e) => return Err(e),
    }
}
# }
```
"##
)]
///
/// # Example with "prefetching"
///
/// By default, the [`ParquetMetaDataPushDecoder`] will request only the exact byte
/// ranges it needs. This minimizes the number of bytes read, however it
/// requires at least two IO operations to read the metadata - one to read the
/// footer and then one to read the metadata.
///
/// If the file has a "Page Index" (see [Self::with_page_index_policy]), three
/// IO operations are required to read the metadata, as the page index is
/// not part of the normal metadata footer.
///
/// To reduce the number of IO operations in systems with high per operation
/// overhead (e.g. cloud storage), you can "prefetch" the data and then push
/// the data into the decoder before calling [`Self::try_decode`]. If you do
/// not push enough bytes, the decoder will return the ranges that are still
/// needed.
///
/// This approach can also be used when you have the entire file already in memory
/// for other reasons.
#[cfg_attr(
    feature = "arrow",
    doc = r##"
```rust
# use std::ops::Range;
# use bytes::Bytes;
# use arrow_array::record_batch;
# use parquet::DecodeResult;
# use parquet::arrow::ArrowWriter;
# use parquet::errors::ParquetError;
# use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
#
# fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
# let file_bytes = {
#   let mut buffer = vec![0];
#   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
#   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
#   writer.write(&batch).unwrap();
#   writer.close().unwrap();
#   Bytes::from(buffer)
# };
#
let file_len = file_bytes.len() as u64;
// For this example, we "prefetch" all the bytes which we have in memory,
// but in a real application, you would likely read a chunk from the end
// for example 1MB.
let prefetched_bytes = file_bytes.clone();
let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
// push the prefetched bytes into the decoder
decoder.push_ranges(vec![0..file_len], vec![prefetched_bytes]).unwrap();
// The decoder will now be able to decode the metadata. Note in a real application,
// unless you can guarantee that the pushed data is enough to decode the metadata,
// you still need to call `try_decode` in a loop until it returns `DecodeResult::Data`
// as shown in  the previous example
    match decoder.try_decode() {
        Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
        other => { panic!("expected DecodeResult::Data, got: {other:?}") }
    }
# }
```
"##
)]
///
/// # Example using [`AsyncRead`]
///
/// [`ParquetMetaDataPushDecoder`] is designed to work with any data source that can
/// provide byte ranges, including async IO sources. However, it does not
/// implement async IO itself. To use async IO, you simply write an async
/// wrapper around it that reads the required byte ranges and pushes them into the
/// decoder.
#[cfg_attr(
    feature = "arrow",
    doc = r##"
```rust
# use std::ops::Range;
# use bytes::Bytes;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
# use arrow_array::record_batch;
# use parquet::DecodeResult;
# use parquet::arrow::ArrowWriter;
# use parquet::errors::ParquetError;
# use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
#
// This function decodes Parquet Metadata from anything that implements
// [`AsyncRead`] and [`AsyncSeek`] such as a tokio::fs::File
async fn decode_metadata(
  file_len: u64,
  mut async_source: impl AsyncRead + AsyncSeek + Unpin
) -> Result<ParquetMetaData, ParquetError> {
  // We need a ParquetMetaDataPushDecoder to decode the metadata.
  let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
  loop {
    match decoder.try_decode() {
       Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
       Ok(DecodeResult::NeedsData(ranges)) => {
          // The decoder needs more data
          //
          // In this example we use the AsyncRead and AsyncSeek traits to read the
          // required ranges from the async source.
          let mut data = Vec::with_capacity(ranges.len());
          for range in &ranges {
            let mut buffer = vec![0; (range.end - range.start) as usize];
            async_source.seek(std::io::SeekFrom::Start(range.start)).await?;
            async_source.read_exact(&mut buffer).await?;
            data.push(Bytes::from(buffer));
          }
          // Push the data into the decoder and try to decode again on the next iteration.
          decoder.push_ranges(ranges, data).unwrap();
       }
       Ok(DecodeResult::Finished) => { unreachable!("returned metadata in previous match arm") }
       Err(e) => return Err(e),
    }
  }
}
```
"##
)]
/// [`AsyncRead`]: tokio::io::AsyncRead
#[derive(Debug)]
pub struct ParquetMetaDataPushDecoder {
    /// Current position in the decoding state machine (footer -> metadata ->
    /// page indexes -> finished)
    state: DecodeState,
    /// policy for loading ColumnIndex (part of the PageIndex)
    column_index_policy: PageIndexPolicy,
    /// policy for loading OffsetIndex (part of the PageIndex)
    offset_index_policy: PageIndexPolicy,
    /// Underlying buffers holding the byte ranges pushed so far
    buffers: crate::util::push_buffers::PushBuffers,
    /// Parses the thrift-encoded footer bytes into [`ParquetMetaData`]
    /// (handles decryption when the `encryption` feature is enabled)
    metadata_parser: MetadataParser,
}
234
235impl ParquetMetaDataPushDecoder {
236    /// Create a new `ParquetMetaDataPushDecoder` with the given file length.
237    ///
238    /// By default, this will read page indexes and column indexes. See
239    /// [`ParquetMetaDataPushDecoder::with_page_index_policy`] for more detail.
240    ///
241    /// See examples on [`ParquetMetaDataPushDecoder`].
242    pub fn try_new(file_len: u64) -> Result<Self> {
243        if file_len < 8 {
244            return Err(ParquetError::General(format!(
245                "Parquet files are at least 8 bytes long, but file length is {file_len}"
246            )));
247        };
248
249        Ok(Self {
250            state: DecodeState::ReadingFooter,
251            column_index_policy: PageIndexPolicy::Optional,
252            offset_index_policy: PageIndexPolicy::Optional,
253            buffers: crate::util::push_buffers::PushBuffers::new(file_len),
254            metadata_parser: MetadataParser::new(),
255        })
256    }
257
258    /// Begin decoding from the given footer tail.
259    pub(crate) fn try_new_with_footer_tail(file_len: u64, footer_tail: FooterTail) -> Result<Self> {
260        let mut new_self = Self::try_new(file_len)?;
261        new_self.state = DecodeState::ReadingMetadata(footer_tail);
262        Ok(new_self)
263    }
264
265    /// Create a decoder with the given `ParquetMetaData` already known.
266    ///
267    /// This can be used to parse and populate the page index structures
268    /// after the metadata has already been decoded.
269    pub fn try_new_with_metadata(file_len: u64, metadata: ParquetMetaData) -> Result<Self> {
270        let mut new_self = Self::try_new(file_len)?;
271        new_self.state = DecodeState::ReadingPageIndex(Box::new(metadata));
272        Ok(new_self)
273    }
274
275    /// Enable or disable reading the page index structures described in
276    /// "[Parquet page index] Layout to Support Page Skipping".
277    ///
278    /// Defaults to [`PageIndexPolicy::Optional`]
279    ///
280    /// This requires
281    /// 1. The Parquet file to have been written with page indexes
282    /// 2. Additional data to be pushed into the decoder (as the page indexes are not part of the thrift footer)
283    ///
284    /// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
285    pub fn with_page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self {
286        self.column_index_policy = page_index_policy;
287        self.offset_index_policy = page_index_policy;
288        self
289    }
290
291    /// Set the policy for reading the ColumnIndex (part of the PageIndex)
292    pub fn with_column_index_policy(mut self, column_index_policy: PageIndexPolicy) -> Self {
293        self.column_index_policy = column_index_policy;
294        self
295    }
296
297    /// Set the policy for reading the OffsetIndex (part of the PageIndex)
298    pub fn with_offset_index_policy(mut self, offset_index_policy: PageIndexPolicy) -> Self {
299        self.offset_index_policy = offset_index_policy;
300        self
301    }
302
303    /// Set the options to use when decoding the Parquet metadata.
304    pub fn with_metadata_options(mut self, options: Option<Arc<ParquetMetaDataOptions>>) -> Self {
305        self.metadata_parser = self.metadata_parser.with_metadata_options(options);
306        self
307    }
308
309    #[cfg(feature = "encryption")]
310    /// Provide decryption properties for decoding encrypted Parquet files
311    pub(crate) fn with_file_decryption_properties(
312        mut self,
313        file_decryption_properties: Option<std::sync::Arc<FileDecryptionProperties>>,
314    ) -> Self {
315        self.metadata_parser = self
316            .metadata_parser
317            .with_file_decryption_properties(file_decryption_properties);
318        self
319    }
320
321    /// Push the data into the decoder's buffer.
322    ///
323    /// The decoder does not immediately attempt to decode the metadata
324    /// after pushing data. Instead, it accumulates the pushed data until you
325    /// call [`Self::try_decode`].
326    ///
327    /// # Determining required data:
328    ///
329    /// To determine what ranges are required to decode the metadata, you can
330    /// either:
331    ///
332    /// 1. Call [`Self::try_decode`] first to get the exact ranges required (see
333    ///    example on [`Self`])
334    ///
335    /// 2. Speculatively push any data that you have available, which may
336    ///    include more than the footer data or requested bytes.
337    ///
338    /// Speculatively pushing data can be used when  "prefetching" data. See
339    /// example on [`Self`]
340    pub fn push_ranges(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) -> Result<()> {
341        if matches!(&self.state, DecodeState::Finished) {
342            return Err(general_err!(
343                "ParquetMetaDataPushDecoder: cannot push data after decoding is finished"
344            ));
345        }
346        self.buffers.push_ranges(ranges, buffers);
347        Ok(())
348    }
349
350    /// Pushes a single range of data into the decoder's buffer.
351    pub fn push_range(&mut self, range: Range<u64>, buffer: Bytes) -> Result<()> {
352        if matches!(&self.state, DecodeState::Finished) {
353            return Err(general_err!(
354                "ParquetMetaDataPushDecoder: cannot push data after decoding is finished"
355            ));
356        }
357        self.buffers.push_range(range, buffer);
358        Ok(())
359    }
360
361    /// Try to decode the metadata from the pushed data, returning the
362    /// decoded metadata or an error if not enough data is available.
363    pub fn try_decode(&mut self) -> Result<DecodeResult<ParquetMetaData>> {
364        let file_len = self.buffers.file_len();
365        let footer_len = FOOTER_SIZE as u64;
366        loop {
367            match std::mem::replace(&mut self.state, DecodeState::Intermediate) {
368                DecodeState::ReadingFooter => {
369                    // need to have the last 8 bytes of the file to decode the metadata
370                    let footer_start = file_len.saturating_sub(footer_len);
371                    let footer_range = footer_start..file_len;
372
373                    if !self.buffers.has_range(&footer_range) {
374                        self.state = DecodeState::ReadingFooter;
375                        return Ok(needs_range(footer_range));
376                    }
377                    let footer_bytes = self.get_bytes(&footer_range)?;
378                    let footer_tail = FooterTail::try_from(footer_bytes.as_ref())?;
379
380                    self.state = DecodeState::ReadingMetadata(footer_tail);
381                    continue;
382                }
383
384                DecodeState::ReadingMetadata(footer_tail) => {
385                    let metadata_len: u64 = footer_tail.metadata_length() as u64;
386                    let metadata_start = file_len - footer_len - metadata_len;
387                    let metadata_end = metadata_start + metadata_len;
388                    let metadata_range = metadata_start..metadata_end;
389
390                    if !self.buffers.has_range(&metadata_range) {
391                        self.state = DecodeState::ReadingMetadata(footer_tail);
392                        return Ok(needs_range(metadata_range));
393                    }
394
395                    let metadata = self.metadata_parser.decode_metadata(
396                        &self.get_bytes(&metadata_range)?,
397                        footer_tail.is_encrypted_footer(),
398                    )?;
399                    // Note: ReadingPageIndex first checks if page indexes are needed
400                    // and is a no-op if not
401                    self.state = DecodeState::ReadingPageIndex(Box::new(metadata));
402                    continue;
403                }
404
405                DecodeState::ReadingPageIndex(mut metadata) => {
406                    // First determine if any page indexes are needed based on
407                    // the specified policies
408                    let range = range_for_page_index(
409                        &metadata,
410                        self.column_index_policy,
411                        self.offset_index_policy,
412                    );
413
414                    let Some(page_index_range) = range else {
415                        self.state = DecodeState::Finished;
416                        return Ok(DecodeResult::Data(*metadata));
417                    };
418
419                    if !self.buffers.has_range(&page_index_range) {
420                        self.state = DecodeState::ReadingPageIndex(metadata);
421                        return Ok(needs_range(page_index_range));
422                    }
423
424                    let buffer = self.get_bytes(&page_index_range)?;
425                    let offset = page_index_range.start;
426                    parse_column_index(&mut metadata, self.column_index_policy, &buffer, offset)?;
427                    parse_offset_index(&mut metadata, self.offset_index_policy, &buffer, offset)?;
428                    self.state = DecodeState::Finished;
429                    return Ok(DecodeResult::Data(*metadata));
430                }
431
432                DecodeState::Finished => return Ok(DecodeResult::Finished),
433                DecodeState::Intermediate => {
434                    return Err(general_err!(
435                        "ParquetMetaDataPushDecoder: internal error, invalid state"
436                    ));
437                }
438            }
439        }
440    }
441
442    /// Returns the bytes for the given range from the internal buffer
443    fn get_bytes(&self, range: &Range<u64>) -> Result<Bytes> {
444        let start = range.start;
445        let raw_len = range.end - range.start;
446        let len: usize = raw_len.try_into().map_err(|_| {
447            ParquetError::General(format!(
448                "ParquetMetaDataPushDecoder: Range length too large to fit in usize: {raw_len}",
449            ))
450        })?;
451        self.buffers.get_bytes(start, len)
452    }
453}
454
455/// returns a DecodeResults that describes needing the given range
456fn needs_range(range: Range<u64>) -> DecodeResult<ParquetMetaData> {
457    DecodeResult::NeedsData(vec![range])
458}
459
460/// Decoding state machine
/// Decoding state machine
///
/// States advance in order: footer -> metadata -> page indexes -> finished.
#[derive(Debug)]
enum DecodeState {
    /// Reading the last 8 bytes of the file
    ReadingFooter,
    /// Reading the metadata thrift structure
    ReadingMetadata(FooterTail),
    /// Actively reading the page index
    ReadingPageIndex(Box<ParquetMetaData>),
    /// Decoding is complete
    Finished,
    /// State left during the `try_decode` method so something valid is present.
    /// This state should never be observed.
    Intermediate,
}
475
476/// Returns the byte range needed to read the offset/page indexes, based on the
477/// specified policies
478///
479/// Returns None if no page indexes are needed
480pub fn range_for_page_index(
481    metadata: &ParquetMetaData,
482    column_index_policy: PageIndexPolicy,
483    offset_index_policy: PageIndexPolicy,
484) -> Option<Range<u64>> {
485    let mut range = None;
486    for c in metadata.row_groups().iter().flat_map(|r| r.columns()) {
487        if column_index_policy != PageIndexPolicy::Skip {
488            range = acc_range(range, c.column_index_range());
489        }
490        if offset_index_policy != PageIndexPolicy::Skip {
491            range = acc_range(range, c.offset_index_range());
492        }
493    }
494    range
495}
496
497// These tests use the arrow writer to create a parquet file in memory
498// so they need the arrow feature and the test feature
#[cfg(all(test, feature = "arrow"))]
mod tests {
    use super::*;
    use crate::arrow::ArrowWriter;
    use crate::file::properties::WriterProperties;
    use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray};
    use bytes::Bytes;
    use std::fmt::Debug;
    use std::ops::Range;
    use std::sync::{Arc, LazyLock};

    /// It is possible to decode the metadata from the entire file at once before being asked
    #[test]
    fn test_metadata_decoder_all_data() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // Push the entire file data into the metadata decoder
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![test_file_range()]);

        // should be able to decode the metadata without needing more data
        let metadata = expect_data(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// It is possible to feed some, but not all, of the footer into the metadata decoder
    /// before asked. This avoids multiple IO requests
    #[test]
    fn test_metadata_decoder_prefetch_success() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // simulate pre-fetching the last 2k bytes of the file without asking the decoder
        let prefetch_range = (file_len - 2 * 1024)..file_len;
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![prefetch_range]);

        // expect the decoder has enough data to decode the metadata
        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());
        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// It is possible to pre-fetch some, but not all, of the necessary
    /// data
    #[test]
    fn test_metadata_decoder_prefetch_retry() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // simulate pre-fetching the last 1500 bytes of the file.
        // this is enough to read the footer thrift metadata, but not the offset indexes
        let prefetch_range = (file_len - 1500)..file_len;
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![prefetch_range]);

        // expect another request is needed to read the offset indexes (note
        // try_decode only returns NeedsData once, whereas without any prefetching it would
        // return NeedsData three times)
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // expect the decoder has enough data to decode the metadata
        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// Decode the metadata incrementally, simulating a scenario where exactly the data needed
    /// is read in each step
    #[test]
    fn test_metadata_decoder_incremental() {
        let file_len = TEST_FILE_DATA.len() as u64;
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // first request: the 8 byte footer at the end of the file
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges[0], test_file_len() - 8..test_file_len());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // second request: the thrift-encoded metadata
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // third request: the page indexes (column + offset indexes)
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // now all data is present, expect the metadata to decode
        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// Decode the metadata incrementally, but without reading the page indexes
    /// (so only two requests)
    #[test]
    fn test_metadata_decoder_incremental_no_page_index() {
        let file_len = TEST_FILE_DATA.len() as u64;
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len)
            .unwrap()
            .with_page_index_policy(PageIndexPolicy::Skip);
        // first request: the 8 byte footer at the end of the file
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges[0], test_file_len() - 8..test_file_len());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // second request: the thrift-encoded metadata
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // expect NO third request to read the page indexes, should just cough up the metadata
        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_none()); // of course, we did not read the column index
        assert!(metadata.offset_index().is_none()); // or the offset index
    }

    static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
        // Input batch has 400 rows, with 3 columns: "a", "b", "c"
        // Note c is a different type (so the data page sizes will be different)
        let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
        let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
        let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| {
            if i % 2 == 0 {
                format!("string_{i}")
            } else {
                format!("A string larger than 12 bytes and thus not inlined {i}")
            }
        })));

        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
    });

    /// Create a parquet file in memory for testing. See [`test_file_range`] for details.
    static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
        let input_batch = &TEST_BATCH;
        let mut output = Vec::new();

        let writer_options = WriterProperties::builder()
            .set_max_row_group_size(200)
            .set_data_page_row_count_limit(100)
            .build();
        let mut writer =
            ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap();

        // since the limits are only enforced on batch boundaries, write the input
        // batch in chunks of 50
        let mut row_remain = input_batch.num_rows();
        while row_remain > 0 {
            let chunk_size = row_remain.min(50);
            let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size);
            writer.write(&chunk).unwrap();
            row_remain -= chunk_size;
        }
        writer.close().unwrap();
        Bytes::from(output)
    });

    /// Return the length of the test file in bytes
    fn test_file_len() -> u64 {
        TEST_FILE_DATA.len() as u64
    }

    /// Return the range of the entire test file
    fn test_file_range() -> Range<u64> {
        0..test_file_len()
    }

    /// Return a slice of the test file data from the given range
    pub fn test_file_slice(range: Range<u64>) -> Bytes {
        let start: usize = range.start.try_into().unwrap();
        let end: usize = range.end.try_into().unwrap();
        TEST_FILE_DATA.slice(start..end)
    }

    /// Push the given ranges to the metadata decoder, simulating reading from a file
    fn push_ranges_to_metadata_decoder(
        metadata_decoder: &mut ParquetMetaDataPushDecoder,
        ranges: Vec<Range<u64>>,
    ) {
        let data = ranges
            .iter()
            .map(|range| test_file_slice(range.clone()))
            .collect::<Vec<_>>();
        metadata_decoder.push_ranges(ranges, data).unwrap();
    }

    /// Expect that the [`DecodeResult`] is a [`DecodeResult::Data`] and return the corresponding element
    fn expect_data<T: Debug>(result: Result<DecodeResult<T>>) -> T {
        match result.expect("Expected Ok(DecodeResult::Data(T))") {
            DecodeResult::Data(data) => data,
            result => panic!("Expected DecodeResult::Data, got {result:?}"),
        }
    }

    /// Expect that the [`DecodeResult`] is a [`DecodeResult::NeedsData`] and return the corresponding ranges
    fn expect_needs_data<T: Debug>(result: Result<DecodeResult<T>>) -> Vec<Range<u64>> {
        match result.expect("Expected Ok(DecodeResult::NeedsData{ranges})") {
            DecodeResult::NeedsData(ranges) => ranges,
            result => panic!("Expected DecodeResult::NeedsData, got {result:?}"),
        }
    }

    /// Expect that the [`DecodeResult`] is a [`DecodeResult::Finished`]
    fn expect_finished<T: Debug>(result: Result<DecodeResult<T>>) {
        match result.expect("Expected Ok(DecodeResult::Finished)") {
            DecodeResult::Finished => {}
            result => panic!("Expected DecodeResult::Finished, got {result:?}"),
        }
    }
}