1use crate::basic::{Encoding, Type};
22use crate::bloom_filter::Sbbf;
23use crate::column::page::{Page, PageMetadata, PageReader};
24use crate::compression::{create_codec, Codec};
25#[cfg(feature = "encryption")]
26use crate::encryption::decrypt::{read_and_decrypt, CryptoContext};
27use crate::errors::{ParquetError, Result};
28use crate::file::page_index::offset_index::OffsetIndexMetaData;
29use crate::file::{
30 metadata::*,
31 properties::{ReaderProperties, ReaderPropertiesPtr},
32 reader::*,
33 statistics,
34};
35use crate::format::{PageHeader, PageLocation, PageType};
36use crate::record::reader::RowIter;
37use crate::record::Row;
38use crate::schema::types::Type as SchemaType;
39#[cfg(feature = "encryption")]
40use crate::thrift::TCompactSliceInputProtocol;
41use crate::thrift::TSerializable;
42use bytes::Bytes;
43use std::collections::VecDeque;
44use std::{fs::File, io::Read, path::Path, sync::Arc};
45use thrift::protocol::TCompactInputProtocol;
46
47impl TryFrom<File> for SerializedFileReader<File> {
48 type Error = ParquetError;
49
50 fn try_from(file: File) -> Result<Self> {
51 Self::new(file)
52 }
53}
54
55impl TryFrom<&Path> for SerializedFileReader<File> {
56 type Error = ParquetError;
57
58 fn try_from(path: &Path) -> Result<Self> {
59 let file = File::open(path)?;
60 Self::try_from(file)
61 }
62}
63
64impl TryFrom<String> for SerializedFileReader<File> {
65 type Error = ParquetError;
66
67 fn try_from(path: String) -> Result<Self> {
68 Self::try_from(Path::new(&path))
69 }
70}
71
72impl TryFrom<&str> for SerializedFileReader<File> {
73 type Error = ParquetError;
74
75 fn try_from(path: &str) -> Result<Self> {
76 Self::try_from(Path::new(&path))
77 }
78}
79
80impl IntoIterator for SerializedFileReader<File> {
83 type Item = Result<Row>;
84 type IntoIter = RowIter<'static>;
85
86 fn into_iter(self) -> Self::IntoIter {
87 RowIter::from_file_into(Box::new(self))
88 }
89}
90
/// A serialized implementation of a Parquet [`FileReader`] backed by any
/// [`ChunkReader`] (e.g. a `File` or in-memory `Bytes`).
pub struct SerializedFileReader<R: ChunkReader> {
    /// Shared handle used to read byte ranges of the underlying source.
    chunk_reader: Arc<R>,
    /// Parsed footer metadata for the whole file.
    metadata: Arc<ParquetMetaData>,
    /// Reader configuration shared with row-group and page readers.
    props: ReaderPropertiesPtr,
}
100
/// Predicate deciding, per row group (given its metadata and ordinal index),
/// whether that row group should be read.
pub type ReadGroupPredicate = Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>;
105
/// Builder for [`ReadOptions`] used when constructing a reader with
/// `SerializedFileReader::new_with_options`.
#[derive(Default)]
pub struct ReadOptionsBuilder {
    /// Row-group filters; a row group is kept only if every predicate accepts it.
    predicates: Vec<ReadGroupPredicate>,
    /// Whether to also load the page index (offset/column indexes).
    enable_page_index: bool,
    /// Optional reader properties; defaults are used when `None`.
    props: Option<ReaderProperties>,
}
115
116impl ReadOptionsBuilder {
117 pub fn new() -> Self {
119 Self::default()
120 }
121
122 pub fn with_predicate(mut self, predicate: ReadGroupPredicate) -> Self {
125 self.predicates.push(predicate);
126 self
127 }
128
129 pub fn with_range(mut self, start: i64, end: i64) -> Self {
132 assert!(start < end);
133 let predicate = move |rg: &RowGroupMetaData, _: usize| {
134 let mid = get_midpoint_offset(rg);
135 mid >= start && mid < end
136 };
137 self.predicates.push(Box::new(predicate));
138 self
139 }
140
141 pub fn with_page_index(mut self) -> Self {
146 self.enable_page_index = true;
147 self
148 }
149
150 pub fn with_reader_properties(mut self, properties: ReaderProperties) -> Self {
152 self.props = Some(properties);
153 self
154 }
155
156 pub fn build(self) -> ReadOptions {
158 let props = self
159 .props
160 .unwrap_or_else(|| ReaderProperties::builder().build());
161 ReadOptions {
162 predicates: self.predicates,
163 enable_page_index: self.enable_page_index,
164 props,
165 }
166 }
167}
168
/// Options controlling how a [`SerializedFileReader`] is constructed; built
/// via [`ReadOptionsBuilder`].
pub struct ReadOptions {
    /// Row-group filters applied while constructing the reader.
    predicates: Vec<ReadGroupPredicate>,
    /// Whether to load the page index (offset/column indexes).
    enable_page_index: bool,
    /// Reader properties to use.
    props: ReaderProperties,
}
178
179impl<R: 'static + ChunkReader> SerializedFileReader<R> {
180 pub fn new(chunk_reader: R) -> Result<Self> {
183 let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
184 let props = Arc::new(ReaderProperties::builder().build());
185 Ok(Self {
186 chunk_reader: Arc::new(chunk_reader),
187 metadata: Arc::new(metadata),
188 props,
189 })
190 }
191
192 #[allow(deprecated)]
195 pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
196 let mut metadata_builder = ParquetMetaDataReader::new()
197 .parse_and_finish(&chunk_reader)?
198 .into_builder();
199 let mut predicates = options.predicates;
200
201 for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
203 let mut keep = true;
204 for predicate in &mut predicates {
205 if !predicate(&rg_meta, i) {
206 keep = false;
207 break;
208 }
209 }
210 if keep {
211 metadata_builder = metadata_builder.add_row_group(rg_meta);
212 }
213 }
214
215 let mut metadata = metadata_builder.build();
216
217 if options.enable_page_index {
219 let mut reader =
220 ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true);
221 reader.read_page_indexes(&chunk_reader)?;
222 metadata = reader.finish()?;
223 }
224
225 Ok(Self {
226 chunk_reader: Arc::new(chunk_reader),
227 metadata: Arc::new(metadata),
228 props: Arc::new(options.props),
229 })
230 }
231}
232
233fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
235 let col = meta.column(0);
236 let mut offset = col.data_page_offset();
237 if let Some(dic_offset) = col.dictionary_page_offset() {
238 if offset > dic_offset {
239 offset = dic_offset
240 }
241 };
242 offset + meta.compressed_size() / 2
243}
244
245impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
246 fn metadata(&self) -> &ParquetMetaData {
247 &self.metadata
248 }
249
250 fn num_row_groups(&self) -> usize {
251 self.metadata.num_row_groups()
252 }
253
254 fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> {
255 let row_group_metadata = self.metadata.row_group(i);
256 let props = Arc::clone(&self.props);
258 let f = Arc::clone(&self.chunk_reader);
259 Ok(Box::new(SerializedRowGroupReader::new(
260 f,
261 row_group_metadata,
262 self.metadata.offset_index().map(|x| x[i].as_slice()),
263 props,
264 )?))
265 }
266
267 fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
268 RowIter::from_file(projection, self)
269 }
270}
271
/// A serialized implementation of a Parquet [`RowGroupReader`].
pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
    /// Shared handle used to read bytes of the underlying source.
    chunk_reader: Arc<R>,
    /// Metadata of the row group served by this reader.
    metadata: &'a RowGroupMetaData,
    /// Optional offset index (one entry per column) enabling page-level access.
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    /// Reader configuration.
    props: ReaderPropertiesPtr,
    /// Per-column bloom filters; populated only when enabled in `props`.
    bloom_filters: Vec<Option<Sbbf>>,
}
280
281impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
282 pub fn new(
284 chunk_reader: Arc<R>,
285 metadata: &'a RowGroupMetaData,
286 offset_index: Option<&'a [OffsetIndexMetaData]>,
287 props: ReaderPropertiesPtr,
288 ) -> Result<Self> {
289 let bloom_filters = if props.read_bloom_filter() {
290 metadata
291 .columns()
292 .iter()
293 .map(|col| Sbbf::read_from_column_chunk(col, &*chunk_reader))
294 .collect::<Result<Vec<_>>>()?
295 } else {
296 std::iter::repeat_n(None, metadata.columns().len()).collect()
297 };
298 Ok(Self {
299 chunk_reader,
300 metadata,
301 offset_index,
302 props,
303 bloom_filters,
304 })
305 }
306}
307
308impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R> {
309 fn metadata(&self) -> &RowGroupMetaData {
310 self.metadata
311 }
312
313 fn num_columns(&self) -> usize {
314 self.metadata.num_columns()
315 }
316
317 fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
319 let col = self.metadata.column(i);
320
321 let page_locations = self.offset_index.map(|x| x[i].page_locations.clone());
322
323 let props = Arc::clone(&self.props);
324 Ok(Box::new(SerializedPageReader::new_with_properties(
325 Arc::clone(&self.chunk_reader),
326 col,
327 usize::try_from(self.metadata.num_rows())?,
328 page_locations,
329 props,
330 )?))
331 }
332
333 fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
335 self.bloom_filters[i].as_ref()
336 }
337
338 fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
339 RowIter::from_row_group(projection, self)
340 }
341}
342
/// Decodes a [`Page`] from `buffer`, the raw page bytes that immediately
/// follow the already-parsed Thrift `page_header`.
///
/// When `decompressor` is provided and the page is compressed, the payload is
/// decompressed here; for DataPage v2 the repetition/definition level bytes at
/// the front of the buffer are never compressed and are copied through as-is.
///
/// # Errors
/// Returns an error on CRC mismatch (with the "crc" feature), implausible
/// v2 level lengths, decompressed-size mismatch, or a missing/invalid header
/// for the page type.
pub(crate) fn decode_page(
    page_header: PageHeader,
    buffer: Bytes,
    physical_type: Type,
    decompressor: Option<&mut Box<dyn Codec>>,
) -> Result<Page> {
    // Verify the page CRC (when present) before any further work.
    #[cfg(feature = "crc")]
    if let Some(expected_crc) = page_header.crc {
        let crc = crc32fast::hash(&buffer);
        if crc != expected_crc as u32 {
            return Err(general_err!("Page CRC checksum mismatch"));
        }
    }

    // `offset` counts the leading v2 level bytes that are stored uncompressed
    // ahead of the (possibly compressed) page payload.
    let mut offset: usize = 0;
    let mut can_decompress = true;

    if let Some(ref header_v2) = page_header.data_page_header_v2 {
        // Reject implausible level lengths before using them for slicing, so a
        // corrupt header cannot cause an out-of-range subtraction below.
        if header_v2.definition_levels_byte_length < 0
            || header_v2.repetition_levels_byte_length < 0
            || header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length
                > page_header.uncompressed_page_size
        {
            return Err(general_err!(
                "DataPage v2 header contains implausible values \
                for definition_levels_byte_length ({}) \
                and repetition_levels_byte_length ({}) \
                given DataPage header provides uncompressed_page_size ({})",
                header_v2.definition_levels_byte_length,
                header_v2.repetition_levels_byte_length,
                page_header.uncompressed_page_size
            ));
        }
        offset = usize::try_from(
            header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length,
        )?;
        // A missing `is_compressed` flag is treated as "compressed".
        can_decompress = header_v2.is_compressed.unwrap_or(true);
    }

    // Decompress the payload when a codec is available and the page is
    // actually compressed; otherwise pass the buffer through untouched.
    let buffer = match decompressor {
        Some(decompressor) if can_decompress => {
            let uncompressed_page_size = usize::try_from(page_header.uncompressed_page_size)?;
            let decompressed_size = uncompressed_page_size - offset;
            let mut decompressed = Vec::with_capacity(uncompressed_page_size);
            // Copy the (never compressed) v2 level bytes verbatim.
            decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
            if decompressed_size > 0 {
                let compressed = &buffer.as_ref()[offset..];
                decompressor.decompress(compressed, &mut decompressed, Some(decompressed_size))?;
            }

            // Guard against codecs producing the wrong number of bytes.
            if decompressed.len() != uncompressed_page_size {
                return Err(general_err!(
                    "Actual decompressed size doesn't match the expected one ({} vs {})",
                    decompressed.len(),
                    uncompressed_page_size
                ));
            }

            Bytes::from(decompressed)
        }
        _ => buffer,
    };

    // Translate the Thrift header + payload into the crate's Page enum.
    let result = match page_header.type_ {
        PageType::DICTIONARY_PAGE => {
            let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| {
                ParquetError::General("Missing dictionary page header".to_string())
            })?;
            let is_sorted = dict_header.is_sorted.unwrap_or(false);
            Page::DictionaryPage {
                buf: buffer,
                num_values: dict_header.num_values.try_into()?,
                encoding: Encoding::try_from(dict_header.encoding)?,
                is_sorted,
            }
        }
        PageType::DATA_PAGE => {
            let header = page_header
                .data_page_header
                .ok_or_else(|| ParquetError::General("Missing V1 data page header".to_string()))?;
            Page::DataPage {
                buf: buffer,
                num_values: header.num_values.try_into()?,
                encoding: Encoding::try_from(header.encoding)?,
                def_level_encoding: Encoding::try_from(header.definition_level_encoding)?,
                rep_level_encoding: Encoding::try_from(header.repetition_level_encoding)?,
                statistics: statistics::from_thrift(physical_type, header.statistics)?,
            }
        }
        PageType::DATA_PAGE_V2 => {
            let header = page_header
                .data_page_header_v2
                .ok_or_else(|| ParquetError::General("Missing V2 data page header".to_string()))?;
            let is_compressed = header.is_compressed.unwrap_or(true);
            Page::DataPageV2 {
                buf: buffer,
                num_values: header.num_values.try_into()?,
                encoding: Encoding::try_from(header.encoding)?,
                num_nulls: header.num_nulls.try_into()?,
                num_rows: header.num_rows.try_into()?,
                def_levels_byte_len: header.definition_levels_byte_length.try_into()?,
                rep_levels_byte_len: header.repetition_levels_byte_length.try_into()?,
                is_compressed,
                statistics: statistics::from_thrift(physical_type, header.statistics)?,
            }
        }
        _ => {
            // INDEX_PAGE (and any future types) are not decodable here.
            unimplemented!("Page type {:?} is not supported", page_header.type_)
        }
    };

    Ok(result)
}
468
/// Internal traversal state of a [`SerializedPageReader`].
enum SerializedPageReaderState {
    /// Sequential scan of the column chunk: page boundaries are discovered by
    /// parsing each page header in turn (no offset index available).
    Values {
        /// Absolute byte offset of the next page header — or of the page data,
        /// once a header has been buffered in `next_page_header`.
        offset: u64,

        /// Bytes of the column chunk not yet consumed.
        remaining_bytes: u64,

        /// Header parsed by a peek whose page data has not been read yet.
        next_page_header: Option<Box<PageHeader>>,

        /// Ordinal of the next data page within the chunk.
        page_index: usize,

        /// True while a dictionary page is still expected at the chunk start.
        require_dictionary: bool,
    },
    /// Random access driven by the offset index's page locations.
    Pages {
        /// Remaining page locations, in file order.
        page_locations: VecDeque<PageLocation>,
        /// Synthesized location of the dictionary page, if one exists.
        dictionary_page: Option<PageLocation>,
        /// Total number of rows in the column chunk.
        total_rows: usize,
        /// Ordinal of the next data page within the chunk.
        page_index: usize,
    },
}
499
/// Per-reader context: carries decryption state when the "encryption" feature
/// is enabled and is zero-sized otherwise.
#[derive(Default)]
struct SerializedPageReaderContext {
    /// Crypto context used to decrypt page headers and page data.
    #[cfg(feature = "encryption")]
    crypto_context: Option<Arc<CryptoContext>>,
}
506
/// A serialized implementation of a Parquet [`PageReader`]: reads the pages of
/// a single column chunk from a [`ChunkReader`].
pub struct SerializedPageReader<R: ChunkReader> {
    /// Handle used to read bytes of the underlying source.
    reader: Arc<R>,

