1use crate::DecodeResult;
19#[cfg(feature = "encryption")]
20use crate::encryption::decrypt::FileDecryptionProperties;
21use crate::errors::{ParquetError, Result};
22use crate::file::FOOTER_SIZE;
23use crate::file::metadata::parser::{MetadataParser, parse_column_index, parse_offset_index};
24use crate::file::metadata::{FooterTail, PageIndexPolicy, ParquetMetaData};
25use crate::file::page_index::index_reader::acc_range;
26use crate::file::reader::ChunkReader;
27use bytes::Bytes;
28use std::ops::Range;
29
30#[cfg_attr(
51 feature = "arrow",
52 doc = r##"
53```rust
54# use std::ops::Range;
55# use bytes::Bytes;
56# use arrow_array::record_batch;
57# use parquet::DecodeResult;
58# use parquet::arrow::ArrowWriter;
59# use parquet::errors::ParquetError;
60# use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
61#
62# fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
63# let file_bytes = {
# let mut buffer: Vec<u8> = vec![];
65# let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
66# let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
67# writer.write(&batch).unwrap();
68# writer.close().unwrap();
69# Bytes::from(buffer)
70# };
71# // mimic IO by returning a function that returns the bytes for a given range
72# let get_range = |range: &Range<u64>| -> Bytes {
73# let start = range.start as usize;
74# let end = range.end as usize;
75# file_bytes.slice(start..end)
76# };
77#
78# let file_len = file_bytes.len() as u64;
79// The `ParquetMetaDataPushDecoder` needs to know the file length.
80let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
81// try to decode the metadata. If more data is needed, the decoder will tell you what ranges
82loop {
83 match decoder.try_decode() {
84 Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
85 Ok(DecodeResult::NeedsData(ranges)) => {
86 // The decoder needs more data
87 //
88 // In this example, we call a function that returns the bytes for each given range.
89 // In a real application, you would likely read the data from a file or network.
90 let data = ranges.iter().map(|range| get_range(range)).collect();
91 // Push the data into the decoder and try to decode again on the next iteration.
92 decoder.push_ranges(ranges, data).unwrap();
93 }
94 Ok(DecodeResult::Finished) => { unreachable!("returned metadata in previous match arm") }
95 Err(e) => return Err(e),
96 }
97}
98# }
99```
100"##
101)]
102#[cfg_attr(
123 feature = "arrow",
124 doc = r##"
125```rust
126# use std::ops::Range;
127# use bytes::Bytes;
128# use arrow_array::record_batch;
129# use parquet::DecodeResult;
130# use parquet::arrow::ArrowWriter;
131# use parquet::errors::ParquetError;
132# use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
133#
134# fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
135# let file_bytes = {
# let mut buffer: Vec<u8> = vec![];
137# let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
138# let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
139# writer.write(&batch).unwrap();
140# writer.close().unwrap();
141# Bytes::from(buffer)
142# };
143#
144let file_len = file_bytes.len() as u64;
145// For this example, we "prefetch" all the bytes which we have in memory,
146// but in a real application, you would likely read a chunk from the end
147// for example 1MB.
148let prefetched_bytes = file_bytes.clone();
149let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
150// push the prefetched bytes into the decoder
151decoder.push_ranges(vec![0..file_len], vec![prefetched_bytes]).unwrap();
152// The decoder will now be able to decode the metadata. Note in a real application,
153// unless you can guarantee that the pushed data is enough to decode the metadata,
154// you still need to call `try_decode` in a loop until it returns `DecodeResult::Data`
155// as shown in the previous example
156 match decoder.try_decode() {
157 Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
158 other => { panic!("expected DecodeResult::Data, got: {other:?}") }
159 }
160# }
161```
162"##
163)]
164#[cfg_attr(
173 feature = "arrow",
174 doc = r##"
175```rust
176# use std::ops::Range;
177# use bytes::Bytes;
178use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
179# use arrow_array::record_batch;
180# use parquet::DecodeResult;
181# use parquet::arrow::ArrowWriter;
182# use parquet::errors::ParquetError;
183# use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
184#
185// This function decodes Parquet Metadata from anything that implements
186// [`AsyncRead`] and [`AsyncSeek`] such as a tokio::fs::File
187async fn decode_metadata(
188 file_len: u64,
189 mut async_source: impl AsyncRead + AsyncSeek + Unpin
190) -> Result<ParquetMetaData, ParquetError> {
191 // We need a ParquetMetaDataPushDecoder to decode the metadata.
192 let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
193 loop {
194 match decoder.try_decode() {
195 Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
196 Ok(DecodeResult::NeedsData(ranges)) => {
197 // The decoder needs more data
198 //
199 // In this example we use the AsyncRead and AsyncSeek traits to read the
200 // required ranges from the async source.
201 let mut data = Vec::with_capacity(ranges.len());
202 for range in &ranges {
203 let mut buffer = vec![0; (range.end - range.start) as usize];
204 async_source.seek(std::io::SeekFrom::Start(range.start)).await?;
205 async_source.read_exact(&mut buffer).await?;
206 data.push(Bytes::from(buffer));
207 }
208 // Push the data into the decoder and try to decode again on the next iteration.
209 decoder.push_ranges(ranges, data).unwrap();
210 }
211 Ok(DecodeResult::Finished) => { unreachable!("returned metadata in previous match arm") }
212 Err(e) => return Err(e),
213 }
214 }
215}
216```
217"##
218)]
#[derive(Debug)]
pub struct ParquetMetaDataPushDecoder {
    /// Current position in the footer -> metadata -> page index state machine
    state: DecodeState,
    /// Policy for reading the column index portion of the page index
    column_index_policy: PageIndexPolicy,
    /// Policy for reading the offset index portion of the page index
    offset_index_policy: PageIndexPolicy,
    /// Byte ranges pushed by the caller, addressed by absolute file offset
    buffers: crate::util::push_buffers::PushBuffers,
    /// Decodes the raw metadata bytes (configured with decryption
    /// properties when the `encryption` feature is enabled)
    metadata_parser: MetadataParser,
}
233
234impl ParquetMetaDataPushDecoder {
235 pub fn try_new(file_len: u64) -> Result<Self> {
242 if file_len < 8 {
243 return Err(ParquetError::General(format!(
244 "Parquet files are at least 8 bytes long, but file length is {file_len}"
245 )));
246 };
247
248 Ok(Self {
249 state: DecodeState::ReadingFooter,
250 column_index_policy: PageIndexPolicy::Optional,
251 offset_index_policy: PageIndexPolicy::Optional,
252 buffers: crate::util::push_buffers::PushBuffers::new(file_len),
253 metadata_parser: MetadataParser::new(),
254 })
255 }
256
257 pub(crate) fn try_new_with_footer_tail(file_len: u64, footer_tail: FooterTail) -> Result<Self> {
259 let mut new_self = Self::try_new(file_len)?;
260 new_self.state = DecodeState::ReadingMetadata(footer_tail);
261 Ok(new_self)
262 }
263
264 pub fn try_new_with_metadata(file_len: u64, metadata: ParquetMetaData) -> Result<Self> {
269 let mut new_self = Self::try_new(file_len)?;
270 new_self.state = DecodeState::ReadingPageIndex(Box::new(metadata));
271 Ok(new_self)
272 }
273
274 pub fn with_page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self {
285 self.column_index_policy = page_index_policy;
286 self.offset_index_policy = page_index_policy;
287 self
288 }
289
290 pub fn with_column_index_policy(mut self, column_index_policy: PageIndexPolicy) -> Self {
292 self.column_index_policy = column_index_policy;
293 self
294 }
295
296 pub fn with_offset_index_policy(mut self, offset_index_policy: PageIndexPolicy) -> Self {
298 self.offset_index_policy = offset_index_policy;
299 self
300 }
301
    #[cfg(feature = "encryption")]
    /// Provide decryption properties used by the metadata parser to decrypt
    /// encrypted footer metadata; `None` clears any previously set properties.
    pub(crate) fn with_file_decryption_properties(
        mut self,
        file_decryption_properties: Option<std::sync::Arc<FileDecryptionProperties>>,
    ) -> Self {
        self.metadata_parser = self
            .metadata_parser
            .with_file_decryption_properties(file_decryption_properties);
        self
    }
313
314 pub fn push_ranges(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) -> Result<()> {
334 if matches!(&self.state, DecodeState::Finished) {
335 return Err(general_err!(
336 "ParquetMetaDataPushDecoder: cannot push data after decoding is finished"
337 ));
338 }
339 self.buffers.push_ranges(ranges, buffers);
340 Ok(())
341 }
342
343 pub fn push_range(&mut self, range: Range<u64>, buffer: Bytes) -> Result<()> {
345 if matches!(&self.state, DecodeState::Finished) {
346 return Err(general_err!(
347 "ParquetMetaDataPushDecoder: cannot push data after decoding is finished"
348 ));
349 }
350 self.buffers.push_range(range, buffer);
351 Ok(())
352 }
353
354 pub fn try_decode(&mut self) -> Result<DecodeResult<ParquetMetaData>> {
357 let file_len = self.buffers.file_len();
358 let footer_len = FOOTER_SIZE as u64;
359 loop {
360 match std::mem::replace(&mut self.state, DecodeState::Intermediate) {
361 DecodeState::ReadingFooter => {
362 let footer_start = file_len.saturating_sub(footer_len);
364 let footer_range = footer_start..file_len;
365
366 if !self.buffers.has_range(&footer_range) {
367 self.state = DecodeState::ReadingFooter;
368 return Ok(needs_range(footer_range));
369 }
370 let footer_bytes = self.get_bytes(&footer_range)?;
371 let footer_tail = FooterTail::try_from(footer_bytes.as_ref())?;
372
373 self.state = DecodeState::ReadingMetadata(footer_tail);
374 continue;
375 }
376
377 DecodeState::ReadingMetadata(footer_tail) => {
378 let metadata_len: u64 = footer_tail.metadata_length() as u64;
379 let metadata_start = file_len - footer_len - metadata_len;
380 let metadata_end = metadata_start + metadata_len;
381 let metadata_range = metadata_start..metadata_end;
382
383 if !self.buffers.has_range(&metadata_range) {
384 self.state = DecodeState::ReadingMetadata(footer_tail);
385 return Ok(needs_range(metadata_range));
386 }
387
388 let metadata = self.metadata_parser.decode_metadata(
389 &self.get_bytes(&metadata_range)?,
390 footer_tail.is_encrypted_footer(),
391 )?;
392 self.state = DecodeState::ReadingPageIndex(Box::new(metadata));
395 continue;
396 }
397
398 DecodeState::ReadingPageIndex(mut metadata) => {
399 let range = range_for_page_index(
402 &metadata,
403 self.column_index_policy,
404 self.offset_index_policy,
405 );
406
407 let Some(page_index_range) = range else {
408 self.state = DecodeState::Finished;
409 return Ok(DecodeResult::Data(*metadata));
410 };
411
412 if !self.buffers.has_range(&page_index_range) {
413 self.state = DecodeState::ReadingPageIndex(metadata);
414 return Ok(needs_range(page_index_range));
415 }
416
417 let buffer = self.get_bytes(&page_index_range)?;
418 let offset = page_index_range.start;
419 parse_column_index(&mut metadata, self.column_index_policy, &buffer, offset)?;
420 parse_offset_index(&mut metadata, self.offset_index_policy, &buffer, offset)?;
421 self.state = DecodeState::Finished;
422 return Ok(DecodeResult::Data(*metadata));
423 }
424
425 DecodeState::Finished => return Ok(DecodeResult::Finished),
426 DecodeState::Intermediate => {
427 return Err(general_err!(
428 "ParquetMetaDataPushDecoder: internal error, invalid state"
429 ));
430 }
431 }
432 }
433 }
434
435 fn get_bytes(&self, range: &Range<u64>) -> Result<Bytes> {
437 let start = range.start;
438 let raw_len = range.end - range.start;
439 let len: usize = raw_len.try_into().map_err(|_| {
440 ParquetError::General(format!(
441 "ParquetMetaDataPushDecoder: Range length too large to fit in usize: {raw_len}",
442 ))
443 })?;
444 self.buffers.get_bytes(start, len)
445 }
446}
447
448fn needs_range(range: Range<u64>) -> DecodeResult<ParquetMetaData> {
450 DecodeResult::NeedsData(vec![range])
451}
452
/// Internal state machine for [`ParquetMetaDataPushDecoder`]
#[derive(Debug)]
enum DecodeState {
    /// Waiting for the last 8 bytes of the file (the footer tail)
    ReadingFooter,
    /// Footer decoded; waiting for the metadata bytes
    ReadingMetadata(FooterTail),
    /// Metadata decoded; waiting for the (optional) page index bytes
    ReadingPageIndex(Box<ParquetMetaData>),
    /// Decoding complete; no further data is accepted
    Finished,
    /// Sentinel installed while a state transition is in progress; only
    /// observable if a previous transition exited early
    Intermediate,
}
468
469pub fn range_for_page_index(
474 metadata: &ParquetMetaData,
475 column_index_policy: PageIndexPolicy,
476 offset_index_policy: PageIndexPolicy,
477) -> Option<Range<u64>> {
478 let mut range = None;
479 for c in metadata.row_groups().iter().flat_map(|r| r.columns()) {
480 if column_index_policy != PageIndexPolicy::Skip {
481 range = acc_range(range, c.column_index_range());
482 }
483 if offset_index_policy != PageIndexPolicy::Skip {
484 range = acc_range(range, c.offset_index_range());
485 }
486 }
487 range
488}
489
490#[cfg(all(test, feature = "arrow"))]
493mod tests {
494 use super::*;
495 use crate::arrow::ArrowWriter;
496 use crate::file::properties::WriterProperties;
497 use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray};
498 use bytes::Bytes;
499 use std::fmt::Debug;
500 use std::ops::Range;
501 use std::sync::{Arc, LazyLock};
502
    #[test]
    // Decode metadata when the entire file has been pushed up front
    fn test_metadata_decoder_all_data() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // push the whole file so the first try_decode can succeed
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![test_file_range()]);

        let metadata = expect_data(metadata_decoder.try_decode());

        // the test file has 2 row groups of 200 rows each, with page indexes
        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }
