1use crate::DecodeResult;
19#[cfg(feature = "encryption")]
20use crate::encryption::decrypt::FileDecryptionProperties;
21use crate::errors::{ParquetError, Result};
22use crate::file::FOOTER_SIZE;
23use crate::file::metadata::parser::{MetadataParser, parse_column_index, parse_offset_index};
24use crate::file::metadata::{FooterTail, PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions};
25use crate::file::page_index::index_reader::acc_range;
26use crate::file::reader::ChunkReader;
27use bytes::Bytes;
28use std::ops::Range;
29use std::sync::Arc;
30
31#[cfg_attr(
52 feature = "arrow",
53 doc = r##"
54```rust
55# use std::ops::Range;
56# use bytes::Bytes;
57# use arrow_array::record_batch;
58# use parquet::DecodeResult;
59# use parquet::arrow::ArrowWriter;
60# use parquet::errors::ParquetError;
61# use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
62#
63# fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
64# let file_bytes = {
65# let mut buffer = vec![0];
66# let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
67# let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
68# writer.write(&batch).unwrap();
69# writer.close().unwrap();
70# Bytes::from(buffer)
71# };
72# // mimic IO by returning a function that returns the bytes for a given range
73# let get_range = |range: &Range<u64>| -> Bytes {
74# let start = range.start as usize;
75# let end = range.end as usize;
76# file_bytes.slice(start..end)
77# };
78#
79# let file_len = file_bytes.len() as u64;
80// The `ParquetMetaDataPushDecoder` needs to know the file length.
81let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
82// try to decode the metadata. If more data is needed, the decoder will tell you what ranges
83loop {
84 match decoder.try_decode() {
85 Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
86 Ok(DecodeResult::NeedsData(ranges)) => {
87 // The decoder needs more data
88 //
89 // In this example, we call a function that returns the bytes for each given range.
90 // In a real application, you would likely read the data from a file or network.
91 let data = ranges.iter().map(|range| get_range(range)).collect();
92 // Push the data into the decoder and try to decode again on the next iteration.
93 decoder.push_ranges(ranges, data).unwrap();
94 }
95 Ok(DecodeResult::Finished) => { unreachable!("returned metadata in previous match arm") }
96 Err(e) => return Err(e),
97 }
98}
99# }
100```
101"##
102)]
103#[cfg_attr(
124 feature = "arrow",
125 doc = r##"
126```rust
127# use std::ops::Range;
128# use bytes::Bytes;
129# use arrow_array::record_batch;
130# use parquet::DecodeResult;
131# use parquet::arrow::ArrowWriter;
132# use parquet::errors::ParquetError;
133# use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
134#
135# fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
136# let file_bytes = {
137# let mut buffer = vec![0];
138# let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
139# let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
140# writer.write(&batch).unwrap();
141# writer.close().unwrap();
142# Bytes::from(buffer)
143# };
144#
145let file_len = file_bytes.len() as u64;
146// For this example, we "prefetch" all the bytes which we have in memory,
147// but in a real application, you would likely read a chunk from the end
148// for example 1MB.
149let prefetched_bytes = file_bytes.clone();
150let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
151// push the prefetched bytes into the decoder
152decoder.push_ranges(vec![0..file_len], vec![prefetched_bytes]).unwrap();
153// The decoder will now be able to decode the metadata. Note in a real application,
154// unless you can guarantee that the pushed data is enough to decode the metadata,
155// you still need to call `try_decode` in a loop until it returns `DecodeResult::Data`
156// as shown in the previous example
157 match decoder.try_decode() {
158 Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
159 other => { panic!("expected DecodeResult::Data, got: {other:?}") }
160 }
161# }
162```
163"##
164)]
165#[cfg_attr(
174 feature = "arrow",
175 doc = r##"
176```rust
177# use std::ops::Range;
178# use bytes::Bytes;
179use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
180# use arrow_array::record_batch;
181# use parquet::DecodeResult;
182# use parquet::arrow::ArrowWriter;
183# use parquet::errors::ParquetError;
184# use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
185#
186// This function decodes Parquet Metadata from anything that implements
187// [`AsyncRead`] and [`AsyncSeek`] such as a tokio::fs::File
188async fn decode_metadata(
189 file_len: u64,
190 mut async_source: impl AsyncRead + AsyncSeek + Unpin
191) -> Result<ParquetMetaData, ParquetError> {
192 // We need a ParquetMetaDataPushDecoder to decode the metadata.
193 let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
194 loop {
195 match decoder.try_decode() {
196 Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
197 Ok(DecodeResult::NeedsData(ranges)) => {
198 // The decoder needs more data
199 //
200 // In this example we use the AsyncRead and AsyncSeek traits to read the
201 // required ranges from the async source.
202 let mut data = Vec::with_capacity(ranges.len());
203 for range in &ranges {
204 let mut buffer = vec![0; (range.end - range.start) as usize];
205 async_source.seek(std::io::SeekFrom::Start(range.start)).await?;
206 async_source.read_exact(&mut buffer).await?;
207 data.push(Bytes::from(buffer));
208 }
209 // Push the data into the decoder and try to decode again on the next iteration.
210 decoder.push_ranges(ranges, data).unwrap();
211 }
212 Ok(DecodeResult::Finished) => { unreachable!("returned metadata in previous match arm") }
213 Err(e) => return Err(e),
214 }
215 }
216}
217```
218"##
219)]
#[derive(Debug)]
pub struct ParquetMetaDataPushDecoder {
    /// Current position in the decode state machine (footer → metadata →
    /// page index → finished).
    state: DecodeState,
    /// Policy controlling whether the column index is read.
    column_index_policy: PageIndexPolicy,
    /// Policy controlling whether the offset index is read.
    offset_index_policy: PageIndexPolicy,
    /// Byte ranges of the file pushed so far by the caller.
    buffers: crate::util::push_buffers::PushBuffers,
    /// Parser used to decode the footer metadata (carries decryption
    /// properties when the `encryption` feature is enabled).
    metadata_parser: MetadataParser,
}
234
235impl ParquetMetaDataPushDecoder {
236 pub fn try_new(file_len: u64) -> Result<Self> {
243 if file_len < 8 {
244 return Err(ParquetError::General(format!(
245 "Parquet files are at least 8 bytes long, but file length is {file_len}"
246 )));
247 };
248
249 Ok(Self {
250 state: DecodeState::ReadingFooter,
251 column_index_policy: PageIndexPolicy::Optional,
252 offset_index_policy: PageIndexPolicy::Optional,
253 buffers: crate::util::push_buffers::PushBuffers::new(file_len),
254 metadata_parser: MetadataParser::new(),
255 })
256 }
257
258 pub(crate) fn try_new_with_footer_tail(file_len: u64, footer_tail: FooterTail) -> Result<Self> {
260 let mut new_self = Self::try_new(file_len)?;
261 new_self.state = DecodeState::ReadingMetadata(footer_tail);
262 Ok(new_self)
263 }
264
265 pub fn try_new_with_metadata(file_len: u64, metadata: ParquetMetaData) -> Result<Self> {
270 let mut new_self = Self::try_new(file_len)?;
271 new_self.state = DecodeState::ReadingPageIndex(Box::new(metadata));
272 Ok(new_self)
273 }
274
275 pub fn with_page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self {
286 self.column_index_policy = page_index_policy;
287 self.offset_index_policy = page_index_policy;
288 self
289 }
290
291 pub fn with_column_index_policy(mut self, column_index_policy: PageIndexPolicy) -> Self {
293 self.column_index_policy = column_index_policy;
294 self
295 }
296
297 pub fn with_offset_index_policy(mut self, offset_index_policy: PageIndexPolicy) -> Self {
299 self.offset_index_policy = offset_index_policy;
300 self
301 }
302
303 pub fn with_metadata_options(mut self, options: Option<Arc<ParquetMetaDataOptions>>) -> Self {
305 self.metadata_parser = self.metadata_parser.with_metadata_options(options);
306 self
307 }
308
309 #[cfg(feature = "encryption")]
310 pub fn with_file_decryption_properties(
312 mut self,
313 file_decryption_properties: Option<std::sync::Arc<FileDecryptionProperties>>,
314 ) -> Self {
315 self.metadata_parser = self
316 .metadata_parser
317 .with_file_decryption_properties(file_decryption_properties);
318 self
319 }
320
321 pub fn push_ranges(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) -> Result<()> {
341 if matches!(&self.state, DecodeState::Finished) {
342 return Err(general_err!(
343 "ParquetMetaDataPushDecoder: cannot push data after decoding is finished"
344 ));
345 }
346 self.buffers.push_ranges(ranges, buffers);
347 Ok(())
348 }
349
350 pub fn push_range(&mut self, range: Range<u64>, buffer: Bytes) -> Result<()> {
352 if matches!(&self.state, DecodeState::Finished) {
353 return Err(general_err!(
354 "ParquetMetaDataPushDecoder: cannot push data after decoding is finished"
355 ));
356 }
357 self.buffers.push_range(range, buffer);
358 Ok(())
359 }
360
361 pub fn clear_all_ranges(&mut self) {
363 self.buffers.clear_all_ranges();
364 }
365
366 pub fn try_decode(&mut self) -> Result<DecodeResult<ParquetMetaData>> {
369 let file_len = self.buffers.file_len();
370 let footer_len = FOOTER_SIZE as u64;
371 loop {
372 match std::mem::replace(&mut self.state, DecodeState::Intermediate) {
373 DecodeState::ReadingFooter => {
374 let footer_start = file_len.saturating_sub(footer_len);
376 let footer_range = footer_start..file_len;
377
378 if !self.buffers.has_range(&footer_range) {
379 self.state = DecodeState::ReadingFooter;
380 return Ok(needs_range(footer_range));
381 }
382 let footer_bytes = self.get_bytes(&footer_range)?;
383 let footer_tail = FooterTail::try_from(footer_bytes.as_ref())?;
384
385 self.state = DecodeState::ReadingMetadata(footer_tail);
386 continue;
387 }
388
389 DecodeState::ReadingMetadata(footer_tail) => {
390 let metadata_len: u64 = footer_tail.metadata_length() as u64;
391 let metadata_start = file_len - footer_len - metadata_len;
392 let metadata_end = metadata_start + metadata_len;
393 let metadata_range = metadata_start..metadata_end;
394
395 if !self.buffers.has_range(&metadata_range) {
396 self.state = DecodeState::ReadingMetadata(footer_tail);
397 return Ok(needs_range(metadata_range));
398 }
399
400 let metadata = self.metadata_parser.decode_metadata(
401 &self.get_bytes(&metadata_range)?,
402 footer_tail.is_encrypted_footer(),
403 )?;
404 self.state = DecodeState::ReadingPageIndex(Box::new(metadata));
407 continue;
408 }
409
410 DecodeState::ReadingPageIndex(mut metadata) => {
411 let range = range_for_page_index(
414 &metadata,
415 self.column_index_policy,
416 self.offset_index_policy,
417 );
418
419 let Some(page_index_range) = range else {
420 self.state = DecodeState::Finished;
421 return Ok(DecodeResult::Data(*metadata));
422 };
423
424 if !self.buffers.has_range(&page_index_range) {
425 self.state = DecodeState::ReadingPageIndex(metadata);
426 return Ok(needs_range(page_index_range));
427 }
428
429 let buffer = self.get_bytes(&page_index_range)?;
430 let offset = page_index_range.start;
431 parse_column_index(&mut metadata, self.column_index_policy, &buffer, offset)?;
432 parse_offset_index(&mut metadata, self.offset_index_policy, &buffer, offset)?;
433 self.state = DecodeState::Finished;
434 return Ok(DecodeResult::Data(*metadata));
435 }
436
437 DecodeState::Finished => return Ok(DecodeResult::Finished),
438 DecodeState::Intermediate => {
439 return Err(general_err!(
440 "ParquetMetaDataPushDecoder: internal error, invalid state"
441 ));
442 }
443 }
444 }
445 }
446
447 fn get_bytes(&self, range: &Range<u64>) -> Result<Bytes> {
449 let start = range.start;
450 let raw_len = range.end - range.start;
451 let len: usize = raw_len.try_into().map_err(|_| {
452 ParquetError::General(format!(
453 "ParquetMetaDataPushDecoder: Range length too large to fit in usize: {raw_len}",
454 ))
455 })?;
456 self.buffers.get_bytes(start, len)
457 }
458}
459
460fn needs_range(range: Range<u64>) -> DecodeResult<ParquetMetaData> {
462 DecodeResult::NeedsData(vec![range])
463}
464
#[derive(Debug)]
enum DecodeState {
    /// Waiting for the 8 byte footer at the end of the file.
    ReadingFooter,
    /// Footer decoded; waiting for the metadata bytes it points to.
    ReadingMetadata(FooterTail),
    /// Metadata decoded; waiting for page index bytes (if requested by the
    /// policies).
    ReadingPageIndex(Box<ParquetMetaData>),
    /// Metadata fully decoded and returned to the caller.
    Finished,
    /// Placeholder used while transitioning between states in `try_decode`;
    /// observing this state outside a transition is an internal error.
    Intermediate,
}
480
481pub fn range_for_page_index(
486 metadata: &ParquetMetaData,
487 column_index_policy: PageIndexPolicy,
488 offset_index_policy: PageIndexPolicy,
489) -> Option<Range<u64>> {
490 let mut range = None;
491 for c in metadata.row_groups().iter().flat_map(|r| r.columns()) {
492 if column_index_policy != PageIndexPolicy::Skip {
493 range = acc_range(range, c.column_index_range());
494 }
495 if offset_index_policy != PageIndexPolicy::Skip {
496 range = acc_range(range, c.offset_index_range());
497 }
498 }
499 range
500}
501
#[cfg(all(test, feature = "arrow"))]
mod tests {
    use super::*;
    use crate::arrow::ArrowWriter;
    use crate::file::properties::WriterProperties;
    use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray};
    use bytes::Bytes;
    use std::fmt::Debug;
    use std::ops::Range;
    use std::sync::{Arc, LazyLock};

    /// Pushing the entire file up front decodes the metadata in one call.
    #[test]
    fn test_metadata_decoder_all_data() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![test_file_range()]);

        let metadata = expect_data(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// A sufficiently large "prefetch" of the file suffix decodes without
    /// the decoder requesting any additional data.
    #[test]
    fn test_metadata_decoder_prefetch_success() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // The last 2kB of the test file is enough (the decode below succeeds
        // without a NeedsData round trip).
        let prefetch_range = (file_len - 2 * 1024)..file_len;
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![prefetch_range]);

        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());
        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// A prefetch that is too small makes the decoder request the missing
    /// ranges; supplying them completes the decode.
    #[test]
    fn test_metadata_decoder_prefetch_retry() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // The last 1500 bytes are not enough to fully decode the metadata.
        let prefetch_range = (file_len - 1500)..file_len;
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![prefetch_range]);

        // The decoder reports exactly which ranges it still needs.
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// `clear_all_ranges` drops all buffered data, after which the decoder
    /// starts over by requesting the footer again.
    #[test]
    fn test_metadata_decoder_clear_all_ranges() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();

        metadata_decoder
            .push_range(test_file_range(), TEST_FILE_DATA.clone())
            .unwrap();
        assert_eq!(metadata_decoder.buffers.buffered_bytes(), test_file_len());

        metadata_decoder.clear_all_ranges();
        assert_eq!(metadata_decoder.buffers.buffered_bytes(), 0);

        // With no data buffered, the decoder asks for the 8 byte footer.
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        assert_eq!(ranges, vec![test_file_len() - 8..test_file_len()]);
    }

    /// Starting from nothing, the decoder walks through its states
    /// (footer, then metadata, then page index) requesting each range in turn.
    #[test]
    fn test_metadata_decoder_incremental() {
        let file_len = TEST_FILE_DATA.len() as u64;
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // First request: the 8 byte footer.
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges[0], test_file_len() - 8..test_file_len());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // Second request: the metadata bytes.
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // Third request: the page index bytes.
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// With `PageIndexPolicy::Skip`, the page index request is skipped and
    /// the decoded metadata carries no column/offset index.
    #[test]
    fn test_metadata_decoder_incremental_no_page_index() {
        let file_len = TEST_FILE_DATA.len() as u64;
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len)
            .unwrap()
            .with_page_index_policy(PageIndexPolicy::Skip);
        // First request: the 8 byte footer.
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges[0], test_file_len() - 8..test_file_len());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // Second request: the metadata bytes (no third request follows).
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_none());
        assert!(metadata.offset_index().is_none());
    }

    /// 400 row batch with two Int64 columns and one StringView column
    /// (mixing inlined and non-inlined strings).
    static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
        let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
        let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
        let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| {
            if i % 2 == 0 {
                format!("string_{i}")
            } else {
                format!("A string larger than 12 bytes and thus not inlined {i}")
            }
        })));

        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
    });

    /// `TEST_BATCH` encoded as a Parquet file with two 200-row row groups
    /// and a 100 row data page limit.
    static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
        let input_batch = &TEST_BATCH;
        let mut output = Vec::new();

        let writer_options = WriterProperties::builder()
            .set_max_row_group_row_count(Some(200))
            .set_data_page_row_count_limit(100)
            .build();
        let mut writer =
            ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap();

        // Write in 50 row chunks rather than all at once.
        let mut row_remain = input_batch.num_rows();
        while row_remain > 0 {
            let chunk_size = row_remain.min(50);
            let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size);
            writer.write(&chunk).unwrap();
            row_remain -= chunk_size;
        }
        writer.close().unwrap();
        Bytes::from(output)
    });

    /// Length of the test file, in bytes.
    fn test_file_len() -> u64 {
        TEST_FILE_DATA.len() as u64
    }

    /// Range covering the entire test file.
    fn test_file_range() -> Range<u64> {
        0..test_file_len()
    }

    /// Returns the bytes of the test file for the given range.
    pub fn test_file_slice(range: Range<u64>) -> Bytes {
        let start: usize = range.start.try_into().unwrap();
        let end: usize = range.end.try_into().unwrap();
        TEST_FILE_DATA.slice(start..end)
    }

    /// Pushes the test-file bytes for each of `ranges` into the decoder.
    fn push_ranges_to_metadata_decoder(
        metadata_decoder: &mut ParquetMetaDataPushDecoder,
        ranges: Vec<Range<u64>>,
    ) {
        let data = ranges
            .iter()
            .map(|range| test_file_slice(range.clone()))
            .collect::<Vec<_>>();
        metadata_decoder.push_ranges(ranges, data).unwrap();
    }

    /// Unwraps `Ok(DecodeResult::Data(data))`, panicking on anything else.
    fn expect_data<T: Debug>(result: Result<DecodeResult<T>>) -> T {
        match result.expect("Expected Ok(DecodeResult::Data(T))") {
            DecodeResult::Data(data) => data,
            result => panic!("Expected DecodeResult::Data, got {result:?}"),
        }
    }

    /// Unwraps `Ok(DecodeResult::NeedsData(ranges))`, panicking on anything else.
    fn expect_needs_data<T: Debug>(result: Result<DecodeResult<T>>) -> Vec<Range<u64>> {
        match result.expect("Expected Ok(DecodeResult::NeedsData{ranges})") {
            DecodeResult::NeedsData(ranges) => ranges,
            result => panic!("Expected DecodeResult::NeedsData, got {result:?}"),
        }
    }

    /// Unwraps `Ok(DecodeResult::Finished)`, panicking on anything else.
    fn expect_finished<T: Debug>(result: Result<DecodeResult<T>>) {
        match result.expect("Expected Ok(DecodeResult::Finished)") {
            DecodeResult::Finished => {}
            result => panic!("Expected DecodeResult::Finished, got {result:?}"),
        }
    }
}