    /// Codec for this column chunk, or `None` if the data is uncompressed.
    decompressor: Option<Box<dyn Codec>>,

    /// Physical type of the column; needed to decode page statistics.
    physical_type: Type,

    /// Current traversal state (sequential scan or offset-index driven).
    state: SerializedPageReaderState,

    /// Decryption context (zero-sized unless the "encryption" feature is on).
    context: SerializedPageReaderContext,
}
522
523impl<R: ChunkReader> SerializedPageReader<R> {
524 pub fn new(
526 reader: Arc<R>,
527 column_chunk_metadata: &ColumnChunkMetaData,
528 total_rows: usize,
529 page_locations: Option<Vec<PageLocation>>,
530 ) -> Result<Self> {
531 let props = Arc::new(ReaderProperties::builder().build());
532 SerializedPageReader::new_with_properties(
533 reader,
534 column_chunk_metadata,
535 total_rows,
536 page_locations,
537 props,
538 )
539 }
540
    /// No-op stub used when the "encryption" feature is disabled: returns the
    /// reader unchanged so callers can stay feature-agnostic.
    #[cfg(all(feature = "arrow", not(feature = "encryption")))]
    pub(crate) fn add_crypto_context(
        self,
        _rg_idx: usize,
        _column_idx: usize,
        _parquet_meta_data: &ParquetMetaData,
        _column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        Ok(self)
    }
552
553 #[cfg(feature = "encryption")]
555 pub(crate) fn add_crypto_context(
556 mut self,
557 rg_idx: usize,
558 column_idx: usize,
559 parquet_meta_data: &ParquetMetaData,
560 column_chunk_metadata: &ColumnChunkMetaData,
561 ) -> Result<SerializedPageReader<R>> {
562 let Some(file_decryptor) = parquet_meta_data.file_decryptor() else {
563 return Ok(self);
564 };
565 let Some(crypto_metadata) = column_chunk_metadata.crypto_metadata() else {
566 return Ok(self);
567 };
568 let crypto_context =
569 CryptoContext::for_column(file_decryptor, crypto_metadata, rg_idx, column_idx)?;
570 self.context.crypto_context = Some(Arc::new(crypto_context));
571 Ok(self)
572 }
573
    /// Creates a page reader for one column chunk.
    ///
    /// With `page_locations` (from the offset index) the reader operates in
    /// random-access `Pages` mode; a dictionary page location is synthesized
    /// when the chunk starts before the first recorded page. Without them the
    /// reader falls back to sequential `Values` scanning.
    pub fn new_with_properties(
        reader: Arc<R>,
        meta: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
        props: ReaderPropertiesPtr,
    ) -> Result<Self> {
        let decompressor = create_codec(meta.compression(), props.codec_options())?;
        let (start, len) = meta.byte_range();

        let state = match page_locations {
            Some(locations) => {
                // If the chunk starts before the first indexed page, that gap
                // must be the dictionary page — synthesize a location for it.
                // NOTE(review): assumes `dict_offset.offset >= start`; the
                // subtraction below would wrap otherwise — confirm upstream
                // guarantees on offset-index ordering.
                let dictionary_page = match locations.first() {
                    Some(dict_offset) if dict_offset.offset as u64 != start => Some(PageLocation {
                        offset: start as i64,
                        compressed_page_size: (dict_offset.offset as u64 - start) as i32,
                        first_row_index: 0,
                    }),
                    _ => None,
                };

                SerializedPageReaderState::Pages {
                    page_locations: locations.into(),
                    dictionary_page,
                    total_rows,
                    page_index: 0,
                }
            }
            None => SerializedPageReaderState::Values {
                offset: start,
                remaining_bytes: len,
                next_page_header: None,
                page_index: 0,
                // A recorded dictionary page offset means one must be read
                // before any data pages.
                require_dictionary: meta.dictionary_page_offset().is_some(),
            },
        };
        Ok(Self {
            reader,
            decompressor,
            state,
            physical_type: meta.column_type(),
            context: Default::default(),
        })
    }
621
    /// Test-only helper: returns the file offset of the next page's data
    /// (just past its header) without consuming the page, or `None` once the
    /// chunk is exhausted.
    #[cfg(test)]
    fn peek_next_page_offset(&mut self) -> Result<Option<u64>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        // A header was already buffered by a previous peek.
                        if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
                            Ok(Some(*offset))
                        } else {
                            // Unusable buffered header: drop it and rescan.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset)?;
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        // NOTE(review): unlike `peek_next_page`, no
                        // `verify_page_header_len` call precedes this
                        // subtraction; presumably acceptable for a test-only
                        // helper, but it could underflow on corrupt input —
                        // confirm.
                        *offset += header_len as u64;
                        *remaining_bytes -= header_len as u64;
                        let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
                            Ok(Some(*offset))
                        } else {
                            // Header doesn't describe a data/dictionary page;
                            // keep scanning.
                            continue;
                        };
                        // Buffer the parsed header so the next read reuses it.
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                ..
            } => {
                // Offset-index mode: the next offset is known directly.
                if let Some(page) = dictionary_page {
                    Ok(Some(page.offset as u64))
                } else if let Some(page) = page_locations.front() {
                    Ok(Some(page.offset as u64))
                } else {
                    Ok(None)
                }
            }
        }
    }