520
    #[test]
    // Decode metadata when a prefetched suffix of the file is large enough
    // to contain the footer, metadata, and page indexes
    fn test_metadata_decoder_prefetch_success() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // 2kB from the end of the file suffices for this test file
        let prefetch_range = (file_len - 2 * 1024)..file_len;
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![prefetch_range]);

        let metadata = expect_data(metadata_decoder.try_decode());
        // once the metadata has been returned, the decoder reports Finished
        expect_finished(metadata_decoder.try_decode());
        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }
540
    #[test]
    // Decode metadata when the prefetched suffix is too small, forcing the
    // decoder to request additional ranges
    fn test_metadata_decoder_prefetch_retry() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // NOTE(review): 1500 bytes appears chosen to be too small to cover
        // everything needed, triggering a NeedsData round trip — confirm
        let prefetch_range = (file_len - 1500)..file_len;
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![prefetch_range]);

        // the decoder asks for the ranges it still needs; push them
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }
568
    #[test]
    // Decode metadata incrementally, pushing only the exact ranges the
    // decoder asks for: footer tail, then metadata, then page indexes
    fn test_metadata_decoder_incremental() {
        let file_len = TEST_FILE_DATA.len() as u64;
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        // the first request is always the 8 byte footer tail
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges[0], test_file_len() - 8..test_file_len());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // second request: the metadata bytes
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        // third request: the page index bytes
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);

        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }
