parquet/file/metadata/push_decoder.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::parser::{parse_column_index, parse_offset_index, MetadataParser};
use crate::file::metadata::{FooterTail, PageIndexPolicy, ParquetMetaData};
use crate::file::page_index::index_reader::acc_range;
use crate::file::reader::ChunkReader;
use crate::file::FOOTER_SIZE;
use crate::DecodeResult;
use bytes::Bytes;
use std::ops::Range;
/// A push decoder for [`ParquetMetaData`].
///
/// This structure implements a push API for decoding Parquet metadata, which
/// decouples IO from the metadata decoding logic (sometimes referred to as
/// [Sans-IO]).
///
/// See [`ParquetMetaDataReader`] for a pull-based API that incorporates IO and
/// is simpler to use for basic use cases. This decoder is best for customizing
/// your IO operations to minimize bytes read, prefetch data, or use async IO.
///
/// [Sans-IO]: https://sans-io.readthedocs.io
/// [`ParquetMetaDataReader`]: crate::file::metadata::ParquetMetaDataReader
///
/// # Example
///
/// The most basic usage is to feed the decoder the byte ranges it requests,
/// as shown below. This minimizes the number of bytes read, but requires the
/// most IO operations: one to read the footer, one to read the metadata, and
/// possibly more if page indexes are requested.
49///
50/// ```rust
51/// # use std::ops::Range;
52/// # use bytes::Bytes;
53/// # use arrow_array::record_batch;
54/// # use parquet::DecodeResult;
55/// # use parquet::arrow::ArrowWriter;
56/// # use parquet::errors::ParquetError;
57/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
58/// #
59/// # fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
60/// # let file_bytes = {
61/// #   let mut buffer = vec![0];
62/// #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
63/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
64/// #   writer.write(&batch).unwrap();
65/// #   writer.close().unwrap();
66/// #   Bytes::from(buffer)
67/// # };
68/// # // mimic IO by returning a function that returns the bytes for a given range
69/// # let get_range = |range: &Range<u64>| -> Bytes {
70/// #    let start = range.start as usize;
71/// #     let end = range.end as usize;
72/// #    file_bytes.slice(start..end)
73/// # };
74/// #
75/// # let file_len = file_bytes.len() as u64;
76/// // The `ParquetMetaDataPushDecoder` needs to know the file length.
77/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
78/// // try to decode the metadata. If more data is needed, the decoder will tell you what ranges
79/// loop {
80///     match decoder.try_decode() {
81///        Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
82///        Ok(DecodeResult::NeedsData(ranges)) => {
83///           // The decoder needs more data
84///           //
85///           // In this example, we call a function that returns the bytes for each given range.
86///           // In a real application, you would likely read the data from a file or network.
87///           let data = ranges.iter().map(|range| get_range(range)).collect();
88///           // Push the data into the decoder and try to decode again on the next iteration.
89///           decoder.push_ranges(ranges, data).unwrap();
90///        }
91///        Ok(DecodeResult::Finished) => { unreachable!("returned metadata in previous match arm") }
92///        Err(e) => return Err(e),
93///     }
94/// }
95/// # }
96/// ```
///
/// # Example with "prefetching"
///
/// By default, the [`ParquetMetaDataPushDecoder`] will request only the exact byte
/// ranges it needs. This minimizes the number of bytes read; however, it
/// requires at least two IO operations to read the metadata: one to read the
/// footer and one to read the metadata itself.
///
/// If the file has a "Page Index" (see [`Self::with_page_index_policy`]), three
/// IO operations are required to read the metadata, as the page index is
/// not part of the normal metadata footer.
///
/// To reduce the number of IO operations in systems with high per-operation
/// overhead (e.g. cloud storage), you can "prefetch" the data and then push
/// the data into the decoder before calling [`Self::try_decode`]. If you do
/// not push enough bytes, the decoder will return the ranges that are still
/// needed.
///
/// This approach can also be used when you have the entire file already in memory
/// for other reasons.
///
/// ```rust
/// # use std::ops::Range;
/// # use bytes::Bytes;
/// # use arrow_array::record_batch;
/// # use parquet::DecodeResult;
/// # use parquet::arrow::ArrowWriter;
/// # use parquet::errors::ParquetError;
/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
/// #
/// # fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
/// # let file_bytes = {
/// #   let mut buffer = vec![];
/// #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
/// #   writer.write(&batch).unwrap();
/// #   writer.close().unwrap();
/// #   Bytes::from(buffer)
/// # };
/// #
/// let file_len = file_bytes.len() as u64;
/// // For this example, we "prefetch" all the bytes, which we already have in
/// // memory, but in a real application you would likely read a chunk from the
/// // end, for example the last 1MB.
/// let prefetched_bytes = file_bytes.clone();
/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
/// // push the prefetched bytes into the decoder
/// decoder.push_ranges(vec![0..file_len], vec![prefetched_bytes]).unwrap();
/// // The decoder will now be able to decode the metadata. Note that in a real application,
/// // unless you can guarantee that the pushed data is enough to decode the metadata,
/// // you still need to call `try_decode` in a loop until it returns `DecodeResult::Data`,
/// // as shown in the previous example.
/// match decoder.try_decode() {
///     Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
///     other => { panic!("expected DecodeResult::Data, got: {other:?}") }
/// }
/// # }
/// ```
///
/// # Example using [`AsyncRead`]
///
/// [`ParquetMetaDataPushDecoder`] is designed to work with any data source that can
/// provide byte ranges, including async IO sources. However, it does not
/// implement async IO itself. To use async IO, you simply write an async
/// wrapper around it that reads the required byte ranges and pushes them into the
/// decoder.
///
/// ```rust
/// # use std::ops::Range;
/// # use bytes::Bytes;
/// use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
/// # use arrow_array::record_batch;
/// # use parquet::DecodeResult;
/// # use parquet::arrow::ArrowWriter;
/// # use parquet::errors::ParquetError;
/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
/// #
/// // This function decodes Parquet Metadata from anything that implements
/// // [`AsyncRead`] and [`AsyncSeek`] such as a tokio::fs::File
/// async fn decode_metadata(
///   file_len: u64,
///   mut async_source: impl AsyncRead + AsyncSeek + Unpin
/// ) -> Result<ParquetMetaData, ParquetError> {
///   // We need a ParquetMetaDataPushDecoder to decode the metadata.
///   let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
///   loop {
///     match decoder.try_decode() {
///        Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
///        Ok(DecodeResult::NeedsData(ranges)) => {
///           // The decoder needs more data
///           //
///           // In this example we use the AsyncRead and AsyncSeek traits to read the
///           // required ranges from the async source.
///           let mut data = Vec::with_capacity(ranges.len());
///           for range in &ranges {
///             let mut buffer = vec![0; (range.end - range.start) as usize];
///             async_source.seek(std::io::SeekFrom::Start(range.start)).await?;
///             async_source.read_exact(&mut buffer).await?;
///             data.push(Bytes::from(buffer));
///           }
///           // Push the data into the decoder and try to decode again on the next iteration.
///           decoder.push_ranges(ranges, data).unwrap();
///        }
///        Ok(DecodeResult::Finished) => { unreachable!("returned metadata in previous match arm") }
///        Err(e) => return Err(e),
///     }
///   }
/// }
/// ```
/// [`AsyncRead`]: tokio::io::AsyncRead
#[derive(Debug)]
pub struct ParquetMetaDataPushDecoder {
    /// Decoding state
    state: DecodeState,
    /// Policy for loading the ColumnIndex (part of the PageIndex)
    column_index_policy: PageIndexPolicy,
    /// Policy for loading the OffsetIndex (part of the PageIndex)
    offset_index_policy: PageIndexPolicy,
    /// Underlying buffers
    buffers: crate::util::push_buffers::PushBuffers,
    /// Parses the thrift-encoded metadata, handling decryption if enabled
    metadata_parser: MetadataParser,
}

impl ParquetMetaDataPushDecoder {
    /// Create a new `ParquetMetaDataPushDecoder` with the given file length.
    ///
    /// By default, this will read the page indexes (column index and offset
    /// index) if present. See
    /// [`ParquetMetaDataPushDecoder::with_page_index_policy`] for more detail.
    ///
    /// See examples on [`ParquetMetaDataPushDecoder`].
    pub fn try_new(file_len: u64) -> Result<Self> {
        if file_len < 8 {
            return Err(ParquetError::General(format!(
                "Parquet files are at least 8 bytes long, but file length is {file_len}"
            )));
        }

        Ok(Self {
            state: DecodeState::ReadingFooter,
            column_index_policy: PageIndexPolicy::Optional,
            offset_index_policy: PageIndexPolicy::Optional,
            buffers: crate::util::push_buffers::PushBuffers::new(file_len),
            metadata_parser: MetadataParser::new(),
        })
    }

    /// Begin decoding from the given footer tail.
    pub(crate) fn try_new_with_footer_tail(file_len: u64, footer_tail: FooterTail) -> Result<Self> {
        let mut new_self = Self::try_new(file_len)?;
        new_self.state = DecodeState::ReadingMetadata(footer_tail);
        Ok(new_self)
    }

    /// Create a decoder with the given `ParquetMetaData` already known.
    ///
    /// This can be used to parse and populate the page index structures
    /// after the metadata has already been decoded.
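    ///
    /// # Example
    ///
    /// A minimal sketch: `metadata` and `file_len` are assumed to come from an
    /// earlier decode that skipped the page indexes.
    ///
    /// ```rust,ignore
    /// let mut decoder = ParquetMetaDataPushDecoder::try_new_with_metadata(file_len, metadata)?;
    /// // drive the decoder as usual: push the ranges it requests and call
    /// // `try_decode` until it returns `DecodeResult::Data` with the page
    /// // index structures populated
    /// ```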
    pub fn try_new_with_metadata(file_len: u64, metadata: ParquetMetaData) -> Result<Self> {
        let mut new_self = Self::try_new(file_len)?;
        new_self.state = DecodeState::ReadingPageIndex(Box::new(metadata));
        Ok(new_self)
    }

    /// Enable or disable reading the page index structures described in
    /// "[Parquet page index] Layout to Support Page Skipping".
    ///
    /// Defaults to [`PageIndexPolicy::Optional`]
    ///
    /// This requires
    /// 1. The Parquet file to have been written with page indexes
    /// 2. Additional data to be pushed into the decoder (as the page indexes are not part of the thrift footer)
    ///
    /// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
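    ///
    /// For example, a sketch of disabling page index reading entirely
    /// (assuming `file_len` is known):
    ///
    /// ```rust,ignore
    /// let decoder = ParquetMetaDataPushDecoder::try_new(file_len)?
    ///     .with_page_index_policy(PageIndexPolicy::Skip); // never read page indexes
    /// ```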
    pub fn with_page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self {
        self.column_index_policy = page_index_policy;
        self.offset_index_policy = page_index_policy;
        self
    }

    /// Set the policy for reading the ColumnIndex (part of the PageIndex)
    pub fn with_column_index_policy(mut self, column_index_policy: PageIndexPolicy) -> Self {
        self.column_index_policy = column_index_policy;
        self
    }

    /// Set the policy for reading the OffsetIndex (part of the PageIndex)
    pub fn with_offset_index_policy(mut self, offset_index_policy: PageIndexPolicy) -> Self {
        self.offset_index_policy = offset_index_policy;
        self
    }

    /// Provide decryption properties for decoding encrypted Parquet files
    #[cfg(feature = "encryption")]
    pub(crate) fn with_file_decryption_properties(
        mut self,
        file_decryption_properties: Option<std::sync::Arc<FileDecryptionProperties>>,
    ) -> Self {
        self.metadata_parser = self
            .metadata_parser
            .with_file_decryption_properties(file_decryption_properties);
        self
    }

    /// Push the data into the decoder's buffer.
    ///
    /// The decoder does not immediately attempt to decode the metadata
    /// after pushing data. Instead, it accumulates the pushed data until you
    /// call [`Self::try_decode`].
    ///
    /// # Determining required data
    ///
    /// To determine what ranges are required to decode the metadata, you can
    /// either:
    ///
    /// 1. Call [`Self::try_decode`] first to get the exact ranges required (see
    ///    example on [`Self`])
    ///
    /// 2. Speculatively push any data that you have available, which may
    ///    include more than the footer data or requested bytes.
    ///
    /// Speculatively pushing data can be used to "prefetch" data, as in the
    /// sketch below. See also the example on [`Self`].
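    ///
    /// ```rust,ignore
    /// // A sketch of speculative pushing: read the last 64 KiB of the file
    /// // (a hypothetical prefetch size; `read_bytes` is a user-provided IO
    /// // function) and push it before calling `try_decode`.
    /// let start = file_len.saturating_sub(64 * 1024);
    /// let suffix_bytes = read_bytes(start..file_len);
    /// decoder.push_range(start..file_len, suffix_bytes)?;
    /// ```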
    pub fn push_ranges(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) -> Result<()> {
        if matches!(&self.state, DecodeState::Finished) {
            return Err(general_err!(
                "ParquetMetaDataPushDecoder: cannot push data after decoding is finished"
            ));
        }
        self.buffers.push_ranges(ranges, buffers);
        Ok(())
    }

    /// Pushes a single range of data into the decoder's buffer.
    pub fn push_range(&mut self, range: Range<u64>, buffer: Bytes) -> Result<()> {
        if matches!(&self.state, DecodeState::Finished) {
            return Err(general_err!(
                "ParquetMetaDataPushDecoder: cannot push data after decoding is finished"
            ));
        }
        self.buffers.push_range(range, buffer);
        Ok(())
    }

    /// Try to decode the metadata from the pushed data.
    ///
    /// Returns [`DecodeResult::Data`] with the decoded metadata once enough
    /// data has been pushed, or [`DecodeResult::NeedsData`] with the ranges
    /// that are still required.
    pub fn try_decode(&mut self) -> Result<DecodeResult<ParquetMetaData>> {
        let file_len = self.buffers.file_len();
        let footer_len = FOOTER_SIZE as u64;
        loop {
            match std::mem::replace(&mut self.state, DecodeState::Intermediate) {
                DecodeState::ReadingFooter => {
                    // need to have the last 8 bytes of the file to decode the metadata
                    let footer_start = file_len.saturating_sub(footer_len);
                    let footer_range = footer_start..file_len;

                    if !self.buffers.has_range(&footer_range) {
                        self.state = DecodeState::ReadingFooter;
                        return Ok(needs_range(footer_range));
                    }
                    let footer_bytes = self.get_bytes(&footer_range)?;
                    let footer_tail = FooterTail::try_from(footer_bytes.as_ref())?;

                    self.state = DecodeState::ReadingMetadata(footer_tail);
                    continue;
                }

                DecodeState::ReadingMetadata(footer_tail) => {
                    let metadata_len: u64 = footer_tail.metadata_length() as u64;
                    // guard against underflow on corrupt files whose reported
                    // metadata length exceeds the file length
                    let metadata_start = file_len
                        .checked_sub(footer_len + metadata_len)
                        .ok_or_else(|| {
                            general_err!(
                                "file size of {file_len} is less than footer + metadata {}",
                                footer_len + metadata_len
                            )
                        })?;
                    let metadata_end = metadata_start + metadata_len;
                    let metadata_range = metadata_start..metadata_end;

                    if !self.buffers.has_range(&metadata_range) {
                        self.state = DecodeState::ReadingMetadata(footer_tail);
                        return Ok(needs_range(metadata_range));
                    }

                    let metadata = self.metadata_parser.decode_metadata(
                        &self.get_bytes(&metadata_range)?,
                        footer_tail.is_encrypted_footer(),
                    )?;
                    // Note: ReadingPageIndex first checks if page indexes are needed
                    // and is a no-op if not
                    self.state = DecodeState::ReadingPageIndex(Box::new(metadata));
                    continue;
                }

                DecodeState::ReadingPageIndex(mut metadata) => {
                    // First determine if any page indexes are needed based on
                    // the specified policies
                    let range = range_for_page_index(
                        &metadata,
                        self.column_index_policy,
                        self.offset_index_policy,
                    );

                    let Some(page_index_range) = range else {
                        self.state = DecodeState::Finished;
                        return Ok(DecodeResult::Data(*metadata));
                    };

                    if !self.buffers.has_range(&page_index_range) {
                        self.state = DecodeState::ReadingPageIndex(metadata);
                        return Ok(needs_range(page_index_range));
                    }

                    let buffer = self.get_bytes(&page_index_range)?;
                    let offset = page_index_range.start;
                    parse_column_index(&mut metadata, self.column_index_policy, &buffer, offset)?;
                    parse_offset_index(&mut metadata, self.offset_index_policy, &buffer, offset)?;
                    self.state = DecodeState::Finished;
                    return Ok(DecodeResult::Data(*metadata));
                }

                DecodeState::Finished => return Ok(DecodeResult::Finished),
                DecodeState::Intermediate => {
                    return Err(general_err!(
                        "ParquetMetaDataPushDecoder: internal error, invalid state"
                    ));
                }
            }
        }
    }

    /// Returns the bytes for the given range from the internal buffer
    fn get_bytes(&self, range: &Range<u64>) -> Result<Bytes> {
        let start = range.start;
        let raw_len = range.end - range.start;
        let len: usize = raw_len.try_into().map_err(|_| {
            ParquetError::General(format!(
                "ParquetMetaDataPushDecoder: Range length too large to fit in usize: {raw_len}",
            ))
        })?;
        self.buffers.get_bytes(start, len)
    }
}

/// Returns a [`DecodeResult`] that describes needing the given range
fn needs_range(range: Range<u64>) -> DecodeResult<ParquetMetaData> {
    DecodeResult::NeedsData(vec![range])
}

/// Decoding state machine
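///
/// States advance in order: `ReadingFooter` -> `ReadingMetadata` ->
/// `ReadingPageIndex` -> `Finished`; the page index step is a no-op when no
/// page indexes are requested or present.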
#[derive(Debug)]
enum DecodeState {
    /// Reading the last 8 bytes of the file
    ReadingFooter,
    /// Reading the metadata thrift structure
    ReadingMetadata(FooterTail),
    /// Actively reading the page index
    ReadingPageIndex(Box<ParquetMetaData>),
    /// Decoding is complete
    Finished,
    /// Temporary state installed during `try_decode` so that a valid state is
    /// always present. This state should never be observed.
    Intermediate,
}

/// Returns the byte range needed to read the column/offset indexes, based on
/// the specified policies
///
/// Returns `None` if no page indexes are needed
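///
/// A sketch of how a caller might use the returned range (`metadata`,
/// `decoder`, and the `fetch` IO function are assumed to be provided by the
/// caller):
///
/// ```rust,ignore
/// let policy = PageIndexPolicy::Optional;
/// if let Some(range) = range_for_page_index(&metadata, policy, policy) {
///     let bytes = fetch(range.clone()); // user-provided IO
///     decoder.push_range(range, bytes)?;
/// }
/// ```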
pub fn range_for_page_index(
    metadata: &ParquetMetaData,
    column_index_policy: PageIndexPolicy,
    offset_index_policy: PageIndexPolicy,
) -> Option<Range<u64>> {
    let mut range = None;
    for c in metadata.row_groups().iter().flat_map(|r| r.columns()) {
        if column_index_policy != PageIndexPolicy::Skip {
            range = acc_range(range, c.column_index_range());
        }
        if offset_index_policy != PageIndexPolicy::Skip {
            range = acc_range(range, c.offset_index_range());
        }
    }
    range
}

// These tests use the arrow writer to create a parquet file in memory,
// so they require the "arrow" feature (and only compile under `cfg(test)`)
#[cfg(all(test, feature = "arrow"))]
mod tests {
    use super::*;
    use crate::arrow::ArrowWriter;
    use crate::file::properties::WriterProperties;
    use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray};
    use bytes::Bytes;
    use std::fmt::Debug;
    use std::ops::Range;
    use std::sync::{Arc, LazyLock};

    /// The metadata can be decoded in one shot if the entire file's contents
    /// are pushed before the decoder asks for anything
    #[test]
    fn test_metadata_decoder_all_data() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // Push the entire file data into the metadata decoder
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![test_file_range()]);

        // should be able to decode the metadata without needing more data
        let metadata = expect_data(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// Some, but not all, of the footer can be fed into the metadata decoder
    /// before it asks. This avoids multiple IO requests
    #[test]
    fn test_metadata_decoder_prefetch_success() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // simulate pre-fetching the last 2k bytes of the file without asking the decoder
        let prefetch_range = (file_len - 2 * 1024)..file_len;
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![prefetch_range]);

        // expect the decoder has enough data to decode the metadata
        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());
        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// Some, but not all, of the necessary data can be pre-fetched
    #[test]
    fn test_metadata_decoder_prefetch_retry() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // simulate pre-fetching the last 1500 bytes of the file.
        // this is enough to read the footer thrift metadata, but not the offset indexes
        let prefetch_range = (file_len - 1500)..file_len;
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![prefetch_range]);

        // expect another request is needed to read the offset indexes (note
        // try_decode only returns NeedsData once, whereas without any prefetching it would
        // return NeedsData three times)
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // expect the decoder has enough data to decode the metadata
        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// Decode the metadata incrementally, simulating a scenario where exactly the data needed
    /// is read in each step
    #[test]
    fn test_metadata_decoder_incremental() {
        let file_len = TEST_FILE_DATA.len() as u64;
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();

        // expect the first request to read the footer
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges[0], test_file_len() - 8..test_file_len());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // expect the second request to read the metadata
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // expect the third request to read the page indexes
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // now the decoder has everything it needs to return the metadata
        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// Decode the metadata incrementally, but without reading the page indexes
    /// (so only two requests are needed)
    #[test]
    fn test_metadata_decoder_incremental_no_page_index() {
        let file_len = TEST_FILE_DATA.len() as u64;
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len)
            .unwrap()
            .with_page_index_policy(PageIndexPolicy::Skip);

        // expect the first request to read the footer
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges[0], test_file_len() - 8..test_file_len());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // expect the second request to read the metadata
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // expect NO third request for the page indexes; the metadata is returned directly
        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_none()); // we did not read the column index
        assert!(metadata.offset_index().is_none()); // or the offset index
    }

    static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
        // Input batch has 400 rows, with 3 columns: "a", "b", "c"
        // Note "c" is a different type (so the data page sizes will be different)
        let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
        let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
        let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| {
            if i % 2 == 0 {
                format!("string_{i}")
            } else {
                format!("A string larger than 12 bytes and thus not inlined {i}")
            }
        })));

        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
    });

    /// Create a parquet file in memory for testing, built from [`TEST_BATCH`]
    static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
        let input_batch = &TEST_BATCH;
        let mut output = Vec::new();

        let writer_options = WriterProperties::builder()
            .set_max_row_group_size(200)
            .set_data_page_row_count_limit(100)
            .build();
        let mut writer =
            ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap();

        // since the limits are only enforced on batch boundaries, write the input
        // batch in chunks of 50
        let mut row_remain = input_batch.num_rows();
        while row_remain > 0 {
            let chunk_size = row_remain.min(50);
            let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size);
            writer.write(&chunk).unwrap();
            row_remain -= chunk_size;
        }
        writer.close().unwrap();
        Bytes::from(output)
    });

    /// Return the length of the test file in bytes
    fn test_file_len() -> u64 {
        TEST_FILE_DATA.len() as u64
    }

    /// Return the range of the entire test file
    fn test_file_range() -> Range<u64> {
        0..test_file_len()
    }

    /// Return a slice of the test file data from the given range
    pub fn test_file_slice(range: Range<u64>) -> Bytes {
        let start: usize = range.start.try_into().unwrap();
        let end: usize = range.end.try_into().unwrap();
        TEST_FILE_DATA.slice(start..end)
    }

    /// Push the given ranges to the metadata decoder, simulating reading from a file
    fn push_ranges_to_metadata_decoder(
        metadata_decoder: &mut ParquetMetaDataPushDecoder,
        ranges: Vec<Range<u64>>,
    ) {
        let data = ranges
            .iter()
            .map(|range| test_file_slice(range.clone()))
            .collect::<Vec<_>>();
        metadata_decoder.push_ranges(ranges, data).unwrap();
    }

    /// Expect that the [`DecodeResult`] is a [`DecodeResult::Data`] and return the corresponding element
    fn expect_data<T: Debug>(result: Result<DecodeResult<T>>) -> T {
        match result.expect("Expected Ok(DecodeResult::Data(T))") {
            DecodeResult::Data(data) => data,
            result => panic!("Expected DecodeResult::Data, got {result:?}"),
        }
    }

    /// Expect that the [`DecodeResult`] is a [`DecodeResult::NeedsData`] and return the corresponding ranges
    fn expect_needs_data<T: Debug>(result: Result<DecodeResult<T>>) -> Vec<Range<u64>> {
        match result.expect("Expected Ok(DecodeResult::NeedsData{ranges})") {
            DecodeResult::NeedsData(ranges) => ranges,
            result => panic!("Expected DecodeResult::NeedsData, got {result:?}"),
        }
    }

    fn expect_finished<T: Debug>(result: Result<DecodeResult<T>>) {
        match result.expect("Expected Ok(DecodeResult::Finished)") {
            DecodeResult::Finished => {}
            result => panic!("Expected DecodeResult::Finished, got {result:?}"),
        }
    }
}