685
686 fn read_page_header_len<T: Read>(
687 context: &SerializedPageReaderContext,
688 input: &mut T,
689 page_index: usize,
690 dictionary_page: bool,
691 ) -> Result<(usize, PageHeader)> {
692 struct TrackedRead<R> {
694 inner: R,
695 bytes_read: usize,
696 }
697
698 impl<R: Read> Read for TrackedRead<R> {
699 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
700 let v = self.inner.read(buf)?;
701 self.bytes_read += v;
702 Ok(v)
703 }
704 }
705
706 let mut tracked = TrackedRead {
707 inner: input,
708 bytes_read: 0,
709 };
710 let header = context.read_page_header(&mut tracked, page_index, dictionary_page)?;
711 Ok((tracked.bytes_read, header))
712 }
713
714 fn read_page_header_len_from_bytes(
715 context: &SerializedPageReaderContext,
716 buffer: &[u8],
717 page_index: usize,
718 dictionary_page: bool,
719 ) -> Result<(usize, PageHeader)> {
720 let mut input = std::io::Cursor::new(buffer);
721 let header = context.read_page_header(&mut input, page_index, dictionary_page)?;
722 let header_len = input.position() as usize;
723 Ok((header_len, header))
724 }
725}
726
727#[cfg(not(feature = "encryption"))]
728impl SerializedPageReaderContext {
729 fn read_page_header<T: Read>(
730 &self,
731 input: &mut T,
732 _page_index: usize,
733 _dictionary_page: bool,
734 ) -> Result<PageHeader> {
735 let mut prot = TCompactInputProtocol::new(input);
736 Ok(PageHeader::read_from_in_protocol(&mut prot)?)
737 }
738
739 fn decrypt_page_data<T>(
740 &self,
741 buffer: T,
742 _page_index: usize,
743 _dictionary_page: bool,
744 ) -> Result<T> {
745 Ok(buffer)
746 }
747}
748
#[cfg(feature = "encryption")]
impl SerializedPageReaderContext {
    /// Parses a Thrift page header from `input`, decrypting it first when a
    /// crypto context applies to this page.
    fn read_page_header<T: Read>(
        &self,
        input: &mut T,
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<PageHeader> {
        match self.page_crypto_context(page_index, dictionary_page) {
            None => {
                // Unencrypted column: parse straight from the stream.
                let mut prot = TCompactInputProtocol::new(input);
                Ok(PageHeader::read_from_in_protocol(&mut prot)?)
            }
            Some(page_crypto_context) => {
                let data_decryptor = page_crypto_context.data_decryptor();
                // The AAD binds the ciphertext to this column/page position.
                let aad = page_crypto_context.create_page_header_aad()?;

                let buf = read_and_decrypt(data_decryptor, input, aad.as_ref()).map_err(|_| {
                    ParquetError::General(format!(
                        "Error decrypting page header for column {}, decryption key may be wrong",
                        page_crypto_context.column_ordinal
                    ))
                })?;

                // Parse the decrypted header from the plaintext buffer.
                let mut prot = TCompactSliceInputProtocol::new(buf.as_slice());
                Ok(PageHeader::read_from_in_protocol(&mut prot)?)
            }
        }
    }

    /// Decrypts page data when a crypto context applies to this page;
    /// otherwise returns the buffer unchanged.
    fn decrypt_page_data<T>(&self, buffer: T, page_index: usize, dictionary_page: bool) -> Result<T>
    where
        T: AsRef<[u8]>,
        T: From<Vec<u8>>,
    {
        let page_crypto_context = self.page_crypto_context(page_index, dictionary_page);
        if let Some(page_crypto_context) = page_crypto_context {
            let decryptor = page_crypto_context.data_decryptor();
            let aad = page_crypto_context.create_page_aad()?;
            let decrypted = decryptor.decrypt(buffer.as_ref(), &aad)?;
            Ok(T::from(decrypted))
        } else {
            Ok(buffer)
        }
    }

    /// Returns the crypto context specialized for the given page (dictionary
    /// pages use a distinct context), or `None` if the column is unencrypted.
    fn page_crypto_context(
        &self,
        page_index: usize,
        dictionary_page: bool,
    ) -> Option<Arc<CryptoContext>> {
        self.crypto_context.as_ref().map(|c| {
            Arc::new(if dictionary_page {
                c.for_dictionary_page()
            } else {
                c.with_page_ordinal(page_index)
            })
        })
    }
}
809
810impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
811 type Item = Result<Page>;
812
813 fn next(&mut self) -> Option<Self::Item> {
814 self.get_next_page().transpose()
815 }
816}
817
818fn verify_page_header_len(header_len: usize, remaining_bytes: u64) -> Result<()> {
819 if header_len as u64 > remaining_bytes {
820 return Err(eof_err!("Invalid page header"));
821 }
822 Ok(())
823}
824
825fn verify_page_size(
826 compressed_size: i32,
827 uncompressed_size: i32,
828 remaining_bytes: u64,
829) -> Result<()> {
830 if compressed_size < 0 || compressed_size as u64 > remaining_bytes || uncompressed_size < 0 {
834 return Err(eof_err!("Invalid page header"));
835 }
836 Ok(())
837}
838
839impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
    /// Reads and decodes the next page, returning `Ok(None)` once the column
    /// chunk (or page-location list) is exhausted.
    fn get_next_page(&mut self) -> Result<Option<Page>> {
        // Loop so non-data pages (e.g. index pages) can be skipped in place.
        loop {
            let page = match &mut self.state {
                SerializedPageReaderState::Values {
                    offset,
                    remaining_bytes: remaining,
                    next_page_header,
                    page_index,
                    require_dictionary,
                } => {
                    if *remaining == 0 {
                        return Ok(None);
                    }

                    let mut read = self.reader.get_read(*offset)?;
                    // Use a header buffered by a previous peek, if any;
                    // otherwise parse one from the stream and advance past it.
                    let header = if let Some(header) = next_page_header.take() {
                        *header
                    } else {
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        verify_page_header_len(header_len, *remaining)?;
                        *offset += header_len as u64;
                        *remaining -= header_len as u64;
                        header
                    };
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining,
                    )?;
                    // Account for the page data before reading it, so state
                    // stays consistent even if the page is skipped below.
                    let data_len = header.compressed_page_size as usize;
                    *offset += data_len as u64;
                    *remaining -= data_len as u64;

                    // Index pages carry no column values; skip them.
                    if header.type_ == PageType::INDEX_PAGE {
                        continue;
                    }

                    let mut buffer = Vec::with_capacity(data_len);
                    let read = read.take(data_len as u64).read_to_end(&mut buffer)?;

                    if read != data_len {
                        return Err(eof_err!(
                            "Expected to read {} bytes of page, read only {}",
                            data_len,
                            read
                        ));
                    }

                    // No-op unless the "encryption" feature is enabled and a
                    // crypto context is configured.
                    let buffer =
                        self.context
                            .decrypt_page_data(buffer, *page_index, *require_dictionary)?;

                    let page = decode_page(
                        header,
                        Bytes::from(buffer),
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?;
                    // Data pages advance the page ordinal; the (single)
                    // dictionary page clears the require flag.
                    if page.is_data_page() {
                        *page_index += 1;
                    } else if page.is_dictionary_page() {
                        *require_dictionary = false;
                    }
                    page
                }
                SerializedPageReaderState::Pages {
                    page_locations,
                    dictionary_page,
                    page_index,
                    ..
                } => {
                    // Serve the synthesized dictionary-page location first,
                    // then the offset-index locations in file order.
                    let (front, is_dictionary_page) = match dictionary_page.take() {
                        Some(front) => (front, true),
                        None => match page_locations.pop_front() {
                            Some(front) => (front, false),
                            None => return Ok(None),
                        },
                    };

                    let page_len = usize::try_from(front.compressed_page_size)?;
                    // One read covers both the header and the page data.
                    let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;

                    let (offset, header) = Self::read_page_header_len_from_bytes(
                        &self.context,
                        buffer.as_ref(),
                        *page_index,
                        is_dictionary_page,
                    )?;
                    let bytes = buffer.slice(offset..);
                    let bytes =
                        self.context
                            .decrypt_page_data(bytes, *page_index, is_dictionary_page)?;

                    if !is_dictionary_page {
                        *page_index += 1;
                    }
                    decode_page(
                        header,
                        bytes,
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?
                }
            };

            return Ok(Some(page));
        }
    }
953
    /// Returns metadata describing the next page without consuming it, or
    /// `Ok(None)` once the chunk is exhausted. In `Values` mode the parsed
    /// header is buffered so the following `get_next_page` call reuses it.
    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                // Loop so headers that don't convert to PageMetadata are
                // skipped until a usable page (or end of chunk) is found.
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        // A header was already buffered by a previous peek.
                        if let Ok(page_meta) = (&**header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // Unusable buffered header: drop it and rescan.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset)?;
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        verify_page_header_len(header_len, *remaining_bytes)?;
                        *offset += header_len as u64;
                        *remaining_bytes -= header_len as u64;
                        let page_meta = if let Ok(page_meta) = (&header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // Header doesn't describe a data/dictionary page;
                            // keep scanning.
                            continue;
                        };
                        // Buffer the header for the subsequent read/skip.
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                total_rows,
                page_index: _,
            } => {
                if dictionary_page.is_some() {
                    // Dictionary pages carry no row count.
                    Ok(Some(PageMetadata {
                        num_rows: None,
                        num_levels: None,
                        is_dict: true,
                    }))
                } else if let Some(page) = page_locations.front() {
                    // Row count = rows up to the next page's first row (or the
                    // chunk's total for the last page).
                    let next_rows = page_locations
                        .get(1)
                        .map(|x| x.first_row_index as usize)
                        .unwrap_or(*total_rows);

                    Ok(Some(PageMetadata {
                        num_rows: Some(next_rows - page.first_row_index as usize),
                        num_levels: None,
                        is_dict: false,
                    }))
                } else {
                    Ok(None)
                }
            }
        }
    }