598
599 #[test]
602 fn test_metadata_decoder_incremental_no_page_index() {
603 let file_len = TEST_FILE_DATA.len() as u64;
604 let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len)
605 .unwrap()
606 .with_page_index_policy(PageIndexPolicy::Skip);
607 let ranges = expect_needs_data(metadata_decoder.try_decode());
608 assert_eq!(ranges.len(), 1);
609 assert_eq!(ranges[0], test_file_len() - 8..test_file_len());
610 push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);
611
612 let ranges = expect_needs_data(metadata_decoder.try_decode());
614 push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);
615
616 let metadata = expect_data(metadata_decoder.try_decode());
618 expect_finished(metadata_decoder.try_decode());
619
620 assert_eq!(metadata.num_row_groups(), 2);
621 assert_eq!(metadata.row_group(0).num_rows(), 200);
622 assert_eq!(metadata.row_group(1).num_rows(), 200);
623 assert!(metadata.column_index().is_none()); assert!(metadata.offset_index().is_none()); }
626
    /// 400-row, 3-column (`a`: Int64, `b`: Int64, `c`: StringView)
    /// RecordBatch used to build the test parquet file
    static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
        let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
        let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
        // mix short (inlinable) and long strings to exercise both
        // StringView representations
        let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| {
            if i % 2 == 0 {
                format!("string_{i}")
            } else {
                format!("A string larger than 12 bytes and thus not inlined {i}")
            }
        })));

        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
    });
