1use crate::DecodeResult;
19#[cfg(feature = "encryption")]
20use crate::encryption::decrypt::FileDecryptionProperties;
21use crate::errors::{ParquetError, Result};
22use crate::file::FOOTER_SIZE;
23use crate::file::metadata::parser::{MetadataParser, parse_column_index, parse_offset_index};
24use crate::file::metadata::{FooterTail, PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions};
25use crate::file::page_index::index_reader::acc_range;
26use crate::file::reader::ChunkReader;
27use bytes::Bytes;
28use std::ops::Range;
29use std::sync::Arc;
30
31#[cfg_attr(
52 feature = "arrow",
53 doc = r##"
54```rust
55# use std::ops::Range;
56# use bytes::Bytes;
57# use arrow_array::record_batch;
58# use parquet::DecodeResult;
59# use parquet::arrow::ArrowWriter;
60# use parquet::errors::ParquetError;
61# use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
62#
63# fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
64# let file_bytes = {
65# let mut buffer = vec![0];
66# let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
67# let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
68# writer.write(&batch).unwrap();
69# writer.close().unwrap();
70# Bytes::from(buffer)
71# };
72# // mimic IO by returning a function that returns the bytes for a given range
73# let get_range = |range: &Range<u64>| -> Bytes {
74# let start = range.start as usize;
75# let end = range.end as usize;
76# file_bytes.slice(start..end)
77# };
78#
79# let file_len = file_bytes.len() as u64;
80// The `ParquetMetaDataPushDecoder` needs to know the file length.
81let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
82// try to decode the metadata. If more data is needed, the decoder will tell you what ranges
83loop {
84 match decoder.try_decode() {
85 Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
86 Ok(DecodeResult::NeedsData(ranges)) => {
87 // The decoder needs more data
88 //
89 // In this example, we call a function that returns the bytes for each given range.
90 // In a real application, you would likely read the data from a file or network.
91 let data = ranges.iter().map(|range| get_range(range)).collect();
92 // Push the data into the decoder and try to decode again on the next iteration.
93 decoder.push_ranges(ranges, data).unwrap();
94 }
95 Ok(DecodeResult::Finished) => { unreachable!("returned metadata in previous match arm") }
96 Err(e) => return Err(e),
97 }
98}
99# }
100```
101"##
102)]
103#[cfg_attr(
124 feature = "arrow",
125 doc = r##"
126```rust
127# use std::ops::Range;
128# use bytes::Bytes;
129# use arrow_array::record_batch;
130# use parquet::DecodeResult;
131# use parquet::arrow::ArrowWriter;
132# use parquet::errors::ParquetError;
133# use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
134#
135# fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
136# let file_bytes = {
137# let mut buffer = vec![0];
138# let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
139# let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
140# writer.write(&batch).unwrap();
141# writer.close().unwrap();
142# Bytes::from(buffer)
143# };
144#
145let file_len = file_bytes.len() as u64;
146// For this example, we "prefetch" all the bytes which we have in memory,
147// but in a real application, you would likely read a chunk from the end
148// for example 1MB.
149let prefetched_bytes = file_bytes.clone();
150let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
151// push the prefetched bytes into the decoder
152decoder.push_ranges(vec![0..file_len], vec![prefetched_bytes]).unwrap();
153// The decoder will now be able to decode the metadata. Note in a real application,
154// unless you can guarantee that the pushed data is enough to decode the metadata,
155// you still need to call `try_decode` in a loop until it returns `DecodeResult::Data`
156// as shown in the previous example
157 match decoder.try_decode() {
158 Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
159 other => { panic!("expected DecodeResult::Data, got: {other:?}") }
160 }
161# }
162```
163"##
164)]
165#[cfg_attr(
174 feature = "arrow",
175 doc = r##"
176```rust
177# use std::ops::Range;
178# use bytes::Bytes;
179use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
180# use arrow_array::record_batch;
181# use parquet::DecodeResult;
182# use parquet::arrow::ArrowWriter;
183# use parquet::errors::ParquetError;
184# use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
185#
186// This function decodes Parquet Metadata from anything that implements
187// [`AsyncRead`] and [`AsyncSeek`] such as a tokio::fs::File
188async fn decode_metadata(
189 file_len: u64,
190 mut async_source: impl AsyncRead + AsyncSeek + Unpin
191) -> Result<ParquetMetaData, ParquetError> {
192 // We need a ParquetMetaDataPushDecoder to decode the metadata.
193 let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
194 loop {
195 match decoder.try_decode() {
196 Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
197 Ok(DecodeResult::NeedsData(ranges)) => {
198 // The decoder needs more data
199 //
200 // In this example we use the AsyncRead and AsyncSeek traits to read the
201 // required ranges from the async source.
202 let mut data = Vec::with_capacity(ranges.len());
203 for range in &ranges {
204 let mut buffer = vec![0; (range.end - range.start) as usize];
205 async_source.seek(std::io::SeekFrom::Start(range.start)).await?;
206 async_source.read_exact(&mut buffer).await?;
207 data.push(Bytes::from(buffer));
208 }
209 // Push the data into the decoder and try to decode again on the next iteration.
210 decoder.push_ranges(ranges, data).unwrap();
211 }
212 Ok(DecodeResult::Finished) => { unreachable!("returned metadata in previous match arm") }
213 Err(e) => return Err(e),
214 }
215 }
216}
217```
218"##
219)]
#[derive(Debug)]
pub struct ParquetMetaDataPushDecoder {
    /// Current position in the footer -> metadata -> page-index state machine.
    state: DecodeState,
    /// Whether / how strictly to read the column index (per-page min/max stats).
    column_index_policy: PageIndexPolicy,
    /// Whether / how strictly to read the offset index (per-page locations).
    offset_index_policy: PageIndexPolicy,
    /// Byte ranges pushed by the caller so far, addressed by absolute file offset.
    buffers: crate::util::push_buffers::PushBuffers,
    /// Parses the thrift-encoded footer metadata (handles decryption when the
    /// `encryption` feature is enabled).
    metadata_parser: MetadataParser,
}
234
235impl ParquetMetaDataPushDecoder {
236 pub fn try_new(file_len: u64) -> Result<Self> {
243 if file_len < 8 {
244 return Err(ParquetError::General(format!(
245 "Parquet files are at least 8 bytes long, but file length is {file_len}"
246 )));
247 };
248
249 Ok(Self {
250 state: DecodeState::ReadingFooter,
251 column_index_policy: PageIndexPolicy::Optional,
252 offset_index_policy: PageIndexPolicy::Optional,
253 buffers: crate::util::push_buffers::PushBuffers::new(file_len),
254 metadata_parser: MetadataParser::new(),
255 })
256 }
257
258 pub(crate) fn try_new_with_footer_tail(file_len: u64, footer_tail: FooterTail) -> Result<Self> {
260 let mut new_self = Self::try_new(file_len)?;
261 new_self.state = DecodeState::ReadingMetadata(footer_tail);
262 Ok(new_self)
263 }
264
265 pub fn try_new_with_metadata(file_len: u64, metadata: ParquetMetaData) -> Result<Self> {
270 let mut new_self = Self::try_new(file_len)?;
271 new_self.state = DecodeState::ReadingPageIndex(Box::new(metadata));
272 Ok(new_self)
273 }
274
275 pub fn with_page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self {
286 self.column_index_policy = page_index_policy;
287 self.offset_index_policy = page_index_policy;
288 self
289 }
290
291 pub fn with_column_index_policy(mut self, column_index_policy: PageIndexPolicy) -> Self {
293 self.column_index_policy = column_index_policy;
294 self
295 }
296
297 pub fn with_offset_index_policy(mut self, offset_index_policy: PageIndexPolicy) -> Self {
299 self.offset_index_policy = offset_index_policy;
300 self
301 }
302
303 pub fn with_metadata_options(mut self, options: Option<Arc<ParquetMetaDataOptions>>) -> Self {
305 self.metadata_parser = self.metadata_parser.with_metadata_options(options);
306 self
307 }
308
309 #[cfg(feature = "encryption")]
310 pub(crate) fn with_file_decryption_properties(
312 mut self,
313 file_decryption_properties: Option<std::sync::Arc<FileDecryptionProperties>>,
314 ) -> Self {
315 self.metadata_parser = self
316 .metadata_parser
317 .with_file_decryption_properties(file_decryption_properties);
318 self
319 }
320
321 pub fn push_ranges(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) -> Result<()> {
341 if matches!(&self.state, DecodeState::Finished) {
342 return Err(general_err!(
343 "ParquetMetaDataPushDecoder: cannot push data after decoding is finished"
344 ));
345 }
346 self.buffers.push_ranges(ranges, buffers);
347 Ok(())
348 }
349
350 pub fn push_range(&mut self, range: Range<u64>, buffer: Bytes) -> Result<()> {
352 if matches!(&self.state, DecodeState::Finished) {
353 return Err(general_err!(
354 "ParquetMetaDataPushDecoder: cannot push data after decoding is finished"
355 ));
356 }
357 self.buffers.push_range(range, buffer);
358 Ok(())
359 }
360
361 pub fn try_decode(&mut self) -> Result<DecodeResult<ParquetMetaData>> {
364 let file_len = self.buffers.file_len();
365 let footer_len = FOOTER_SIZE as u64;
366 loop {
367 match std::mem::replace(&mut self.state, DecodeState::Intermediate) {
368 DecodeState::ReadingFooter => {
369 let footer_start = file_len.saturating_sub(footer_len);
371 let footer_range = footer_start..file_len;
372
373 if !self.buffers.has_range(&footer_range) {
374 self.state = DecodeState::ReadingFooter;
375 return Ok(needs_range(footer_range));
376 }
377 let footer_bytes = self.get_bytes(&footer_range)?;
378 let footer_tail = FooterTail::try_from(footer_bytes.as_ref())?;
379
380 self.state = DecodeState::ReadingMetadata(footer_tail);
381 continue;
382 }
383
384 DecodeState::ReadingMetadata(footer_tail) => {
385 let metadata_len: u64 = footer_tail.metadata_length() as u64;
386 let metadata_start = file_len - footer_len - metadata_len;
387 let metadata_end = metadata_start + metadata_len;
388 let metadata_range = metadata_start..metadata_end;
389
390 if !self.buffers.has_range(&metadata_range) {
391 self.state = DecodeState::ReadingMetadata(footer_tail);
392 return Ok(needs_range(metadata_range));
393 }
394
395 let metadata = self.metadata_parser.decode_metadata(
396 &self.get_bytes(&metadata_range)?,
397 footer_tail.is_encrypted_footer(),
398 )?;
399 self.state = DecodeState::ReadingPageIndex(Box::new(metadata));
402 continue;
403 }
404
405 DecodeState::ReadingPageIndex(mut metadata) => {
406 let range = range_for_page_index(
409 &metadata,
410 self.column_index_policy,
411 self.offset_index_policy,
412 );
413
414 let Some(page_index_range) = range else {
415 self.state = DecodeState::Finished;
416 return Ok(DecodeResult::Data(*metadata));
417 };
418
419 if !self.buffers.has_range(&page_index_range) {
420 self.state = DecodeState::ReadingPageIndex(metadata);
421 return Ok(needs_range(page_index_range));
422 }
423
424 let buffer = self.get_bytes(&page_index_range)?;
425 let offset = page_index_range.start;
426 parse_column_index(&mut metadata, self.column_index_policy, &buffer, offset)?;
427 parse_offset_index(&mut metadata, self.offset_index_policy, &buffer, offset)?;
428 self.state = DecodeState::Finished;
429 return Ok(DecodeResult::Data(*metadata));
430 }
431
432 DecodeState::Finished => return Ok(DecodeResult::Finished),
433 DecodeState::Intermediate => {
434 return Err(general_err!(
435 "ParquetMetaDataPushDecoder: internal error, invalid state"
436 ));
437 }
438 }
439 }
440 }
441
442 fn get_bytes(&self, range: &Range<u64>) -> Result<Bytes> {
444 let start = range.start;
445 let raw_len = range.end - range.start;
446 let len: usize = raw_len.try_into().map_err(|_| {
447 ParquetError::General(format!(
448 "ParquetMetaDataPushDecoder: Range length too large to fit in usize: {raw_len}",
449 ))
450 })?;
451 self.buffers.get_bytes(start, len)
452 }
453}
454
455fn needs_range(range: Range<u64>) -> DecodeResult<ParquetMetaData> {
457 DecodeResult::NeedsData(vec![range])
458}
459
/// States of the metadata push decoder, in the order they normally occur.
#[derive(Debug)]
enum DecodeState {
    /// Waiting for the last [`FOOTER_SIZE`] bytes of the file
    /// (metadata length + magic).
    ReadingFooter,
    /// Footer parsed; waiting for the thrift-encoded metadata it points to.
    ReadingMetadata(FooterTail),
    /// Metadata decoded; waiting for the (optional) page index bytes.
    /// Boxed to keep this enum small, as `ParquetMetaData` is large.
    ReadingPageIndex(Box<ParquetMetaData>),
    /// Metadata has been returned to the caller; no further work to do.
    Finished,
    /// Transient placeholder installed while `try_decode` moves the real
    /// state out via `std::mem::replace`; observing it is an internal bug.
    Intermediate,
}
475
476pub fn range_for_page_index(
481 metadata: &ParquetMetaData,
482 column_index_policy: PageIndexPolicy,
483 offset_index_policy: PageIndexPolicy,
484) -> Option<Range<u64>> {
485 let mut range = None;
486 for c in metadata.row_groups().iter().flat_map(|r| r.columns()) {
487 if column_index_policy != PageIndexPolicy::Skip {
488 range = acc_range(range, c.column_index_range());
489 }
490 if offset_index_policy != PageIndexPolicy::Skip {
491 range = acc_range(range, c.offset_index_range());
492 }
493 }
494 range
495}
496
#[cfg(all(test, feature = "arrow"))]
mod tests {
    use super::*;
    use crate::arrow::ArrowWriter;
    use crate::file::properties::WriterProperties;
    use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray};
    use bytes::Bytes;
    use std::fmt::Debug;
    use std::ops::Range;
    use std::sync::{Arc, LazyLock};

    /// Decoding succeeds in a single call when the entire file is pushed.
    #[test]
    fn test_metadata_decoder_all_data() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![test_file_range()]);

        let metadata = expect_data(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// A prefetch of the file tail large enough to contain footer, metadata,
    /// and page indexes decodes without any NeedsData round trip.
    #[test]
    fn test_metadata_decoder_prefetch_success() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // 2 KiB tail: empirically covers everything for the test file
        let prefetch_range = (file_len - 2 * 1024)..file_len;
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![prefetch_range]);

        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());
        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// A prefetch that is too small triggers one NeedsData round trip before
    /// decoding succeeds.
    #[test]
    fn test_metadata_decoder_prefetch_retry() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // 1500 byte tail: enough for the footer + metadata but not the page
        // indexes, so the decoder must ask for more
        let prefetch_range = (file_len - 1500)..file_len;
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![prefetch_range]);

        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// Fully incremental decoding with no prefetch: footer, then metadata,
    /// then page indexes, each requested via NeedsData.
    #[test]
    fn test_metadata_decoder_incremental() {
        let file_len = TEST_FILE_DATA.len() as u64;
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // First request must be exactly the 8 byte footer at the end of file
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges[0], test_file_len() - 8..test_file_len());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // Second request: the thrift metadata
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // Third request: the page index range
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// With `PageIndexPolicy::Skip` the decoder stops after the metadata
    /// (two NeedsData round trips) and returns no page indexes.
    #[test]
    fn test_metadata_decoder_incremental_no_page_index() {
        let file_len = TEST_FILE_DATA.len() as u64;
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len)
            .unwrap()
            .with_page_index_policy(PageIndexPolicy::Skip);
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges[0], test_file_len() - 8..test_file_len());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        // Page index was skipped, so neither index is present
        assert!(metadata.column_index().is_none());
        assert!(metadata.offset_index().is_none());
    }

    /// 400-row batch with int64 and string-view columns used to build the
    /// shared test file.
    static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
        let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
        let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
        // Mix of short (inlined) and long string-view values
        let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| {
            if i % 2 == 0 {
                format!("string_{i}")
            } else {
                format!("A string larger than 12 bytes and thus not inlined {i}")
            }
        })));

        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
    });

    /// Complete Parquet file built once: two 200-row row groups with small
    /// data pages so page indexes have several entries.
    static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
        let input_batch = &TEST_BATCH;
        let mut output = Vec::new();

        let writer_options = WriterProperties::builder()
            .set_max_row_group_size(200)
            .set_data_page_row_count_limit(100)
            .build();
        let mut writer =
            ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap();

        // Write in 50-row chunks to exercise multiple pages per row group
        let mut row_remain = input_batch.num_rows();
        while row_remain > 0 {
            let chunk_size = row_remain.min(50);
            let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size);
            writer.write(&chunk).unwrap();
            row_remain -= chunk_size;
        }
        writer.close().unwrap();
        Bytes::from(output)
    });

    /// Total length of the shared test file in bytes.
    fn test_file_len() -> u64 {
        TEST_FILE_DATA.len() as u64
    }

    /// Range covering the entire shared test file.
    fn test_file_range() -> Range<u64> {
        0..test_file_len()
    }

    /// Returns the bytes of the shared test file for the given range.
    pub fn test_file_slice(range: Range<u64>) -> Bytes {
        let start: usize = range.start.try_into().unwrap();
        let end: usize = range.end.try_into().unwrap();
        TEST_FILE_DATA.slice(start..end)
    }

    /// Reads `ranges` from the shared test file and pushes them into the decoder.
    fn push_ranges_to_metadata_decoder(
        metadata_decoder: &mut ParquetMetaDataPushDecoder,
        ranges: Vec<Range<u64>>,
    ) {
        let data = ranges
            .iter()
            .map(|range| test_file_slice(range.clone()))
            .collect::<Vec<_>>();
        metadata_decoder.push_ranges(ranges, data).unwrap();
    }

    /// Unwraps `Ok(DecodeResult::Data(_))`, panicking on anything else.
    fn expect_data<T: Debug>(result: Result<DecodeResult<T>>) -> T {
        match result.expect("Expected Ok(DecodeResult::Data(T))") {
            DecodeResult::Data(data) => data,
            result => panic!("Expected DecodeResult::Data, got {result:?}"),
        }
    }

    /// Unwraps `Ok(DecodeResult::NeedsData(_))`, returning the requested ranges.
    fn expect_needs_data<T: Debug>(result: Result<DecodeResult<T>>) -> Vec<Range<u64>> {
        match result.expect("Expected Ok(DecodeResult::NeedsData{ranges})") {
            DecodeResult::NeedsData(ranges) => ranges,
            result => panic!("Expected DecodeResult::NeedsData, got {result:?}"),
        }
    }

    /// Asserts the decoder reports `Ok(DecodeResult::Finished)`.
    fn expect_finished<T: Debug>(result: Result<DecodeResult<T>>) {
        match result.expect("Expected Ok(DecodeResult::Finished)") {
            DecodeResult::Finished => {}
            result => panic!("Expected DecodeResult::Finished, got {result:?}"),
        }
    }
}