1026
    /// Advances past the next page without decoding its contents.
    fn skip_next_page(&mut self) -> Result<()> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                if let Some(buffered_header) = next_page_header.take() {
                    verify_page_size(
                        buffered_header.compressed_page_size,
                        buffered_header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    // The header was already consumed by a peek; only the page
                    // data remains to be skipped.
                    *offset += buffered_header.compressed_page_size as u64;
                    *remaining_bytes -= buffered_header.compressed_page_size as u64;
                } else {
                    let mut read = self.reader.get_read(*offset)?;
                    let (header_len, header) = Self::read_page_header_len(
                        &self.context,
                        &mut read,
                        *page_index,
                        *require_dictionary,
                    )?;
                    verify_page_header_len(header_len, *remaining_bytes)?;
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    // Skip header and page data in one step.
                    let data_page_size = header.compressed_page_size as u64;
                    *offset += header_len as u64 + data_page_size;
                    *remaining_bytes -= header_len as u64 + data_page_size;
                }
                // The first skip consumes the pending dictionary page (if
                // any); later skips advance the data-page ordinal.
                if *require_dictionary {
                    *require_dictionary = false;
                } else {
                    *page_index += 1;
                }
                Ok(())
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                page_index,
                ..
            } => {
                if dictionary_page.is_some() {
                    // Discard the pending dictionary-page location; it does
                    // not count toward the data-page ordinal.
                    dictionary_page.take();
                } else {
                    if page_locations.pop_front().is_some() {
                        *page_index += 1;
                    }
                }

                Ok(())
            }
        }
    }
1090
1091 fn at_record_boundary(&mut self) -> Result<bool> {
1092 match &mut self.state {
1093 SerializedPageReaderState::Values { .. } => Ok(self.peek_next_page()?.is_none()),
1094 SerializedPageReaderState::Pages { .. } => Ok(true),
1095 }
1096 }
1097}
1098
1099#[cfg(test)]
1100mod tests {
1101 use std::collections::HashSet;
1102
1103 use bytes::Buf;
1104
1105 use crate::file::properties::{EnabledStatistics, WriterProperties};
1106 use crate::format::BoundaryOrder;
1107
1108 use crate::basic::{self, ColumnOrder, SortOrder};
1109 use crate::column::reader::ColumnReader;
1110 use crate::data_type::private::ParquetValueType;
1111 use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
1112 use crate::file::page_index::index::{Index, NativeIndex};
1113 #[allow(deprecated)]
1114 use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
1115 use crate::file::writer::SerializedFileWriter;
1116 use crate::record::RowAccessor;
1117 use crate::schema::parser::parse_message_type;
1118 use crate::util::test_common::file_util::{get_test_file, get_test_path};
1119
1120 use super::*;
1121
1122 #[test]
1123 fn test_cursor_and_file_has_the_same_behaviour() {
1124 let mut buf: Vec<u8> = Vec::new();
1125 get_test_file("alltypes_plain.parquet")
1126 .read_to_end(&mut buf)
1127 .unwrap();
1128 let cursor = Bytes::from(buf);
1129 let read_from_cursor = SerializedFileReader::new(cursor).unwrap();
1130
1131 let test_file = get_test_file("alltypes_plain.parquet");
1132 let read_from_file = SerializedFileReader::new(test_file).unwrap();
1133
1134 let file_iter = read_from_file.get_row_iter(None).unwrap();
1135 let cursor_iter = read_from_cursor.get_row_iter(None).unwrap();
1136
1137 for (a, b) in file_iter.zip(cursor_iter) {
1138 assert_eq!(a.unwrap(), b.unwrap())
1139 }
1140 }
1141
    /// Exercises every `TryFrom` conversion into `SerializedFileReader`
    /// (`File`, `&Path`, `&str`, `String`) for both an existing test file and
    /// a non-existent path.
    #[test]
    fn test_file_reader_try_from() {
        // Existing file: all conversions must succeed.
        let test_file = get_test_file("alltypes_plain.parquet");
        let test_path_buf = get_test_path("alltypes_plain.parquet");
        let test_path = test_path_buf.as_path();
        let test_path_str = test_path.to_str().unwrap();

        let reader = SerializedFileReader::try_from(test_file);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path_str);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path_str.to_string());
        assert!(reader.is_ok());

        // Non-existent file: all conversions must fail.
        let test_path = Path::new("invalid.parquet");
        let test_path_str = test_path.to_str().unwrap();

        let reader = SerializedFileReader::try_from(test_path);
        assert!(reader.is_err());

        let reader = SerializedFileReader::try_from(test_path_str);
        assert!(reader.is_err());

        let reader = SerializedFileReader::try_from(test_path_str.to_string());
        assert!(reader.is_err());
    }
