parquet/file/metadata/push_decoder.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#[cfg(feature = "encryption")]
19use crate::encryption::decrypt::FileDecryptionProperties;
20use crate::errors::{ParquetError, Result};
21use crate::file::metadata::parser::{parse_column_index, parse_offset_index, MetadataParser};
22use crate::file::metadata::{FooterTail, PageIndexPolicy, ParquetMetaData};
23use crate::file::page_index::index_reader::acc_range;
24use crate::file::reader::ChunkReader;
25use crate::file::FOOTER_SIZE;
26use crate::DecodeResult;
27use bytes::Bytes;
28use std::ops::Range;
29
30/// A push decoder for [`ParquetMetaData`].
31///
32/// This structure implements a push API for decoding Parquet metadata, which
33/// decouples IO from the metadata decoding logic (sometimes referred to as
34/// [Sans-IO]).
35///
36/// See [`ParquetMetaDataReader`] for a pull-based API that incorporates IO and
37/// is simpler to use for basic use cases. This decoder is best for customizing
38/// your IO operations to minimize bytes read, prefetch data, or use async IO.
39///
40/// [Sans-IO]: https://sans-io.readthedocs.io
41/// [`ParquetMetaDataReader`]: crate::file::metadata::ParquetMetaDataReader
42///
43/// # Example
44///
45/// The most basic usage is to feed the decoder with the necessary byte ranges
46/// as requested as shown below. This minimizes the number of bytes read, but
47/// requires the most IO operations - one to read the footer and then one
48/// to read the metadata, and possibly more if page indexes are requested.
49///
50/// ```rust
51/// # use std::ops::Range;
52/// # use bytes::Bytes;
53/// # use arrow_array::record_batch;
54/// # use parquet::DecodeResult;
55/// # use parquet::arrow::ArrowWriter;
56/// # use parquet::errors::ParquetError;
57/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
58/// #
59/// # fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
60/// # let file_bytes = {
61/// # let mut buffer = vec![0];
62/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
63/// # let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
64/// # writer.write(&batch).unwrap();
65/// # writer.close().unwrap();
66/// # Bytes::from(buffer)
67/// # };
68/// # // mimic IO by returning a function that returns the bytes for a given range
69/// # let get_range = |range: &Range<u64>| -> Bytes {
70/// # let start = range.start as usize;
71/// # let end = range.end as usize;
72/// # file_bytes.slice(start..end)
73/// # };
74/// #
75/// # let file_len = file_bytes.len() as u64;
76/// // The `ParquetMetaDataPushDecoder` needs to know the file length.
77/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
78/// // try to decode the metadata. If more data is needed, the decoder will tell you what ranges
79/// loop {
80/// match decoder.try_decode() {
81/// Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
82/// Ok(DecodeResult::NeedsData(ranges)) => {
83/// // The decoder needs more data
84/// //
85/// // In this example, we call a function that returns the bytes for each given range.
86/// // In a real application, you would likely read the data from a file or network.
87/// let data = ranges.iter().map(|range| get_range(range)).collect();
88/// // Push the data into the decoder and try to decode again on the next iteration.
89/// decoder.push_ranges(ranges, data).unwrap();
90/// }
91/// Ok(DecodeResult::Finished) => { unreachable!("returned metadata in previous match arm") }
92/// Err(e) => return Err(e),
93/// }
94/// }
95/// # }
96/// ```
97///
98/// # Example with "prefetching"
99///
100/// By default, the [`ParquetMetaDataPushDecoder`] will request only the exact byte
101/// ranges it needs. This minimizes the number of bytes read, however it
102/// requires at least two IO operations to read the metadata - one to read the
103/// footer and then one to read the metadata.
104///
105/// If the file has a "Page Index" (see [Self::with_page_index_policy]), three
106/// IO operations are required to read the metadata, as the page index is
107/// not part of the normal metadata footer.
108///
109/// To reduce the number of IO operations in systems with high per operation
110/// overhead (e.g. cloud storage), you can "prefetch" the data and then push
111/// the data into the decoder before calling [`Self::try_decode`]. If you do
112/// not push enough bytes, the decoder will return the ranges that are still
113/// needed.
114///
115/// This approach can also be used when you have the entire file already in memory
116/// for other reasons.
117///
118/// ```rust
119/// # use std::ops::Range;
120/// # use bytes::Bytes;
121/// # use arrow_array::record_batch;
122/// # use parquet::DecodeResult;
123/// # use parquet::arrow::ArrowWriter;
124/// # use parquet::errors::ParquetError;
125/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
126/// #
127/// # fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
128/// # let file_bytes = {
129/// # let mut buffer = vec![0];
130/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
131/// # let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
132/// # writer.write(&batch).unwrap();
133/// # writer.close().unwrap();
134/// # Bytes::from(buffer)
135/// # };
136/// #
137/// let file_len = file_bytes.len() as u64;
138/// // For this example, we "prefetch" all the bytes which we have in memory,
139/// // but in a real application, you would likely read a chunk from the end
140/// // for example 1MB.
141/// let prefetched_bytes = file_bytes.clone();
142/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
143/// // push the prefetched bytes into the decoder
144/// decoder.push_ranges(vec![0..file_len], vec![prefetched_bytes]).unwrap();
145/// // The decoder will now be able to decode the metadata. Note in a real application,
146/// // unless you can guarantee that the pushed data is enough to decode the metadata,
147/// // you still need to call `try_decode` in a loop until it returns `DecodeResult::Data`
148/// // as shown in the previous example
149/// match decoder.try_decode() {
150/// Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
151/// other => { panic!("expected DecodeResult::Data, got: {other:?}") }
152/// }
153/// # }
154/// ```
155///
156/// # Example using [`AsyncRead`]
157///
158/// [`ParquetMetaDataPushDecoder`] is designed to work with any data source that can
159/// provide byte ranges, including async IO sources. However, it does not
160/// implement async IO itself. To use async IO, you simply write an async
161/// wrapper around it that reads the required byte ranges and pushes them into the
162/// decoder.
163///
164/// ```rust
165/// # use std::ops::Range;
166/// # use bytes::Bytes;
167/// use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
168/// # use arrow_array::record_batch;
169/// # use parquet::DecodeResult;
170/// # use parquet::arrow::ArrowWriter;
171/// # use parquet::errors::ParquetError;
172/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
173/// #
174/// // This function decodes Parquet Metadata from anything that implements
175/// // [`AsyncRead`] and [`AsyncSeek`] such as a tokio::fs::File
176/// async fn decode_metadata(
177/// file_len: u64,
178/// mut async_source: impl AsyncRead + AsyncSeek + Unpin
179/// ) -> Result<ParquetMetaData, ParquetError> {
180/// // We need a ParquetMetaDataPushDecoder to decode the metadata.
181/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
182/// loop {
183/// match decoder.try_decode() {
184/// Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
185/// Ok(DecodeResult::NeedsData(ranges)) => {
186/// // The decoder needs more data
187/// //
188/// // In this example we use the AsyncRead and AsyncSeek traits to read the
189/// // required ranges from the async source.
190/// let mut data = Vec::with_capacity(ranges.len());
191/// for range in &ranges {
192/// let mut buffer = vec![0; (range.end - range.start) as usize];
193/// async_source.seek(std::io::SeekFrom::Start(range.start)).await?;
194/// async_source.read_exact(&mut buffer).await?;
195/// data.push(Bytes::from(buffer));
196/// }
197/// // Push the data into the decoder and try to decode again on the next iteration.
198/// decoder.push_ranges(ranges, data).unwrap();
199/// }
200/// Ok(DecodeResult::Finished) => { unreachable!("returned metadata in previous match arm") }
201/// Err(e) => return Err(e),
202/// }
203/// }
204/// }
205/// ```
206/// [`AsyncRead`]: tokio::io::AsyncRead
#[derive(Debug)]
pub struct ParquetMetaDataPushDecoder {
    /// Current position in the decoding state machine
    /// (footer --> metadata --> page indexes --> finished)
    state: DecodeState,
    /// policy for loading ColumnIndex (part of the PageIndex)
    column_index_policy: PageIndexPolicy,
    /// policy for loading OffsetIndex (part of the PageIndex)
    offset_index_policy: PageIndexPolicy,
    /// Underlying buffers: byte ranges pushed by the caller, addressed by
    /// absolute file offset
    buffers: crate::util::push_buffers::PushBuffers,
    /// Parses the thrift-encoded footer into [`ParquetMetaData`], handling
    /// decryption when the `encryption` feature is enabled
    metadata_parser: MetadataParser,
}
220
221impl ParquetMetaDataPushDecoder {
222 /// Create a new `ParquetMetaDataPushDecoder` with the given file length.
223 ///
224 /// By default, this will read page indexes and column indexes. See
225 /// [`ParquetMetaDataPushDecoder::with_page_index_policy`] for more detail.
226 ///
227 /// See examples on [`ParquetMetaDataPushDecoder`].
228 pub fn try_new(file_len: u64) -> Result<Self> {
229 if file_len < 8 {
230 return Err(ParquetError::General(format!(
231 "Parquet files are at least 8 bytes long, but file length is {file_len}"
232 )));
233 };
234
235 Ok(Self {
236 state: DecodeState::ReadingFooter,
237 column_index_policy: PageIndexPolicy::Optional,
238 offset_index_policy: PageIndexPolicy::Optional,
239 buffers: crate::util::push_buffers::PushBuffers::new(file_len),
240 metadata_parser: MetadataParser::new(),
241 })
242 }
243
244 /// Begin decoding from the given footer tail.
245 pub(crate) fn try_new_with_footer_tail(file_len: u64, footer_tail: FooterTail) -> Result<Self> {
246 let mut new_self = Self::try_new(file_len)?;
247 new_self.state = DecodeState::ReadingMetadata(footer_tail);
248 Ok(new_self)
249 }
250
251 /// Create a decoder with the given `ParquetMetaData` already known.
252 ///
253 /// This can be used to parse and populate the page index structures
254 /// after the metadata has already been decoded.
255 pub fn try_new_with_metadata(file_len: u64, metadata: ParquetMetaData) -> Result<Self> {
256 let mut new_self = Self::try_new(file_len)?;
257 new_self.state = DecodeState::ReadingPageIndex(Box::new(metadata));
258 Ok(new_self)
259 }
260
261 /// Enable or disable reading the page index structures described in
262 /// "[Parquet page index] Layout to Support Page Skipping".
263 ///
264 /// Defaults to [`PageIndexPolicy::Optional`]
265 ///
266 /// This requires
267 /// 1. The Parquet file to have been written with page indexes
268 /// 2. Additional data to be pushed into the decoder (as the page indexes are not part of the thrift footer)
269 ///
270 /// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
271 pub fn with_page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self {
272 self.column_index_policy = page_index_policy;
273 self.offset_index_policy = page_index_policy;
274 self
275 }
276
277 /// Set the policy for reading the ColumnIndex (part of the PageIndex)
278 pub fn with_column_index_policy(mut self, column_index_policy: PageIndexPolicy) -> Self {
279 self.column_index_policy = column_index_policy;
280 self
281 }
282
283 /// Set the policy for reading the OffsetIndex (part of the PageIndex)
284 pub fn with_offset_index_policy(mut self, offset_index_policy: PageIndexPolicy) -> Self {
285 self.offset_index_policy = offset_index_policy;
286 self
287 }
288
289 #[cfg(feature = "encryption")]
290 /// Provide decryption properties for decoding encrypted Parquet files
291 pub(crate) fn with_file_decryption_properties(
292 mut self,
293 file_decryption_properties: Option<std::sync::Arc<FileDecryptionProperties>>,
294 ) -> Self {
295 self.metadata_parser = self
296 .metadata_parser
297 .with_file_decryption_properties(file_decryption_properties);
298 self
299 }
300
301 /// Push the data into the decoder's buffer.
302 ///
303 /// The decoder does not immediately attempt to decode the metadata
304 /// after pushing data. Instead, it accumulates the pushed data until you
305 /// call [`Self::try_decode`].
306 ///
307 /// # Determining required data:
308 ///
309 /// To determine what ranges are required to decode the metadata, you can
310 /// either:
311 ///
312 /// 1. Call [`Self::try_decode`] first to get the exact ranges required (see
313 /// example on [`Self`])
314 ///
315 /// 2. Speculatively push any data that you have available, which may
316 /// include more than the footer data or requested bytes.
317 ///
318 /// Speculatively pushing data can be used when "prefetching" data. See
319 /// example on [`Self`]
320 pub fn push_ranges(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) -> Result<()> {
321 if matches!(&self.state, DecodeState::Finished) {
322 return Err(general_err!(
323 "ParquetMetaDataPushDecoder: cannot push data after decoding is finished"
324 ));
325 }
326 self.buffers.push_ranges(ranges, buffers);
327 Ok(())
328 }
329
330 /// Pushes a single range of data into the decoder's buffer.
331 pub fn push_range(&mut self, range: Range<u64>, buffer: Bytes) -> Result<()> {
332 if matches!(&self.state, DecodeState::Finished) {
333 return Err(general_err!(
334 "ParquetMetaDataPushDecoder: cannot push data after decoding is finished"
335 ));
336 }
337 self.buffers.push_range(range, buffer);
338 Ok(())
339 }
340
341 /// Try to decode the metadata from the pushed data, returning the
342 /// decoded metadata or an error if not enough data is available.
343 pub fn try_decode(&mut self) -> Result<DecodeResult<ParquetMetaData>> {
344 let file_len = self.buffers.file_len();
345 let footer_len = FOOTER_SIZE as u64;
346 loop {
347 match std::mem::replace(&mut self.state, DecodeState::Intermediate) {
348 DecodeState::ReadingFooter => {
349 // need to have the last 8 bytes of the file to decode the metadata
350 let footer_start = file_len.saturating_sub(footer_len);
351 let footer_range = footer_start..file_len;
352
353 if !self.buffers.has_range(&footer_range) {
354 self.state = DecodeState::ReadingFooter;
355 return Ok(needs_range(footer_range));
356 }
357 let footer_bytes = self.get_bytes(&footer_range)?;
358 let footer_tail = FooterTail::try_from(footer_bytes.as_ref())?;
359
360 self.state = DecodeState::ReadingMetadata(footer_tail);
361 continue;
362 }
363
364 DecodeState::ReadingMetadata(footer_tail) => {
365 let metadata_len: u64 = footer_tail.metadata_length() as u64;
366 let metadata_start = file_len - footer_len - metadata_len;
367 let metadata_end = metadata_start + metadata_len;
368 let metadata_range = metadata_start..metadata_end;
369
370 if !self.buffers.has_range(&metadata_range) {
371 self.state = DecodeState::ReadingMetadata(footer_tail);
372 return Ok(needs_range(metadata_range));
373 }
374
375 let metadata = self.metadata_parser.decode_metadata(
376 &self.get_bytes(&metadata_range)?,
377 footer_tail.is_encrypted_footer(),
378 )?;
379 // Note: ReadingPageIndex first checks if page indexes are needed
380 // and is a no-op if not
381 self.state = DecodeState::ReadingPageIndex(Box::new(metadata));
382 continue;
383 }
384
385 DecodeState::ReadingPageIndex(mut metadata) => {
386 // First determine if any page indexes are needed based on
387 // the specified policies
388 let range = range_for_page_index(
389 &metadata,
390 self.column_index_policy,
391 self.offset_index_policy,
392 );
393
394 let Some(page_index_range) = range else {
395 self.state = DecodeState::Finished;
396 return Ok(DecodeResult::Data(*metadata));
397 };
398
399 if !self.buffers.has_range(&page_index_range) {
400 self.state = DecodeState::ReadingPageIndex(metadata);
401 return Ok(needs_range(page_index_range));
402 }
403
404 let buffer = self.get_bytes(&page_index_range)?;
405 let offset = page_index_range.start;
406 parse_column_index(&mut metadata, self.column_index_policy, &buffer, offset)?;
407 parse_offset_index(&mut metadata, self.offset_index_policy, &buffer, offset)?;
408 self.state = DecodeState::Finished;
409 return Ok(DecodeResult::Data(*metadata));
410 }
411
412 DecodeState::Finished => return Ok(DecodeResult::Finished),
413 DecodeState::Intermediate => {
414 return Err(general_err!(
415 "ParquetMetaDataPushDecoder: internal error, invalid state"
416 ));
417 }
418 }
419 }
420 }
421
422 /// Returns the bytes for the given range from the internal buffer
423 fn get_bytes(&self, range: &Range<u64>) -> Result<Bytes> {
424 let start = range.start;
425 let raw_len = range.end - range.start;
426 let len: usize = raw_len.try_into().map_err(|_| {
427 ParquetError::General(format!(
428 "ParquetMetaDataPushDecoder: Range length too large to fit in usize: {raw_len}",
429 ))
430 })?;
431 self.buffers.get_bytes(start, len)
432 }
433}
434
435/// returns a DecodeResults that describes needing the given range
436fn needs_range(range: Range<u64>) -> DecodeResult<ParquetMetaData> {
437 DecodeResult::NeedsData(vec![range])
438}
439
/// Decoding state machine: states advance footer -> metadata -> page index -> finished
#[derive(Debug)]
enum DecodeState {
    /// Reading the last 8 bytes of the file (metadata length + magic)
    ReadingFooter,
    /// Reading the metadata thrift structure
    ReadingMetadata(FooterTail),
    /// Actively reading the page index
    ReadingPageIndex(Box<ParquetMetaData>),
    /// Decoding is complete
    Finished,
    /// State left during the `try_decode` method so something valid is present.
    /// This state should never be observed.
    Intermediate,
}
455
456/// Returns the byte range needed to read the offset/page indexes, based on the
457/// specified policies
458///
459/// Returns None if no page indexes are needed
460pub fn range_for_page_index(
461 metadata: &ParquetMetaData,
462 column_index_policy: PageIndexPolicy,
463 offset_index_policy: PageIndexPolicy,
464) -> Option<Range<u64>> {
465 let mut range = None;
466 for c in metadata.row_groups().iter().flat_map(|r| r.columns()) {
467 if column_index_policy != PageIndexPolicy::Skip {
468 range = acc_range(range, c.column_index_range());
469 }
470 if offset_index_policy != PageIndexPolicy::Skip {
471 range = acc_range(range, c.offset_index_range());
472 }
473 }
474 range
475}
476
// These tests use the arrow writer to create a parquet file in memory
// so they need the arrow feature and the test feature
#[cfg(all(test, feature = "arrow"))]
mod tests {
    use super::*;
    use crate::arrow::ArrowWriter;
    use crate::file::properties::WriterProperties;
    use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray};
    use bytes::Bytes;
    use std::fmt::Debug;
    use std::ops::Range;
    use std::sync::{Arc, LazyLock};

    /// It is possible to decode the metadata from the entire file at once before being asked
    #[test]
    fn test_metadata_decoder_all_data() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // Push the entire file data into the metadata decoder
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![test_file_range()]);

        // should be able to decode the metadata without needing more data
        let metadata = expect_data(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// It is possible to feed some, but not all, of the footer into the metadata decoder
    /// before being asked. This avoids multiple IO requests
    #[test]
    fn test_metadata_decoder_prefetch_success() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // simulate pre-fetching the last 2k bytes of the file without asking the decoder
        let prefetch_range = (file_len - 2 * 1024)..file_len;
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![prefetch_range]);

        // expect the decoder has enough data to decode the metadata
        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());
        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// It is possible to pre-fetch some, but not all, of the necessary data
    #[test]
    fn test_metadata_decoder_prefetch_retry() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // simulate pre-fetching the last 1500 bytes of the file.
        // this is enough to read the footer thrift metadata, but not the offset indexes
        let prefetch_range = (file_len - 1500)..file_len;
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![prefetch_range]);

        // expect another request is needed to read the offset indexes (note
        // try_decode only returns NeedsData once, whereas without any prefetching it would
        // return NeedsData three times)
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // expect the decoder has enough data to decode the metadata
        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// Decode the metadata incrementally, simulating a scenario where exactly the data needed
    /// is read in each step
    #[test]
    fn test_metadata_decoder_incremental() {
        let file_len = TEST_FILE_DATA.len() as u64;
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // first request: the 8 byte footer at the end of the file
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges[0], test_file_len() - 8..test_file_len());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // expect the second request to read the thrift metadata
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // expect the third request to read the page/offset indexes
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // with all requested data pushed, decoding now succeeds
        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// Decode the metadata incrementally, but without reading the page indexes
    /// (so only two requests)
    #[test]
    fn test_metadata_decoder_incremental_no_page_index() {
        let file_len = TEST_FILE_DATA.len() as u64;
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len)
            .unwrap()
            .with_page_index_policy(PageIndexPolicy::Skip);
        // first request: the 8 byte footer at the end of the file
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges[0], test_file_len() - 8..test_file_len());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // expect the second request to read the thrift metadata
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // expect NO third request to read the offset indexes, should just cough up the metadata
        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_none()); // of course, we did not read the column index
        assert!(metadata.offset_index().is_none()); // or the offset index
    }

    static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
        // Input batch has 400 rows, with 3 columns: "a", "b", "c"
        // Note c is a different type (so the data page sizes will be different)
        let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
        let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
        let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| {
            if i % 2 == 0 {
                format!("string_{i}")
            } else {
                format!("A string larger than 12 bytes and thus not inlined {i}")
            }
        })));

        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
    });

    /// Create a parquet file in memory for testing. See [`test_file_range`] for details.
    static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
        let input_batch = &TEST_BATCH;
        let mut output = Vec::new();

        // small row groups / pages so the file has multiple of each
        let writer_options = WriterProperties::builder()
            .set_max_row_group_size(200)
            .set_data_page_row_count_limit(100)
            .build();
        let mut writer =
            ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap();

        // since the limits are only enforced on batch boundaries, write the input
        // batch in chunks of 50
        let mut row_remain = input_batch.num_rows();
        while row_remain > 0 {
            let chunk_size = row_remain.min(50);
            let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size);
            writer.write(&chunk).unwrap();
            row_remain -= chunk_size;
        }
        writer.close().unwrap();
        Bytes::from(output)
    });

    /// Return the length of the test file in bytes
    fn test_file_len() -> u64 {
        TEST_FILE_DATA.len() as u64
    }

    /// Return the range of the entire test file
    fn test_file_range() -> Range<u64> {
        0..test_file_len()
    }

    /// Return a slice of the test file data from the given range
    pub fn test_file_slice(range: Range<u64>) -> Bytes {
        let start: usize = range.start.try_into().unwrap();
        let end: usize = range.end.try_into().unwrap();
        TEST_FILE_DATA.slice(start..end)
    }

    /// Push the given ranges to the metadata decoder, simulating reading from a file
    fn push_ranges_to_metadata_decoder(
        metadata_decoder: &mut ParquetMetaDataPushDecoder,
        ranges: Vec<Range<u64>>,
    ) {
        let data = ranges
            .iter()
            .map(|range| test_file_slice(range.clone()))
            .collect::<Vec<_>>();
        metadata_decoder.push_ranges(ranges, data).unwrap();
    }

    /// Expect that the [`DecodeResult`] is a [`DecodeResult::Data`] and return the corresponding element
    fn expect_data<T: Debug>(result: Result<DecodeResult<T>>) -> T {
        match result.expect("Expected Ok(DecodeResult::Data(T))") {
            DecodeResult::Data(data) => data,
            result => panic!("Expected DecodeResult::Data, got {result:?}"),
        }
    }

    /// Expect that the [`DecodeResult`] is a [`DecodeResult::NeedsData`] and return the corresponding ranges
    fn expect_needs_data<T: Debug>(result: Result<DecodeResult<T>>) -> Vec<Range<u64>> {
        match result.expect("Expected Ok(DecodeResult::NeedsData{ranges})") {
            DecodeResult::NeedsData(ranges) => ranges,
            result => panic!("Expected DecodeResult::NeedsData, got {result:?}"),
        }
    }

    /// Expect that the [`DecodeResult`] is a [`DecodeResult::Finished`]
    fn expect_finished<T: Debug>(result: Result<DecodeResult<T>>) {
        match result.expect("Expected Ok(DecodeResult::Finished)") {
            DecodeResult::Finished => {}
            result => panic!("Expected DecodeResult::Finished, got {result:?}"),
        }
    }
}