// parquet/file/metadata/push_decoder.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::DecodeResult;
19#[cfg(feature = "encryption")]
20use crate::encryption::decrypt::FileDecryptionProperties;
21use crate::errors::{ParquetError, Result};
22use crate::file::FOOTER_SIZE;
23use crate::file::metadata::parser::{MetadataParser, parse_column_index, parse_offset_index};
24use crate::file::metadata::{FooterTail, PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions};
25use crate::file::page_index::index_reader::acc_range;
26use crate::file::reader::ChunkReader;
27use bytes::Bytes;
28use std::ops::Range;
29use std::sync::Arc;
30
31/// A push decoder for [`ParquetMetaData`].
32///
33/// This structure implements a push API for decoding Parquet metadata, which
34/// decouples IO from the metadata decoding logic (sometimes referred to as
35/// [Sans-IO]).
36///
37/// See [`ParquetMetaDataReader`] for a pull-based API that incorporates IO and
38/// is simpler to use for basic use cases. This decoder is best for customizing
39/// your IO operations to minimize bytes read, prefetch data, or use async IO.
40///
41/// [Sans-IO]: https://sans-io.readthedocs.io
42/// [`ParquetMetaDataReader`]: crate::file::metadata::ParquetMetaDataReader
43///
44/// # Example
45///
46/// The most basic usage is to feed the decoder with the necessary byte ranges
47/// as requested as shown below. This minimizes the number of bytes read, but
48/// requires the most IO operations - one to read the footer and then one
49/// to read the metadata, and possibly more if page indexes are requested.
50///
51#[cfg_attr(
52    feature = "arrow",
53    doc = r##"
54```rust
55# use std::ops::Range;
56# use bytes::Bytes;
57# use arrow_array::record_batch;
58# use parquet::DecodeResult;
59# use parquet::arrow::ArrowWriter;
60# use parquet::errors::ParquetError;
61# use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
62#
63# fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
64# let file_bytes = {
65#   let mut buffer = vec![0];
66#   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
67#   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
68#   writer.write(&batch).unwrap();
69#   writer.close().unwrap();
70#   Bytes::from(buffer)
71# };
72# // mimic IO by returning a function that returns the bytes for a given range
73# let get_range = |range: &Range<u64>| -> Bytes {
74#    let start = range.start as usize;
75#     let end = range.end as usize;
76#    file_bytes.slice(start..end)
77# };
78#
79# let file_len = file_bytes.len() as u64;
80// The `ParquetMetaDataPushDecoder` needs to know the file length.
81let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
82// try to decode the metadata. If more data is needed, the decoder will tell you what ranges
83loop {
84    match decoder.try_decode() {
85       Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
86       Ok(DecodeResult::NeedsData(ranges)) => {
87          // The decoder needs more data
88          //
89          // In this example, we call a function that returns the bytes for each given range.
90          // In a real application, you would likely read the data from a file or network.
91          let data = ranges.iter().map(|range| get_range(range)).collect();
92          // Push the data into the decoder and try to decode again on the next iteration.
93          decoder.push_ranges(ranges, data).unwrap();
94       }
95       Ok(DecodeResult::Finished) => { unreachable!("returned metadata in previous match arm") }
96       Err(e) => return Err(e),
97    }
98}
99# }
100```
101"##
102)]
103///
104/// # Example with "prefetching"
105///
106/// By default, the [`ParquetMetaDataPushDecoder`] will request only the exact byte
107/// ranges it needs. This minimizes the number of bytes read, however it
108/// requires at least two IO operations to read the metadata - one to read the
109/// footer and then one to read the metadata.
110///
111/// If the file has a "Page Index" (see [Self::with_page_index_policy]), three
112/// IO operations are required to read the metadata, as the page index is
113/// not part of the normal metadata footer.
114///
115/// To reduce the number of IO operations in systems with high per operation
116/// overhead (e.g. cloud storage), you can "prefetch" the data and then push
117/// the data into the decoder before calling [`Self::try_decode`]. If you do
118/// not push enough bytes, the decoder will return the ranges that are still
119/// needed.
120///
121/// This approach can also be used when you have the entire file already in memory
122/// for other reasons.
123#[cfg_attr(
124    feature = "arrow",
125    doc = r##"
126```rust
127# use std::ops::Range;
128# use bytes::Bytes;
129# use arrow_array::record_batch;
130# use parquet::DecodeResult;
131# use parquet::arrow::ArrowWriter;
132# use parquet::errors::ParquetError;
133# use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
134#
135# fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
136# let file_bytes = {
137#   let mut buffer = vec![0];
138#   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
139#   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
140#   writer.write(&batch).unwrap();
141#   writer.close().unwrap();
142#   Bytes::from(buffer)
143# };
144#
145let file_len = file_bytes.len() as u64;
146// For this example, we "prefetch" all the bytes which we have in memory,
147// but in a real application, you would likely read a chunk from the end
148// for example 1MB.
149let prefetched_bytes = file_bytes.clone();
150let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
151// push the prefetched bytes into the decoder
152decoder.push_ranges(vec![0..file_len], vec![prefetched_bytes]).unwrap();
153// The decoder will now be able to decode the metadata. Note in a real application,
154// unless you can guarantee that the pushed data is enough to decode the metadata,
155// you still need to call `try_decode` in a loop until it returns `DecodeResult::Data`
// as shown in the previous example
157    match decoder.try_decode() {
158        Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
159        other => { panic!("expected DecodeResult::Data, got: {other:?}") }
160    }
161# }
162```
163"##
164)]
165///
166/// # Example using [`AsyncRead`]
167///
168/// [`ParquetMetaDataPushDecoder`] is designed to work with any data source that can
169/// provide byte ranges, including async IO sources. However, it does not
170/// implement async IO itself. To use async IO, you simply write an async
171/// wrapper around it that reads the required byte ranges and pushes them into the
172/// decoder.
173#[cfg_attr(
174    feature = "arrow",
175    doc = r##"
176```rust
177# use std::ops::Range;
178# use bytes::Bytes;
179use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
180# use arrow_array::record_batch;
181# use parquet::DecodeResult;
182# use parquet::arrow::ArrowWriter;
183# use parquet::errors::ParquetError;
184# use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
185#
186// This function decodes Parquet Metadata from anything that implements
187// [`AsyncRead`] and [`AsyncSeek`] such as a tokio::fs::File
188async fn decode_metadata(
189  file_len: u64,
190  mut async_source: impl AsyncRead + AsyncSeek + Unpin
191) -> Result<ParquetMetaData, ParquetError> {
192  // We need a ParquetMetaDataPushDecoder to decode the metadata.
193  let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
194  loop {
195    match decoder.try_decode() {
196       Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
197       Ok(DecodeResult::NeedsData(ranges)) => {
198          // The decoder needs more data
199          //
200          // In this example we use the AsyncRead and AsyncSeek traits to read the
201          // required ranges from the async source.
202          let mut data = Vec::with_capacity(ranges.len());
203          for range in &ranges {
204            let mut buffer = vec![0; (range.end - range.start) as usize];
205            async_source.seek(std::io::SeekFrom::Start(range.start)).await?;
206            async_source.read_exact(&mut buffer).await?;
207            data.push(Bytes::from(buffer));
208          }
209          // Push the data into the decoder and try to decode again on the next iteration.
210          decoder.push_ranges(ranges, data).unwrap();
211       }
212       Ok(DecodeResult::Finished) => { unreachable!("returned metadata in previous match arm") }
213       Err(e) => return Err(e),
214    }
215  }
216}
217```
218"##
219)]
220/// [`AsyncRead`]: tokio::io::AsyncRead
#[derive(Debug)]
pub struct ParquetMetaDataPushDecoder {
    /// Current state of the decoding state machine
    state: DecodeState,
    /// Policy for loading the ColumnIndex (part of the PageIndex)
    column_index_policy: PageIndexPolicy,
    /// Policy for loading the OffsetIndex (part of the PageIndex)
    offset_index_policy: PageIndexPolicy,
    /// Byte ranges pushed by the caller, buffered until `try_decode` is called
    buffers: crate::util::push_buffers::PushBuffers,
    /// Parses the thrift-encoded footer metadata; also handles footer
    /// decryption when the `encryption` feature is enabled
    metadata_parser: MetadataParser,
}
234
235impl ParquetMetaDataPushDecoder {
236    /// Create a new `ParquetMetaDataPushDecoder` with the given file length.
237    ///
238    /// By default, this will read page indexes and column indexes. See
239    /// [`ParquetMetaDataPushDecoder::with_page_index_policy`] for more detail.
240    ///
241    /// See examples on [`ParquetMetaDataPushDecoder`].
242    pub fn try_new(file_len: u64) -> Result<Self> {
243        if file_len < 8 {
244            return Err(ParquetError::General(format!(
245                "Parquet files are at least 8 bytes long, but file length is {file_len}"
246            )));
247        };
248
249        Ok(Self {
250            state: DecodeState::ReadingFooter,
251            column_index_policy: PageIndexPolicy::Optional,
252            offset_index_policy: PageIndexPolicy::Optional,
253            buffers: crate::util::push_buffers::PushBuffers::new(file_len),
254            metadata_parser: MetadataParser::new(),
255        })
256    }
257
258    /// Begin decoding from the given footer tail.
259    pub(crate) fn try_new_with_footer_tail(file_len: u64, footer_tail: FooterTail) -> Result<Self> {
260        let mut new_self = Self::try_new(file_len)?;
261        new_self.state = DecodeState::ReadingMetadata(footer_tail);
262        Ok(new_self)
263    }
264
265    /// Create a decoder with the given `ParquetMetaData` already known.
266    ///
267    /// This can be used to parse and populate the page index structures
268    /// after the metadata has already been decoded.
269    pub fn try_new_with_metadata(file_len: u64, metadata: ParquetMetaData) -> Result<Self> {
270        let mut new_self = Self::try_new(file_len)?;
271        new_self.state = DecodeState::ReadingPageIndex(Box::new(metadata));
272        Ok(new_self)
273    }
274
275    /// Enable or disable reading the page index structures described in
276    /// "[Parquet page index] Layout to Support Page Skipping".
277    ///
278    /// Defaults to [`PageIndexPolicy::Optional`]
279    ///
280    /// This requires
281    /// 1. The Parquet file to have been written with page indexes
282    /// 2. Additional data to be pushed into the decoder (as the page indexes are not part of the thrift footer)
283    ///
284    /// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
285    pub fn with_page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self {
286        self.column_index_policy = page_index_policy;
287        self.offset_index_policy = page_index_policy;
288        self
289    }
290
291    /// Set the policy for reading the ColumnIndex (part of the PageIndex)
292    pub fn with_column_index_policy(mut self, column_index_policy: PageIndexPolicy) -> Self {
293        self.column_index_policy = column_index_policy;
294        self
295    }
296
297    /// Set the policy for reading the OffsetIndex (part of the PageIndex)
298    pub fn with_offset_index_policy(mut self, offset_index_policy: PageIndexPolicy) -> Self {
299        self.offset_index_policy = offset_index_policy;
300        self
301    }
302
303    /// Set the options to use when decoding the Parquet metadata.
304    pub fn with_metadata_options(mut self, options: Option<Arc<ParquetMetaDataOptions>>) -> Self {
305        self.metadata_parser = self.metadata_parser.with_metadata_options(options);
306        self
307    }
308
309    #[cfg(feature = "encryption")]
310    /// Provide decryption properties for decoding encrypted Parquet files
311    pub fn with_file_decryption_properties(
312        mut self,
313        file_decryption_properties: Option<std::sync::Arc<FileDecryptionProperties>>,
314    ) -> Self {
315        self.metadata_parser = self
316            .metadata_parser
317            .with_file_decryption_properties(file_decryption_properties);
318        self
319    }
320
321    /// Push the data into the decoder's buffer.
322    ///
323    /// The decoder does not immediately attempt to decode the metadata
324    /// after pushing data. Instead, it accumulates the pushed data until you
325    /// call [`Self::try_decode`].
326    ///
327    /// # Determining required data:
328    ///
329    /// To determine what ranges are required to decode the metadata, you can
330    /// either:
331    ///
332    /// 1. Call [`Self::try_decode`] first to get the exact ranges required (see
333    ///    example on [`Self`])
334    ///
335    /// 2. Speculatively push any data that you have available, which may
336    ///    include more than the footer data or requested bytes.
337    ///
338    /// Speculatively pushing data can be used when  "prefetching" data. See
339    /// example on [`Self`]
340    pub fn push_ranges(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) -> Result<()> {
341        if matches!(&self.state, DecodeState::Finished) {
342            return Err(general_err!(
343                "ParquetMetaDataPushDecoder: cannot push data after decoding is finished"
344            ));
345        }
346        self.buffers.push_ranges(ranges, buffers);
347        Ok(())
348    }
349
350    /// Pushes a single range of data into the decoder's buffer.
351    pub fn push_range(&mut self, range: Range<u64>, buffer: Bytes) -> Result<()> {
352        if matches!(&self.state, DecodeState::Finished) {
353            return Err(general_err!(
354                "ParquetMetaDataPushDecoder: cannot push data after decoding is finished"
355            ));
356        }
357        self.buffers.push_range(range, buffer);
358        Ok(())
359    }
360
361    /// Clear any staged byte ranges currently buffered for future decode work.
362    pub fn clear_all_ranges(&mut self) {
363        self.buffers.clear_all_ranges();
364    }
365
366    /// Try to decode the metadata from the pushed data, returning the
367    /// decoded metadata or an error if not enough data is available.
368    pub fn try_decode(&mut self) -> Result<DecodeResult<ParquetMetaData>> {
369        let file_len = self.buffers.file_len();
370        let footer_len = FOOTER_SIZE as u64;
371        loop {
372            match std::mem::replace(&mut self.state, DecodeState::Intermediate) {
373                DecodeState::ReadingFooter => {
374                    // need to have the last 8 bytes of the file to decode the metadata
375                    let footer_start = file_len.saturating_sub(footer_len);
376                    let footer_range = footer_start..file_len;
377
378                    if !self.buffers.has_range(&footer_range) {
379                        self.state = DecodeState::ReadingFooter;
380                        return Ok(needs_range(footer_range));
381                    }
382                    let footer_bytes = self.get_bytes(&footer_range)?;
383                    let footer_tail = FooterTail::try_from(footer_bytes.as_ref())?;
384
385                    self.state = DecodeState::ReadingMetadata(footer_tail);
386                    continue;
387                }
388
389                DecodeState::ReadingMetadata(footer_tail) => {
390                    let metadata_len: u64 = footer_tail.metadata_length() as u64;
391                    let metadata_start = file_len - footer_len - metadata_len;
392                    let metadata_end = metadata_start + metadata_len;
393                    let metadata_range = metadata_start..metadata_end;
394
395                    if !self.buffers.has_range(&metadata_range) {
396                        self.state = DecodeState::ReadingMetadata(footer_tail);
397                        return Ok(needs_range(metadata_range));
398                    }
399
400                    let metadata = self.metadata_parser.decode_metadata(
401                        &self.get_bytes(&metadata_range)?,
402                        footer_tail.is_encrypted_footer(),
403                    )?;
404                    // Note: ReadingPageIndex first checks if page indexes are needed
405                    // and is a no-op if not
406                    self.state = DecodeState::ReadingPageIndex(Box::new(metadata));
407                    continue;
408                }
409
410                DecodeState::ReadingPageIndex(mut metadata) => {
411                    // First determine if any page indexes are needed based on
412                    // the specified policies
413                    let range = range_for_page_index(
414                        &metadata,
415                        self.column_index_policy,
416                        self.offset_index_policy,
417                    );
418
419                    let Some(page_index_range) = range else {
420                        self.state = DecodeState::Finished;
421                        return Ok(DecodeResult::Data(*metadata));
422                    };
423
424                    if !self.buffers.has_range(&page_index_range) {
425                        self.state = DecodeState::ReadingPageIndex(metadata);
426                        return Ok(needs_range(page_index_range));
427                    }
428
429                    let buffer = self.get_bytes(&page_index_range)?;
430                    let offset = page_index_range.start;
431                    parse_column_index(&mut metadata, self.column_index_policy, &buffer, offset)?;
432                    parse_offset_index(&mut metadata, self.offset_index_policy, &buffer, offset)?;
433                    self.state = DecodeState::Finished;
434                    return Ok(DecodeResult::Data(*metadata));
435                }
436
437                DecodeState::Finished => return Ok(DecodeResult::Finished),
438                DecodeState::Intermediate => {
439                    return Err(general_err!(
440                        "ParquetMetaDataPushDecoder: internal error, invalid state"
441                    ));
442                }
443            }
444        }
445    }
446
447    /// Returns the bytes for the given range from the internal buffer
448    fn get_bytes(&self, range: &Range<u64>) -> Result<Bytes> {
449        let start = range.start;
450        let raw_len = range.end - range.start;
451        let len: usize = raw_len.try_into().map_err(|_| {
452            ParquetError::General(format!(
453                "ParquetMetaDataPushDecoder: Range length too large to fit in usize: {raw_len}",
454            ))
455        })?;
456        self.buffers.get_bytes(start, len)
457    }
458}
459
460/// returns a DecodeResults that describes needing the given range
461fn needs_range(range: Range<u64>) -> DecodeResult<ParquetMetaData> {
462    DecodeResult::NeedsData(vec![range])
463}
464
/// Decoding state machine for [`ParquetMetaDataPushDecoder`]
#[derive(Debug)]
enum DecodeState {
    /// Reading the last 8 bytes of the file (the footer tail)
    ReadingFooter,
    /// Reading the thrift-encoded metadata structure
    ReadingMetadata(FooterTail),
    /// Actively reading the page index structures, if requested by the
    /// configured policies
    ReadingPageIndex(Box<ParquetMetaData>),
    /// Decoding is complete; the metadata has been returned to the caller
    Finished,
    /// State left during the `try_decode` method so something valid is present.
    /// This state should never be observed.
    Intermediate,
}
480
481/// Returns the byte range needed to read the offset/page indexes, based on the
482/// specified policies
483///
484/// Returns None if no page indexes are needed
485pub fn range_for_page_index(
486    metadata: &ParquetMetaData,
487    column_index_policy: PageIndexPolicy,
488    offset_index_policy: PageIndexPolicy,
489) -> Option<Range<u64>> {
490    let mut range = None;
491    for c in metadata.row_groups().iter().flat_map(|r| r.columns()) {
492        if column_index_policy != PageIndexPolicy::Skip {
493            range = acc_range(range, c.column_index_range());
494        }
495        if offset_index_policy != PageIndexPolicy::Skip {
496            range = acc_range(range, c.offset_index_range());
497        }
498    }
499    range
500}
501
// These tests use the arrow writer to create a parquet file in memory
// so they need the arrow feature and the test feature
#[cfg(all(test, feature = "arrow"))]
mod tests {
    use super::*;
    use crate::arrow::ArrowWriter;
    use crate::file::properties::WriterProperties;
    use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray};
    use bytes::Bytes;
    use std::fmt::Debug;
    use std::ops::Range;
    use std::sync::{Arc, LazyLock};

    /// It is possible to decode the metadata from the entire file at once before being asked
    #[test]
    fn test_metadata_decoder_all_data() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // Push the entire file data into the metadata decoder
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![test_file_range()]);

        // should be able to decode the metadata without needing more data
        let metadata = expect_data(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// It is possible to feed some, but not all, of the footer into the metadata decoder
    /// before asked. This avoids multiple IO requests
    #[test]
    fn test_metadata_decoder_prefetch_success() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // simulate pre-fetching the last 2k bytes of the file without asking the decoder
        let prefetch_range = (file_len - 2 * 1024)..file_len;
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![prefetch_range]);

        // expect the decoder has enough data to decode the metadata
        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());
        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// It is possible to pre-fetch some, but not all, of the necessary data
    #[test]
    fn test_metadata_decoder_prefetch_retry() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // simulate pre-fetching the last 1500 bytes of the file.
        // this is enough to read the footer thrift metadata, but not the offset indexes
        let prefetch_range = (file_len - 1500)..file_len;
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![prefetch_range]);

        // expect another request is needed to read the offset indexes (note
        // try_decode only returns NeedsData once, whereas without any prefetching it would
        // return NeedsData three times)
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // expect the decoder has enough data to decode the metadata
        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// Clearing the buffered ranges releases memory and forces the decoder
    /// to re-request the data it needs
    #[test]
    fn test_metadata_decoder_clear_all_ranges() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();

        metadata_decoder
            .push_range(test_file_range(), TEST_FILE_DATA.clone())
            .unwrap();
        assert_eq!(metadata_decoder.buffers.buffered_bytes(), test_file_len());

        metadata_decoder.clear_all_ranges();
        assert_eq!(metadata_decoder.buffers.buffered_bytes(), 0);

        // with the buffers cleared, the decoder asks for the footer again
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        assert_eq!(ranges, vec![test_file_len() - 8..test_file_len()]);
    }

    /// Decode the metadata incrementally, simulating a scenario where exactly the data needed
    /// is read in each step
    #[test]
    fn test_metadata_decoder_incremental() {
        let file_len = TEST_FILE_DATA.len() as u64;
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // the first request is for the 8 byte footer
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges[0], test_file_len() - 8..test_file_len());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // the second request is for the thrift-encoded metadata
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // the third request is for the page indexes
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // after the third push, the metadata can be decoded
        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// Decode the metadata incrementally, but without reading the page indexes
    /// (so only two requests)
    #[test]
    fn test_metadata_decoder_incremental_no_page_index() {
        let file_len = TEST_FILE_DATA.len() as u64;
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len)
            .unwrap()
            .with_page_index_policy(PageIndexPolicy::Skip);
        // the first request is for the 8 byte footer
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges[0], test_file_len() - 8..test_file_len());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // the second request is for the thrift-encoded metadata
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // expect NO third request for the page indexes; the decoder should
        // return the metadata directly
        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_none()); // of course, we did not read the column index
        assert!(metadata.offset_index().is_none()); // or the offset index
    }

    static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
        // Input batch has 400 rows, with 3 columns: "a", "b", "c"
        // Note c is a different type (so the data page sizes will be different)
        let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
        let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
        let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| {
            if i % 2 == 0 {
                format!("string_{i}")
            } else {
                format!("A string larger than 12 bytes and thus not inlined {i}")
            }
        })));

        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
    });

    /// Create a parquet file in memory for testing. See [`test_file_range`] for details.
    static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
        let input_batch = &TEST_BATCH;
        let mut output = Vec::new();

        let writer_options = WriterProperties::builder()
            .set_max_row_group_row_count(Some(200))
            .set_data_page_row_count_limit(100)
            .build();
        let mut writer =
            ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap();

        // since the limits are only enforced on batch boundaries, write the input
        // batch in chunks of 50
        let mut row_remain = input_batch.num_rows();
        while row_remain > 0 {
            let chunk_size = row_remain.min(50);
            let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size);
            writer.write(&chunk).unwrap();
            row_remain -= chunk_size;
        }
        writer.close().unwrap();
        Bytes::from(output)
    });

    /// Return the length of the test file in bytes
    fn test_file_len() -> u64 {
        TEST_FILE_DATA.len() as u64
    }

    /// Return the range of the entire test file
    fn test_file_range() -> Range<u64> {
        0..test_file_len()
    }

    /// Return a slice of the test file data from the given range
    pub fn test_file_slice(range: Range<u64>) -> Bytes {
        let start: usize = range.start.try_into().unwrap();
        let end: usize = range.end.try_into().unwrap();
        TEST_FILE_DATA.slice(start..end)
    }

    /// Push the given ranges to the metadata decoder, simulating reading from a file
    fn push_ranges_to_metadata_decoder(
        metadata_decoder: &mut ParquetMetaDataPushDecoder,
        ranges: Vec<Range<u64>>,
    ) {
        let data = ranges
            .iter()
            .map(|range| test_file_slice(range.clone()))
            .collect::<Vec<_>>();
        metadata_decoder.push_ranges(ranges, data).unwrap();
    }

    /// Expect that the [`DecodeResult`] is a [`DecodeResult::Data`] and return the corresponding element
    fn expect_data<T: Debug>(result: Result<DecodeResult<T>>) -> T {
        match result.expect("Expected Ok(DecodeResult::Data(T))") {
            DecodeResult::Data(data) => data,
            result => panic!("Expected DecodeResult::Data, got {result:?}"),
        }
    }

    /// Expect that the [`DecodeResult`] is a [`DecodeResult::NeedsData`] and return the corresponding ranges
    fn expect_needs_data<T: Debug>(result: Result<DecodeResult<T>>) -> Vec<Range<u64>> {
        match result.expect("Expected Ok(DecodeResult::NeedsData{ranges})") {
            DecodeResult::NeedsData(ranges) => ranges,
            result => panic!("Expected DecodeResult::NeedsData, got {result:?}"),
        }
    }

    /// Expect that the [`DecodeResult`] is a [`DecodeResult::Finished`]
    fn expect_finished<T: Debug>(result: Result<DecodeResult<T>>) {
        match result.expect("Expected Ok(DecodeResult::Finished)") {
            DecodeResult::Finished => {}
            result => panic!("Expected DecodeResult::Finished, got {result:?}"),
        }
    }
}