1175
1176 #[test]
1177 fn test_file_reader_into_iter() {
1178 let path = get_test_path("alltypes_plain.parquet");
1179 let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1180 let iter = reader.into_iter();
1181 let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();
1182
1183 assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
1184 }
1185
1186 #[test]
1187 fn test_file_reader_into_iter_project() {
1188 let path = get_test_path("alltypes_plain.parquet");
1189 let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1190 let schema = "message schema { OPTIONAL INT32 id; }";
1191 let proj = parse_message_type(schema).ok();
1192 let iter = reader.into_iter().project(proj).unwrap();
1193 let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();
1194
1195 assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
1196 }
1197
1198 #[test]
1199 fn test_reuse_file_chunk() {
1200 let test_file = get_test_file("alltypes_plain.parquet");
1204 let reader = SerializedFileReader::new(test_file).unwrap();
1205 let row_group = reader.get_row_group(0).unwrap();
1206
1207 let mut page_readers = Vec::new();
1208 for i in 0..row_group.num_columns() {
1209 page_readers.push(row_group.get_column_page_reader(i).unwrap());
1210 }
1211
1212 for mut page_reader in page_readers {
1215 assert!(page_reader.get_next_page().is_ok());
1216 }
1217 }
1218
    /// End-to-end check of the reader against the known contents of
    /// `alltypes_plain.parquet`: file metadata, row-group metadata, and the
    /// pages of the first column.
    #[test]
    fn test_file_reader() {
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // File-level metadata.
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
        );
        assert!(file_metadata.key_value_metadata().is_none());
        assert_eq!(file_metadata.num_rows(), 8);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        // Row-group metadata.
        let row_group_metadata = metadata.row_group(0);
        assert_eq!(row_group_metadata.num_columns(), 11);
        assert_eq!(row_group_metadata.num_rows(), 8);
        assert_eq!(row_group_metadata.total_byte_size(), 671);
        // With no column orders in the file, every column reports UNDEFINED.
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Row-group reader agrees with the metadata.
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // First column: expect exactly one dictionary page and one data page.
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 32);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPage {
                    buf,
                    num_values,
                    encoding,
                    def_level_encoding,
                    rep_level_encoding,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 11);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert_eq!(def_level_encoding, Encoding::RLE);
                    #[allow(deprecated)]
                    let expected_rep_level_encoding = Encoding::BIT_PACKED;
                    assert_eq!(rep_level_encoding, expected_rep_level_encoding);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }
1310
    #[test]
    fn test_file_reader_datapage_v2() {
        let test_file = get_test_file("datapage_v2.snappy.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // File-level metadata checks.
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            1
        );

        assert_eq!(file_metadata.num_rows(), 5);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        let row_group_metadata = metadata.row_group(0);

        // No column orders in the file, so every column reports UNDEFINED.
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // The row-group reader must agree with the metadata it was built from.
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Column 0: one dictionary page followed by one v2 data page
        // (page_count == 2 below).
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 7);
                    assert_eq!(num_values, 1);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 4);
                    assert_eq!(num_values, 5);
                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                    assert_eq!(num_nulls, 1);
                    assert_eq!(num_rows, 5);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_some());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }
1408
    #[test]
    fn test_file_reader_empty_compressed_datapage_v2() {
        let test_file = get_test_file("page_v2_empty_compressed.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // File-level metadata checks.
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-cpp-arrow version 14.0.2"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            1
        );

        assert_eq!(file_metadata.num_rows(), 10);
        assert_eq!(file_metadata.version(), 2);
        // This file carries explicit column orders (signed type-defined order).
        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
        assert_eq!(
            file_metadata.column_orders(),
            Some(vec![expected_order].as_ref())
        );

        let row_group_metadata = metadata.row_group(0);

        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), expected_order);
        }

        // The row-group reader must agree with the metadata it was built from.
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Column 0: an empty (zero-value) dictionary page followed by a v2
        // data page whose 10 values are all null (page_count == 2 below).
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 0);
                    assert_eq!(num_values, 0);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 3);
                    assert_eq!(num_values, 10);
                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                    assert_eq!(num_nulls, 10);
                    assert_eq!(num_rows, 10);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_some());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }
1510
    #[test]
    fn test_file_reader_empty_datapage_v2() {
        let test_file = get_test_file("datapage_v2_empty_datapage.snappy.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // File-level metadata checks.
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-mr version 1.13.1 (build db4183109d5b734ec5930d870cdae161e408ddba)"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            2
        );

        assert_eq!(file_metadata.num_rows(), 1);
        assert_eq!(file_metadata.version(), 1);
        // This file carries explicit column orders (signed type-defined order).
        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
        assert_eq!(
            file_metadata.column_orders(),
            Some(vec![expected_order].as_ref())
        );

        let row_group_metadata = metadata.row_group(0);

        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), expected_order);
        }

        // The row-group reader must agree with the metadata it was built from.
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Column 0: a single v2 data page holding one null value and no
        // dictionary page (page_count == 1 below).
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 2);
                    assert_eq!(num_values, 1);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert_eq!(num_nulls, 1);
                    assert_eq!(num_rows, 1);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 1);
    }
1600
    /// Builds a `SerializedPageReader` for `column` of row group `row_group`
    /// of `file_reader`, forwarding the file's offset index (when loaded) so
    /// tests can exercise the page-location-driven reading path.
    fn get_serialized_page_reader<R: ChunkReader>(
        file_reader: &SerializedFileReader<R>,
        row_group: usize,
        column: usize,
    ) -> Result<SerializedPageReader<R>> {
        // Materialize a row-group reader first so its metadata, properties
        // and chunk reader can be borrowed below.
        let row_group = {
            let row_group_metadata = file_reader.metadata.row_group(row_group);
            let props = Arc::clone(&file_reader.props);
            let f = Arc::clone(&file_reader.chunk_reader);
            SerializedRowGroupReader::new(
                f,
                row_group_metadata,
                file_reader
                    .metadata
                    .offset_index()
                    .map(|x| x[row_group].as_slice()),
                props,
            )?
        };

        let col = row_group.metadata.column(column);

        // Page locations are only available when the offset index was read.
        let page_locations = row_group
            .offset_index
            .map(|x| x[column].page_locations.clone());

        let props = Arc::clone(&row_group.props);
        SerializedPageReader::new_with_properties(
            Arc::clone(&row_group.chunk_reader),
            col,
            usize::try_from(row_group.metadata.num_rows())?,
            page_locations,
            props,
        )
    }
1636
    #[test]
    fn test_peek_next_page_offset_matches_actual() -> Result<()> {
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader = SerializedFileReader::new(test_file)?;

        // Every offset returned by peek_next_page_offset must agree with the
        // page reader's internal state and must be unique across the file.
        let mut offset_set = HashSet::new();
        let num_row_groups = reader.metadata.num_row_groups();
        for row_group in 0..num_row_groups {
            let num_columns = reader.metadata.row_group(row_group).num_columns();
            for column in 0..num_columns {
                let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;

                while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
                    // Cross-check against whichever internal state the
                    // reader is currently in.
                    match &page_reader.state {
                        SerializedPageReaderState::Pages {
                            page_locations,
                            dictionary_page,
                            ..
                        } => {
                            if let Some(page) = dictionary_page {
                                assert_eq!(page.offset as u64, page_offset);
                            } else if let Some(page) = page_locations.front() {
                                assert_eq!(page.offset as u64, page_offset);
                            } else {
                                unreachable!()
                            }
                        }
                        SerializedPageReaderState::Values {
                            offset,
                            next_page_header,
                            ..
                        } => {
                            assert!(next_page_header.is_some());
                            assert_eq!(*offset, page_offset);
                        }
                    }
                    // Consuming the page must succeed, and its peeked offset
                    // must never have been seen before.
                    let page = page_reader.get_next_page()?;
                    assert!(page.is_some());
                    let newly_inserted = offset_set.insert(page_offset);
                    assert!(newly_inserted);
                }
            }
        }

        Ok(())
    }