642
    /// Parquet encoding of [`TEST_BATCH`]: row groups capped at 200 rows
    /// and data pages capped at 100 rows, written in 50-row chunks
    static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
        let input_batch = &TEST_BATCH;
        let mut output = Vec::new();

        let writer_options = WriterProperties::builder()
            .set_max_row_group_size(200)
            .set_data_page_row_count_limit(100)
            .build();
        let mut writer =
            ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap();

        // write in several small chunks so multiple pages are produced
        let mut row_remain = input_batch.num_rows();
        while row_remain > 0 {
            let chunk_size = row_remain.min(50);
            let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size);
            writer.write(&chunk).unwrap();
            row_remain -= chunk_size;
        }
        writer.close().unwrap();
        Bytes::from(output)
    });
667
668 fn test_file_len() -> u64 {
670 TEST_FILE_DATA.len() as u64
671 }
672
673 fn test_file_range() -> Range<u64> {
675 0..test_file_len()
676 }
677
678 pub fn test_file_slice(range: Range<u64>) -> Bytes {
680 let start: usize = range.start.try_into().unwrap();
681 let end: usize = range.end.try_into().unwrap();
682 TEST_FILE_DATA.slice(start..end)
683 }
684
685 fn push_ranges_to_metadata_decoder(
687 metadata_decoder: &mut ParquetMetaDataPushDecoder,
688 ranges: Vec<Range<u64>>,
689 ) {
690 let data = ranges
691 .iter()
692 .map(|range| test_file_slice(range.clone()))
693 .collect::<Vec<_>>();
694 metadata_decoder.push_ranges(ranges, data).unwrap();
695 }
696
697 fn expect_data<T: Debug>(result: Result<DecodeResult<T>>) -> T {
699 match result.expect("Expected Ok(DecodeResult::Data(T))") {
700 DecodeResult::Data(data) => data,
701 result => panic!("Expected DecodeResult::Data, got {result:?}"),
702 }
703 }
704
705 fn expect_needs_data<T: Debug>(result: Result<DecodeResult<T>>) -> Vec<Range<u64>> {
707 match result.expect("Expected Ok(DecodeResult::NeedsData{ranges})") {
708 DecodeResult::NeedsData(ranges) => ranges,
709 result => panic!("Expected DecodeResult::NeedsData, got {result:?}"),
710 }
711 }
712
713 fn expect_finished<T: Debug>(result: Result<DecodeResult<T>>) {
714 match result.expect("Expected Ok(DecodeResult::Finished)") {
715 DecodeResult::Finished => {}
716 result => panic!("Expected DecodeResult::Finished, got {result:?}"),
717 }
718 }
719}