1683
1684 #[test]
1685 fn test_page_iterator() {
1686 let file = get_test_file("alltypes_plain.parquet");
1687 let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1688
1689 let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap();
1690
1691 let page = page_iterator.next();
1693 assert!(page.is_some());
1694 assert!(page.unwrap().is_ok());
1695
1696 let page = page_iterator.next();
1698 assert!(page.is_none());
1699
1700 let row_group_indices = Box::new(0..1);
1701 let mut page_iterator =
1702 FilePageIterator::with_row_groups(0, row_group_indices, file_reader).unwrap();
1703
1704 let page = page_iterator.next();
1706 assert!(page.is_some());
1707 assert!(page.unwrap().is_ok());
1708
1709 let page = page_iterator.next();
1711 assert!(page.is_none());
1712 }
1713
1714 #[test]
1715 fn test_file_reader_key_value_metadata() {
1716 let file = get_test_file("binary.parquet");
1717 let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1718
1719 let metadata = file_reader
1720 .metadata
1721 .file_metadata()
1722 .key_value_metadata()
1723 .unwrap();
1724
1725 assert_eq!(metadata.len(), 3);
1726
1727 assert_eq!(metadata[0].key, "parquet.proto.descriptor");
1728
1729 assert_eq!(metadata[1].key, "writer.model.name");
1730 assert_eq!(metadata[1].value, Some("protobuf".to_owned()));
1731
1732 assert_eq!(metadata[2].key, "parquet.proto.class");
1733 assert_eq!(metadata[2].value, Some("foo.baz.Foobaz$Event".to_owned()));
1734 }
1735
1736 #[test]
1737 fn test_file_reader_optional_metadata() {
1738 let file = get_test_file("data_index_bloom_encoding_stats.parquet");
1740 let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1741
1742 let row_group_metadata = file_reader.metadata.row_group(0);
1743 let col0_metadata = row_group_metadata.column(0);
1744
1745 assert_eq!(col0_metadata.bloom_filter_offset().unwrap(), 192);
1747
1748 let page_encoding_stats = &col0_metadata.page_encoding_stats().unwrap()[0];
1750
1751 assert_eq!(page_encoding_stats.page_type, basic::PageType::DATA_PAGE);
1752 assert_eq!(page_encoding_stats.encoding, Encoding::PLAIN);
1753 assert_eq!(page_encoding_stats.count, 1);
1754
1755 assert_eq!(col0_metadata.column_index_offset().unwrap(), 156);
1757 assert_eq!(col0_metadata.column_index_length().unwrap(), 25);
1758
1759 assert_eq!(col0_metadata.offset_index_offset().unwrap(), 181);
1761 assert_eq!(col0_metadata.offset_index_length().unwrap(), 11);
1762 }
1763
1764 #[test]
1765 fn test_file_reader_with_no_filter() -> Result<()> {
1766 let test_file = get_test_file("alltypes_plain.parquet");
1767 let origin_reader = SerializedFileReader::new(test_file)?;
1768 let metadata = origin_reader.metadata();
1770 assert_eq!(metadata.num_row_groups(), 1);
1771 Ok(())
1772 }
1773
1774 #[test]
1775 fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
1776 let test_file = get_test_file("alltypes_plain.parquet");
1777 let read_options = ReadOptionsBuilder::new()
1778 .with_predicate(Box::new(|_, _| false))
1779 .build();
1780 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1781 let metadata = reader.metadata();
1782 assert_eq!(metadata.num_row_groups(), 0);
1783 Ok(())
1784 }
1785
1786 #[test]
1787 fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
1788 let test_file = get_test_file("alltypes_plain.parquet");
1789 let origin_reader = SerializedFileReader::new(test_file)?;
1790 let metadata = origin_reader.metadata();
1792 assert_eq!(metadata.num_row_groups(), 1);
1793 let mid = get_midpoint_offset(metadata.row_group(0));
1794
1795 let test_file = get_test_file("alltypes_plain.parquet");
1796 let read_options = ReadOptionsBuilder::new().with_range(0, mid + 1).build();
1797 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1798 let metadata = reader.metadata();
1799 assert_eq!(metadata.num_row_groups(), 1);
1800
1801 let test_file = get_test_file("alltypes_plain.parquet");
1802 let read_options = ReadOptionsBuilder::new().with_range(0, mid).build();
1803 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1804 let metadata = reader.metadata();
1805 assert_eq!(metadata.num_row_groups(), 0);
1806 Ok(())
1807 }
1808
    #[test]
    fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let origin_reader = SerializedFileReader::new(test_file)?;
        let metadata = origin_reader.metadata();
        let mid = get_midpoint_offset(metadata.row_group(0));

        // Case 1: true predicate + range covering the midpoint — the row
        // group is kept and the page indexes are loaded for it.
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|_, _| true))
            .with_range(mid, mid + 1)
            .build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);
        assert_eq!(metadata.column_index().unwrap().len(), 1);
        assert_eq!(metadata.offset_index().unwrap().len(), 1);

        // Case 2: true predicate + range missing the midpoint — the range
        // alone filters the group out, so no indexes either.
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|_, _| true))
            .with_range(0, mid)
            .build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 0);
        assert!(metadata.column_index().is_none());
        assert!(metadata.offset_index().is_none());

        // Case 3: false predicate + covering range — the predicate alone
        // filters the group out.
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|_, _| false))
            .with_range(mid, mid + 1)
            .build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 0);
        assert!(metadata.column_index().is_none());
        assert!(metadata.offset_index().is_none());

        // Case 4: false predicate + non-covering range — still nothing.
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|_, _| false))
            .with_range(0, mid)
            .build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 0);
        assert!(metadata.column_index().is_none());
        assert!(metadata.offset_index().is_none());
        Ok(())
    }
1869
1870 #[test]
1871 fn test_file_reader_invalid_metadata() {
1872 let data = [
1873 255, 172, 1, 0, 50, 82, 65, 73, 1, 0, 0, 0, 169, 168, 168, 162, 87, 255, 16, 0, 0, 0,
1874 80, 65, 82, 49,
1875 ];
1876 let ret = SerializedFileReader::new(Bytes::copy_from_slice(&data));
1877 assert_eq!(
1878 ret.err().unwrap().to_string(),
1879 "Parquet error: Could not parse metadata: bad data"
1880 );
1881 }
1882
    #[test]
    fn test_page_index_reader() {
        let test_file = get_test_file("data_index_bloom_encoding_stats.parquet");
        let builder = ReadOptionsBuilder::new();
        // Request the column and offset indexes to be loaded.
        let options = builder.with_page_index().build();
        let reader_result = SerializedFileReader::new_with_options(test_file, options);
        let reader = reader_result.unwrap();

        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let column_index = metadata.column_index().unwrap();

        // One row group, whose first column index is a BYTE_ARRAY index.
        assert_eq!(column_index.len(), 1);
        let index = if let Index::BYTE_ARRAY(index) = &column_index[0][0] {
            index
        } else {
            unreachable!()
        };

        assert_eq!(index.boundary_order, BoundaryOrder::ASCENDING);
        let index_in_pages = &index.indexes;

        // Single page with known min/max statistics.
        assert_eq!(index_in_pages.len(), 1);

        let page0 = &index_in_pages[0];
        let min = page0.min.as_ref().unwrap();
        let max = page0.max.as_ref().unwrap();
        assert_eq!(b"Hello", min.as_bytes());
        assert_eq!(b"today", max.as_bytes());

        let offset_indexes = metadata.offset_index().unwrap();
        assert_eq!(offset_indexes.len(), 1);
        let offset_index = &offset_indexes[0];
        let page_offset = &offset_index[0].page_locations()[0];

        // Physical location of that single page within the file.
        assert_eq!(4, page_offset.offset);
        assert_eq!(152, page_offset.compressed_page_size);
        assert_eq!(0, page_offset.first_row_index);
    }
1944
1945 #[test]
1946 #[allow(deprecated)]
1947 fn test_page_index_reader_out_of_order() {
1948 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1949 let options = ReadOptionsBuilder::new().with_page_index().build();
1950 let reader = SerializedFileReader::new_with_options(test_file, options).unwrap();
1951 let metadata = reader.metadata();
1952
1953 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1954 let columns = metadata.row_group(0).columns();
1955 let reversed: Vec<_> = columns.iter().cloned().rev().collect();
1956
1957 let a = read_columns_indexes(&test_file, columns).unwrap().unwrap();
1958 let mut b = read_columns_indexes(&test_file, &reversed)
1959 .unwrap()
1960 .unwrap();
1961 b.reverse();
1962 assert_eq!(a, b);
1963
1964 let a = read_offset_indexes(&test_file, columns).unwrap().unwrap();
1965 let mut b = read_offset_indexes(&test_file, &reversed).unwrap().unwrap();
1966 b.reverse();
1967 assert_eq!(a, b);
1968 }
1969
1970 #[test]
1971 fn test_page_index_reader_all_type() {
1972 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1973 let builder = ReadOptionsBuilder::new();
1974 let options = builder.with_page_index().build();
1976 let reader_result = SerializedFileReader::new_with_options(test_file, options);
1977 let reader = reader_result.unwrap();
1978
1979 let metadata = reader.metadata();
1981 assert_eq!(metadata.num_row_groups(), 1);
1982
1983 let column_index = metadata.column_index().unwrap();
1984 let row_group_offset_indexes = &metadata.offset_index().unwrap()[0];
1985
1986 assert_eq!(column_index.len(), 1);
1988 let row_group_metadata = metadata.row_group(0);
1989
1990 assert!(!&column_index[0][0].is_sorted());
1992 let boundary_order = &column_index[0][0].get_boundary_order();
1993 assert!(boundary_order.is_some());
1994 matches!(boundary_order.unwrap(), BoundaryOrder::UNORDERED);
1995 if let Index::INT32(index) = &column_index[0][0] {
1996 check_native_page_index(
1997 index,
1998 325,
1999 get_row_group_min_max_bytes(row_group_metadata, 0),
2000 BoundaryOrder::UNORDERED,
2001 );
2002 assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325);
2003 } else {
2004 unreachable!()
2005 };
2006 assert!(&column_index[0][1].is_sorted());
2008 if let Index::BOOLEAN(index) = &column_index[0][1] {
2009 assert_eq!(index.indexes.len(), 82);
2010 assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82);
2011 } else {
2012 unreachable!()
2013 };
2014 assert!(&column_index[0][2].is_sorted());
2016 if let Index::INT32(index) = &column_index[0][2] {
2017 check_native_page_index(
2018 index,
2019 325,
2020 get_row_group_min_max_bytes(row_group_metadata, 2),
2021 BoundaryOrder::ASCENDING,
2022 );
2023 assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325);
2024 } else {
2025 unreachable!()
2026 };
2027 assert!(&column_index[0][3].is_sorted());
2029 if let Index::INT32(index) = &column_index[0][3] {
2030 check_native_page_index(
2031 index,
2032 325,
2033 get_row_group_min_max_bytes(row_group_metadata, 3),
2034 BoundaryOrder::ASCENDING,
2035 );
2036 assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325);
2037 } else {
2038 unreachable!()
2039 };
2040 assert!(&column_index[0][4].is_sorted());
2042 if let Index::INT32(index) = &column_index[0][4] {
2043 check_native_page_index(
2044 index,
2045 325,
2046 get_row_group_min_max_bytes(row_group_metadata, 4),
2047 BoundaryOrder::ASCENDING,
2048 );
2049 assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325);
2050 } else {
2051 unreachable!()
2052 };
2053 assert!(!&column_index[0][5].is_sorted());
2055 if let Index::INT64(index) = &column_index[0][5] {
2056 check_native_page_index(
2057 index,
2058 528,
2059 get_row_group_min_max_bytes(row_group_metadata, 5),
2060 BoundaryOrder::UNORDERED,
2061 );
2062 assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528);
2063 } else {
2064 unreachable!()
2065 };
2066 assert!(&column_index[0][6].is_sorted());
2068 if let Index::FLOAT(index) = &column_index[0][6] {
2069 check_native_page_index(
2070 index,
2071 325,
2072 get_row_group_min_max_bytes(row_group_metadata, 6),
2073 BoundaryOrder::ASCENDING,
2074 );
2075 assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325);
2076 } else {
2077 unreachable!()
2078 };
2079 assert!(!&column_index[0][7].is_sorted());
2081 if let Index::DOUBLE(index) = &column_index[0][7] {
2082 check_native_page_index(
2083 index,
2084 528,
2085 get_row_group_min_max_bytes(row_group_metadata, 7),
2086 BoundaryOrder::UNORDERED,
2087 );
2088 assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528);
2089 } else {
2090 unreachable!()
2091 };
2092 assert!(!&column_index[0][8].is_sorted());
2094 if let Index::BYTE_ARRAY(index) = &column_index[0][8] {
2095 check_native_page_index(
2096 index,
2097 974,
2098 get_row_group_min_max_bytes(row_group_metadata, 8),
2099 BoundaryOrder::UNORDERED,
2100 );
2101 assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974);
2102 } else {
2103 unreachable!()
2104 };
2105 assert!(&column_index[0][9].is_sorted());
2107 if let Index::BYTE_ARRAY(index) = &column_index[0][9] {
2108 check_native_page_index(
2109 index,
2110 352,
2111 get_row_group_min_max_bytes(row_group_metadata, 9),
2112 BoundaryOrder::ASCENDING,
2113 );
2114 assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352);
2115 } else {
2116 unreachable!()
2117 };
2118 assert!(!&column_index[0][10].is_sorted());
2121 if let Index::NONE = &column_index[0][10] {
2122 assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974);
2123 } else {
2124 unreachable!()
2125 };
2126 assert!(&column_index[0][11].is_sorted());
2128 if let Index::INT32(index) = &column_index[0][11] {
2129 check_native_page_index(
2130 index,
2131 325,
2132 get_row_group_min_max_bytes(row_group_metadata, 11),
2133 BoundaryOrder::ASCENDING,
2134 );
2135 assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325);
2136 } else {
2137 unreachable!()
2138 };
2139 assert!(!&column_index[0][12].is_sorted());
2141 if let Index::INT32(index) = &column_index[0][12] {
2142 check_native_page_index(
2143 index,
2144 325,
2145 get_row_group_min_max_bytes(row_group_metadata, 12),
2146 BoundaryOrder::UNORDERED,
2147 );
2148 assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325);
2149 } else {
2150 unreachable!()
2151 };
2152 }
2153
2154 fn check_native_page_index<T: ParquetValueType>(
2155 row_group_index: &NativeIndex<T>,
2156 page_size: usize,
2157 min_max: (&[u8], &[u8]),
2158 boundary_order: BoundaryOrder,
2159 ) {
2160 assert_eq!(row_group_index.indexes.len(), page_size);
2161 assert_eq!(row_group_index.boundary_order, boundary_order);
2162 row_group_index.indexes.iter().all(|x| {
2163 x.min.as_ref().unwrap() >= &T::try_from_le_slice(min_max.0).unwrap()
2164 && x.max.as_ref().unwrap() <= &T::try_from_le_slice(min_max.1).unwrap()
2165 });
2166 }
2167
2168 fn get_row_group_min_max_bytes(r: &RowGroupMetaData, col_num: usize) -> (&[u8], &[u8]) {
2169 let statistics = r.column(col_num).statistics().unwrap();
2170 (
2171 statistics.min_bytes_opt().unwrap_or_default(),
2172 statistics.max_bytes_opt().unwrap_or_default(),
2173 )
2174 }
2175
2176 #[test]
2177 fn test_skip_next_page_with_dictionary_page() {
2178 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2179 let builder = ReadOptionsBuilder::new();
2180 let options = builder.with_page_index().build();
2182 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2183 let reader = reader_result.unwrap();
2184
2185 let row_group_reader = reader.get_row_group(0).unwrap();
2186
2187 let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2189
2190 let mut vec = vec![];
2191
2192 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2194 assert!(meta.is_dict);
2195
2196 column_page_reader.skip_next_page().unwrap();
2198
2199 let page = column_page_reader.get_next_page().unwrap().unwrap();
2201 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2202
2203 for _i in 0..351 {
2205 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2207 assert!(!meta.is_dict); vec.push(meta);
2209
2210 let page = column_page_reader.get_next_page().unwrap().unwrap();
2211 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2212 }
2213
2214 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2216 assert!(column_page_reader.get_next_page().unwrap().is_none());
2217
2218 assert_eq!(vec.len(), 351);
2220 }
2221
2222 #[test]
2223 fn test_skip_page_with_offset_index() {
2224 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2225 let builder = ReadOptionsBuilder::new();
2226 let options = builder.with_page_index().build();
2228 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2229 let reader = reader_result.unwrap();
2230
2231 let row_group_reader = reader.get_row_group(0).unwrap();
2232
2233 let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2235
2236 let mut vec = vec![];
2237
2238 for i in 0..325 {
2239 if i % 2 == 0 {
2240 vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2241 } else {
2242 column_page_reader.skip_next_page().unwrap();
2243 }
2244 }
2245 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2247 assert!(column_page_reader.get_next_page().unwrap().is_none());
2248
2249 assert_eq!(vec.len(), 163);
2250 }
2251
2252 #[test]
2253 fn test_skip_page_without_offset_index() {
2254 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2255
2256 let reader_result = SerializedFileReader::new(test_file);
2258 let reader = reader_result.unwrap();
2259
2260 let row_group_reader = reader.get_row_group(0).unwrap();
2261
2262 let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2264
2265 let mut vec = vec![];
2266
2267 for i in 0..325 {
2268 if i % 2 == 0 {
2269 vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2270 } else {
2271 column_page_reader.peek_next_page().unwrap().unwrap();
2272 column_page_reader.skip_next_page().unwrap();
2273 }
2274 }
2275 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2277 assert!(column_page_reader.get_next_page().unwrap().is_none());
2278
2279 assert_eq!(vec.len(), 163);
2280 }
2281
2282 #[test]
2283 fn test_peek_page_with_dictionary_page() {
2284 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2285 let builder = ReadOptionsBuilder::new();
2286 let options = builder.with_page_index().build();
2288 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2289 let reader = reader_result.unwrap();
2290 let row_group_reader = reader.get_row_group(0).unwrap();
2291
2292 let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2294
2295 let mut vec = vec![];
2296
2297 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2298 assert!(meta.is_dict);
2299 let page = column_page_reader.get_next_page().unwrap().unwrap();
2300 assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2301
2302 for i in 0..352 {
2303 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2304 if i != 351 {
2307 assert!((meta.num_rows == Some(21)) || (meta.num_rows == Some(20)));
2308 } else {
2309 assert_eq!(meta.num_rows, Some(10));
2312 }
2313 assert!(!meta.is_dict);
2314 vec.push(meta);
2315 let page = column_page_reader.get_next_page().unwrap().unwrap();
2316 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2317 }
2318
2319 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2321 assert!(column_page_reader.get_next_page().unwrap().is_none());
2322
2323 assert_eq!(vec.len(), 352);
2324 }
2325
2326 #[test]
2327 fn test_peek_page_with_dictionary_page_without_offset_index() {
2328 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2329
2330 let reader_result = SerializedFileReader::new(test_file);
2331 let reader = reader_result.unwrap();
2332 let row_group_reader = reader.get_row_group(0).unwrap();
2333
2334 let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2336
2337 let mut vec = vec![];
2338
2339 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2340 assert!(meta.is_dict);
2341 let page = column_page_reader.get_next_page().unwrap().unwrap();
2342 assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2343
2344 for i in 0..352 {
2345 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2346 if i != 351 {
2349 assert!((meta.num_levels == Some(21)) || (meta.num_levels == Some(20)));
2350 } else {
2351 assert_eq!(meta.num_levels, Some(10));
2354 }
2355 assert!(!meta.is_dict);
2356 vec.push(meta);
2357 let page = column_page_reader.get_next_page().unwrap().unwrap();
2358 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2359 }
2360
2361 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2363 assert!(column_page_reader.get_next_page().unwrap().is_none());
2364
2365 assert_eq!(vec.len(), 352);
2366 }
2367
2368 #[test]
2369 fn test_fixed_length_index() {
2370 let message_type = "
2371 message test_schema {
2372 OPTIONAL FIXED_LEN_BYTE_ARRAY (11) value (DECIMAL(25,2));
2373 }
2374 ";
2375
2376 let schema = parse_message_type(message_type).unwrap();
2377 let mut out = Vec::with_capacity(1024);
2378 let mut writer =
2379 SerializedFileWriter::new(&mut out, Arc::new(schema), Default::default()).unwrap();
2380
2381 let mut r = writer.next_row_group().unwrap();
2382 let mut c = r.next_column().unwrap().unwrap();
2383 c.typed::<FixedLenByteArrayType>()
2384 .write_batch(
2385 &[vec![0; 11].into(), vec![5; 11].into(), vec![3; 11].into()],
2386 Some(&[1, 1, 0, 1]),
2387 None,
2388 )
2389 .unwrap();
2390 c.close().unwrap();
2391 r.close().unwrap();
2392 writer.close().unwrap();
2393
2394 let b = Bytes::from(out);
2395 let options = ReadOptionsBuilder::new().with_page_index().build();
2396 let reader = SerializedFileReader::new_with_options(b, options).unwrap();
2397 let index = reader.metadata().column_index().unwrap();
2398
2399 assert_eq!(index.len(), 1);
2401 let c = &index[0];
2402 assert_eq!(c.len(), 1);
2404
2405 match &c[0] {
2406 Index::FIXED_LEN_BYTE_ARRAY(v) => {
2407 assert_eq!(v.indexes.len(), 1);
2408 let page_idx = &v.indexes[0];
2409 assert_eq!(page_idx.null_count.unwrap(), 1);
2410 assert_eq!(page_idx.min.as_ref().unwrap().as_ref(), &[0; 11]);
2411 assert_eq!(page_idx.max.as_ref().unwrap().as_ref(), &[5; 11]);
2412 }
2413 _ => unreachable!(),
2414 }
2415 }
2416
2417 #[test]
2418 fn test_multi_gz() {
2419 let file = get_test_file("concatenated_gzip_members.parquet");
2420 let reader = SerializedFileReader::new(file).unwrap();
2421 let row_group_reader = reader.get_row_group(0).unwrap();
2422 match row_group_reader.get_column_reader(0).unwrap() {
2423 ColumnReader::Int64ColumnReader(mut reader) => {
2424 let mut buffer = Vec::with_capacity(1024);
2425 let mut def_levels = Vec::with_capacity(1024);
2426 let (num_records, num_values, num_levels) = reader
2427 .read_records(1024, Some(&mut def_levels), None, &mut buffer)
2428 .unwrap();
2429
2430 assert_eq!(num_records, 513);
2431 assert_eq!(num_values, 513);
2432 assert_eq!(num_levels, 513);
2433
2434 let expected: Vec<i64> = (1..514).collect();
2435 assert_eq!(&buffer, &expected);
2436 }
2437 _ => unreachable!(),
2438 }
2439 }
2440
2441 #[test]
2442 fn test_byte_stream_split_extended() {
2443 let path = format!(
2444 "{}/byte_stream_split_extended.gzip.parquet",
2445 arrow::util::test_util::parquet_test_data(),
2446 );
2447 let file = File::open(path).unwrap();
2448 let reader = Box::new(SerializedFileReader::new(file).expect("Failed to create reader"));
2449
2450 let mut iter = reader
2452 .get_row_iter(None)
2453 .expect("Failed to create row iterator");
2454
2455 let mut start = 0;
2456 let end = reader.metadata().file_metadata().num_rows();
2457
2458 let check_row = |row: Result<Row, ParquetError>| {
2459 assert!(row.is_ok());
2460 let r = row.unwrap();
2461 assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
2462 assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
2463 assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
2464 assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
2465 assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
2466 assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
2467 assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
2468 };
2469
2470 while start < end {
2471 match iter.next() {
2472 Some(row) => check_row(row),
2473 None => break,
2474 };
2475 start += 1;
2476 }
2477 }
2478
2479 #[test]
2480 fn test_filtered_rowgroup_metadata() {
2481 let message_type = "
2482 message test_schema {
2483 REQUIRED INT32 a;
2484 }
2485 ";
2486 let schema = Arc::new(parse_message_type(message_type).unwrap());
2487 let props = Arc::new(
2488 WriterProperties::builder()
2489 .set_statistics_enabled(EnabledStatistics::Page)
2490 .build(),
2491 );
2492 let mut file: File = tempfile::tempfile().unwrap();
2493 let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
2494 let data = [1, 2, 3, 4, 5];
2495
2496 for idx in 0..5 {
2498 let data_i: Vec<i32> = data.iter().map(|x| x * (idx + 1)).collect();
2499 let mut row_group_writer = file_writer.next_row_group().unwrap();
2500 if let Some(mut writer) = row_group_writer.next_column().unwrap() {
2501 writer
2502 .typed::<Int32Type>()
2503 .write_batch(data_i.as_slice(), None, None)
2504 .unwrap();
2505 writer.close().unwrap();
2506 }
2507 row_group_writer.close().unwrap();
2508 file_writer.flushed_row_groups();
2509 }
2510 let file_metadata = file_writer.close().unwrap();
2511
2512 assert_eq!(file_metadata.num_rows, 25);
2513 assert_eq!(file_metadata.row_groups.len(), 5);
2514
2515 let read_options = ReadOptionsBuilder::new()
2517 .with_page_index()
2518 .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2))
2519 .build();
2520 let reader =
2521 SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
2522 .unwrap();
2523 let metadata = reader.metadata();
2524
2525 assert_eq!(metadata.num_row_groups(), 1);
2527 assert_eq!(metadata.row_group(0).ordinal(), Some(2));
2528
2529 assert!(metadata.column_index().is_some());
2531 assert!(metadata.offset_index().is_some());
2532 assert_eq!(metadata.column_index().unwrap().len(), 1);
2533 assert_eq!(metadata.offset_index().unwrap().len(), 1);
2534 let col_idx = metadata.column_index().unwrap();
2535 let off_idx = metadata.offset_index().unwrap();
2536 let col_stats = metadata.row_group(0).column(0).statistics().unwrap();
2537 let pg_idx = &col_idx[0][0];
2538 let off_idx_i = &off_idx[0][0];
2539
2540 match pg_idx {
2542 Index::INT32(int_idx) => {
2543 let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
2544 let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
2545 assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
2546 assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
2547 }
2548 _ => panic!("wrong stats type"),
2549 }
2550
2551 assert_eq!(
2553 off_idx_i.page_locations[0].offset,
2554 metadata.row_group(0).column(0).data_page_offset()
2555 );
2556
2557 let read_options = ReadOptionsBuilder::new()
2559 .with_page_index()
2560 .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1))
2561 .build();
2562 let reader =
2563 SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
2564 .unwrap();
2565 let metadata = reader.metadata();
2566
2567 assert_eq!(metadata.num_row_groups(), 2);
2569 assert_eq!(metadata.row_group(0).ordinal(), Some(1));
2570 assert_eq!(metadata.row_group(1).ordinal(), Some(3));
2571
2572 assert!(metadata.column_index().is_some());
2574 assert!(metadata.offset_index().is_some());
2575 assert_eq!(metadata.column_index().unwrap().len(), 2);
2576 assert_eq!(metadata.offset_index().unwrap().len(), 2);
2577 let col_idx = metadata.column_index().unwrap();
2578 let off_idx = metadata.offset_index().unwrap();
2579
2580 for (i, col_idx_i) in col_idx.iter().enumerate().take(metadata.num_row_groups()) {
2581 let col_stats = metadata.row_group(i).column(0).statistics().unwrap();
2582 let pg_idx = &col_idx_i[0];
2583 let off_idx_i = &off_idx[i][0];
2584
2585 match pg_idx {
2587 Index::INT32(int_idx) => {
2588 let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
2589 let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
2590 assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
2591 assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
2592 }
2593 _ => panic!("wrong stats type"),
2594 }
2595
2596 assert_eq!(
2598 off_idx_i.page_locations[0].offset,
2599 metadata.row_group(i).column(0).data_page_offset()
2600 );
2601 }
2602 }
2603}