1use crate::basic::{Encoding, Type};
22use crate::bloom_filter::Sbbf;
23use crate::column::page::{Page, PageMetadata, PageReader};
24use crate::compression::{create_codec, Codec};
25#[cfg(feature = "encryption")]
26use crate::encryption::decrypt::{read_and_decrypt, CryptoContext};
27use crate::errors::{ParquetError, Result};
28use crate::file::page_index::offset_index::OffsetIndexMetaData;
29use crate::file::{
30 metadata::*,
31 properties::{ReaderProperties, ReaderPropertiesPtr},
32 reader::*,
33 statistics,
34};
35use crate::format::{PageHeader, PageLocation, PageType};
36use crate::record::reader::RowIter;
37use crate::record::Row;
38use crate::schema::types::Type as SchemaType;
39#[cfg(feature = "encryption")]
40use crate::thrift::TCompactSliceInputProtocol;
41use crate::thrift::TSerializable;
42use bytes::Bytes;
43use std::collections::VecDeque;
44use std::{fs::File, io::Read, path::Path, sync::Arc};
45use thrift::protocol::TCompactInputProtocol;
46
47impl TryFrom<File> for SerializedFileReader<File> {
48 type Error = ParquetError;
49
50 fn try_from(file: File) -> Result<Self> {
51 Self::new(file)
52 }
53}
54
55impl TryFrom<&Path> for SerializedFileReader<File> {
56 type Error = ParquetError;
57
58 fn try_from(path: &Path) -> Result<Self> {
59 let file = File::open(path)?;
60 Self::try_from(file)
61 }
62}
63
64impl TryFrom<String> for SerializedFileReader<File> {
65 type Error = ParquetError;
66
67 fn try_from(path: String) -> Result<Self> {
68 Self::try_from(Path::new(&path))
69 }
70}
71
72impl TryFrom<&str> for SerializedFileReader<File> {
73 type Error = ParquetError;
74
75 fn try_from(path: &str) -> Result<Self> {
76 Self::try_from(Path::new(&path))
77 }
78}
79
80impl IntoIterator for SerializedFileReader<File> {
83 type Item = Result<Row>;
84 type IntoIter = RowIter<'static>;
85
86 fn into_iter(self) -> Self::IntoIter {
87 RowIter::from_file_into(Box::new(self))
88 }
89}
90
/// A serialized implementation of the Parquet [`FileReader`] trait, backed by
/// any [`ChunkReader`] byte source.
pub struct SerializedFileReader<R: ChunkReader> {
    // Shared handle to the underlying byte source (file, in-memory buffer, ...).
    chunk_reader: Arc<R>,
    // Footer metadata parsed eagerly at construction time.
    metadata: Arc<ParquetMetaData>,
    // Reader-level configuration (codec options, bloom filter reading, ...).
    props: ReaderPropertiesPtr,
}
100
/// A predicate deciding, from a row group's metadata and its index in the
/// footer, whether that row group should be read. Boxed `FnMut` so stateful
/// filters are permitted.
pub type ReadGroupPredicate = Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>;
105
/// A builder for [`ReadOptions`].
#[derive(Default)]
pub struct ReadOptionsBuilder {
    // Row-group filters; a group is kept only if every predicate accepts it.
    predicates: Vec<ReadGroupPredicate>,
    // When true, the page index is loaded after the footer is parsed.
    enable_page_index: bool,
    // Optional custom reader properties; defaults are built when absent.
    props: Option<ReaderProperties>,
}
115
116impl ReadOptionsBuilder {
117 pub fn new() -> Self {
119 Self::default()
120 }
121
122 pub fn with_predicate(mut self, predicate: ReadGroupPredicate) -> Self {
125 self.predicates.push(predicate);
126 self
127 }
128
129 pub fn with_range(mut self, start: i64, end: i64) -> Self {
132 assert!(start < end);
133 let predicate = move |rg: &RowGroupMetaData, _: usize| {
134 let mid = get_midpoint_offset(rg);
135 mid >= start && mid < end
136 };
137 self.predicates.push(Box::new(predicate));
138 self
139 }
140
141 pub fn with_page_index(mut self) -> Self {
146 self.enable_page_index = true;
147 self
148 }
149
150 pub fn with_reader_properties(mut self, properties: ReaderProperties) -> Self {
152 self.props = Some(properties);
153 self
154 }
155
156 pub fn build(self) -> ReadOptions {
158 let props = self
159 .props
160 .unwrap_or_else(|| ReaderProperties::builder().build());
161 ReadOptions {
162 predicates: self.predicates,
163 enable_page_index: self.enable_page_index,
164 props,
165 }
166 }
167}
168
/// A collection of options for reading a Parquet file.
pub struct ReadOptions {
    // Row-group filters applied while rebuilding the footer metadata.
    predicates: Vec<ReadGroupPredicate>,
    // Whether the page index should be read eagerly.
    enable_page_index: bool,
    // Reader properties (codec options, bloom filter reading, ...).
    props: ReaderProperties,
}
178
179impl<R: 'static + ChunkReader> SerializedFileReader<R> {
180 pub fn new(chunk_reader: R) -> Result<Self> {
183 let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
184 let props = Arc::new(ReaderProperties::builder().build());
185 Ok(Self {
186 chunk_reader: Arc::new(chunk_reader),
187 metadata: Arc::new(metadata),
188 props,
189 })
190 }
191
192 pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
195 let mut metadata_builder = ParquetMetaDataReader::new()
196 .parse_and_finish(&chunk_reader)?
197 .into_builder();
198 let mut predicates = options.predicates;
199
200 for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
202 let mut keep = true;
203 for predicate in &mut predicates {
204 if !predicate(&rg_meta, i) {
205 keep = false;
206 break;
207 }
208 }
209 if keep {
210 metadata_builder = metadata_builder.add_row_group(rg_meta);
211 }
212 }
213
214 let mut metadata = metadata_builder.build();
215
216 if options.enable_page_index {
218 let mut reader =
219 ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true);
220 reader.read_page_indexes(&chunk_reader)?;
221 metadata = reader.finish()?;
222 }
223
224 Ok(Self {
225 chunk_reader: Arc::new(chunk_reader),
226 metadata: Arc::new(metadata),
227 props: Arc::new(options.props),
228 })
229 }
230}
231
232fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
234 let col = meta.column(0);
235 let mut offset = col.data_page_offset();
236 if let Some(dic_offset) = col.dictionary_page_offset() {
237 if offset > dic_offset {
238 offset = dic_offset
239 }
240 };
241 offset + meta.compressed_size() / 2
242}
243
244impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
245 fn metadata(&self) -> &ParquetMetaData {
246 &self.metadata
247 }
248
249 fn num_row_groups(&self) -> usize {
250 self.metadata.num_row_groups()
251 }
252
253 fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> {
254 let row_group_metadata = self.metadata.row_group(i);
255 let props = Arc::clone(&self.props);
257 let f = Arc::clone(&self.chunk_reader);
258 Ok(Box::new(SerializedRowGroupReader::new(
259 f,
260 row_group_metadata,
261 self.metadata.offset_index().map(|x| x[i].as_slice()),
262 props,
263 )?))
264 }
265
266 fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
267 RowIter::from_file(projection, self)
268 }
269}
270
/// A serialized implementation of the Parquet [`RowGroupReader`] trait.
pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
    // Shared handle to the underlying byte source.
    chunk_reader: Arc<R>,
    // Metadata of the row group this reader serves.
    metadata: &'a RowGroupMetaData,
    // Per-column offset index for this row group, when loaded.
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    // Reader-level configuration.
    props: ReaderPropertiesPtr,
    // One optional bloom filter per column, read eagerly in `new`.
    bloom_filters: Vec<Option<Sbbf>>,
}
279
280impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
281 pub fn new(
283 chunk_reader: Arc<R>,
284 metadata: &'a RowGroupMetaData,
285 offset_index: Option<&'a [OffsetIndexMetaData]>,
286 props: ReaderPropertiesPtr,
287 ) -> Result<Self> {
288 let bloom_filters = if props.read_bloom_filter() {
289 metadata
290 .columns()
291 .iter()
292 .map(|col| Sbbf::read_from_column_chunk(col, &*chunk_reader))
293 .collect::<Result<Vec<_>>>()?
294 } else {
295 std::iter::repeat_n(None, metadata.columns().len()).collect()
296 };
297 Ok(Self {
298 chunk_reader,
299 metadata,
300 offset_index,
301 props,
302 bloom_filters,
303 })
304 }
305}
306
307impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R> {
308 fn metadata(&self) -> &RowGroupMetaData {
309 self.metadata
310 }
311
312 fn num_columns(&self) -> usize {
313 self.metadata.num_columns()
314 }
315
316 fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
318 let col = self.metadata.column(i);
319
320 let page_locations = self.offset_index.map(|x| x[i].page_locations.clone());
321
322 let props = Arc::clone(&self.props);
323 Ok(Box::new(SerializedPageReader::new_with_properties(
324 Arc::clone(&self.chunk_reader),
325 col,
326 usize::try_from(self.metadata.num_rows())?,
327 page_locations,
328 props,
329 )?))
330 }
331
332 fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
334 self.bloom_filters[i].as_ref()
335 }
336
337 fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
338 RowIter::from_row_group(projection, self)
339 }
340}
341
342pub(crate) fn decode_page(
344 page_header: PageHeader,
345 buffer: Bytes,
346 physical_type: Type,
347 decompressor: Option<&mut Box<dyn Codec>>,
348) -> Result<Page> {
349 #[cfg(feature = "crc")]
351 if let Some(expected_crc) = page_header.crc {
352 let crc = crc32fast::hash(&buffer);
353 if crc != expected_crc as u32 {
354 return Err(general_err!("Page CRC checksum mismatch"));
355 }
356 }
357
358 let mut offset: usize = 0;
365 let mut can_decompress = true;
366
367 if let Some(ref header_v2) = page_header.data_page_header_v2 {
368 if header_v2.definition_levels_byte_length < 0
369 || header_v2.repetition_levels_byte_length < 0
370 || header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length
371 > page_header.uncompressed_page_size
372 {
373 return Err(general_err!(
374 "DataPage v2 header contains implausible values \
375 for definition_levels_byte_length ({}) \
376 and repetition_levels_byte_length ({}) \
377 given DataPage header provides uncompressed_page_size ({})",
378 header_v2.definition_levels_byte_length,
379 header_v2.repetition_levels_byte_length,
380 page_header.uncompressed_page_size
381 ));
382 }
383 offset = usize::try_from(
384 header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length,
385 )?;
386 can_decompress = header_v2.is_compressed.unwrap_or(true);
388 }
389
390 let buffer = match decompressor {
393 Some(decompressor) if can_decompress => {
394 let uncompressed_page_size = usize::try_from(page_header.uncompressed_page_size)?;
395 let decompressed_size = uncompressed_page_size - offset;
396 let mut decompressed = Vec::with_capacity(uncompressed_page_size);
397 decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
398 if decompressed_size > 0 {
399 let compressed = &buffer.as_ref()[offset..];
400 decompressor.decompress(compressed, &mut decompressed, Some(decompressed_size))?;
401 }
402
403 if decompressed.len() != uncompressed_page_size {
404 return Err(general_err!(
405 "Actual decompressed size doesn't match the expected one ({} vs {})",
406 decompressed.len(),
407 uncompressed_page_size
408 ));
409 }
410
411 Bytes::from(decompressed)
412 }
413 _ => buffer,
414 };
415
416 let result = match page_header.type_ {
417 PageType::DICTIONARY_PAGE => {
418 let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| {
419 ParquetError::General("Missing dictionary page header".to_string())
420 })?;
421 let is_sorted = dict_header.is_sorted.unwrap_or(false);
422 Page::DictionaryPage {
423 buf: buffer,
424 num_values: dict_header.num_values.try_into()?,
425 encoding: Encoding::try_from(dict_header.encoding)?,
426 is_sorted,
427 }
428 }
429 PageType::DATA_PAGE => {
430 let header = page_header
431 .data_page_header
432 .ok_or_else(|| ParquetError::General("Missing V1 data page header".to_string()))?;
433 Page::DataPage {
434 buf: buffer,
435 num_values: header.num_values.try_into()?,
436 encoding: Encoding::try_from(header.encoding)?,
437 def_level_encoding: Encoding::try_from(header.definition_level_encoding)?,
438 rep_level_encoding: Encoding::try_from(header.repetition_level_encoding)?,
439 statistics: statistics::from_thrift(physical_type, header.statistics)?,
440 }
441 }
442 PageType::DATA_PAGE_V2 => {
443 let header = page_header
444 .data_page_header_v2
445 .ok_or_else(|| ParquetError::General("Missing V2 data page header".to_string()))?;
446 let is_compressed = header.is_compressed.unwrap_or(true);
447 Page::DataPageV2 {
448 buf: buffer,
449 num_values: header.num_values.try_into()?,
450 encoding: Encoding::try_from(header.encoding)?,
451 num_nulls: header.num_nulls.try_into()?,
452 num_rows: header.num_rows.try_into()?,
453 def_levels_byte_len: header.definition_levels_byte_length.try_into()?,
454 rep_levels_byte_len: header.repetition_levels_byte_length.try_into()?,
455 is_compressed,
456 statistics: statistics::from_thrift(physical_type, header.statistics)?,
457 }
458 }
459 _ => {
460 unimplemented!("Page type {:?} is not supported", page_header.type_)
462 }
463 };
464
465 Ok(result)
466}
467
/// Tracks the reading progress of a [`SerializedPageReader`].
enum SerializedPageReaderState {
    /// Sequential mode: no offset index available; page headers are parsed as
    /// they are encountered in the byte stream.
    Values {
        /// The current byte offset in the file.
        offset: u64,

        /// The length of the chunk left to read, in bytes.
        remaining_bytes: u64,

        /// A header parsed by a peek and buffered for the next read.
        next_page_header: Option<Box<PageHeader>>,

        /// The index of the data page within this column chunk.
        page_index: usize,

        /// Whether the next page is expected to be a dictionary page.
        require_dictionary: bool,
    },
    /// Indexed mode: page locations from the offset index allow seeking
    /// directly to each page.
    Pages {
        /// Remaining data page locations, consumed from the front.
        page_locations: VecDeque<PageLocation>,
        /// The location of the dictionary page, if any, not yet consumed.
        dictionary_page: Option<PageLocation>,
        /// The total number of rows in this column chunk.
        total_rows: usize,
        /// The index of the data page within this column chunk.
        page_index: usize,
    },
}
498
/// Auxiliary per-reader context; carries decryption state when the
/// "encryption" feature is enabled and is otherwise empty.
#[derive(Default)]
struct SerializedPageReaderContext {
    /// Crypto context for decrypting pages of an encrypted column chunk.
    #[cfg(feature = "encryption")]
    crypto_context: Option<Arc<CryptoContext>>,
}
505
/// A serialized implementation of the Parquet [`PageReader`] trait.
pub struct SerializedPageReader<R: ChunkReader> {
    /// The chunk reader pages are read from.
    reader: Arc<R>,

    /// The decompressor for this column chunk, if the chunk is compressed.
    decompressor: Option<Box<dyn Codec>>,

    /// The Parquet physical type of the column this reader serves.
    physical_type: Type,

    /// Current read position and buffered header state.
    state: SerializedPageReaderState,

    /// Decryption context (meaningful only with the "encryption" feature).
    context: SerializedPageReaderContext,
}
521
impl<R: ChunkReader> SerializedPageReader<R> {
    /// Creates a page reader with default reader properties.
    pub fn new(
        reader: Arc<R>,
        column_chunk_metadata: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
    ) -> Result<Self> {
        let props = Arc::new(ReaderProperties::builder().build());
        SerializedPageReader::new_with_properties(
            reader,
            column_chunk_metadata,
            total_rows,
            page_locations,
            props,
        )
    }

    /// No-op stub used when the "encryption" feature is disabled; returns
    /// `self` unchanged so callers can invoke it unconditionally.
    #[cfg(all(feature = "arrow", not(feature = "encryption")))]
    pub(crate) fn add_crypto_context(
        self,
        _rg_idx: usize,
        _column_idx: usize,
        _parquet_meta_data: &ParquetMetaData,
        _column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        Ok(self)
    }

    /// Attaches a per-column crypto context so subsequent page headers and
    /// page data can be decrypted. A reader without a file decryptor or a
    /// column without crypto metadata is returned unchanged.
    #[cfg(feature = "encryption")]
    pub(crate) fn add_crypto_context(
        mut self,
        rg_idx: usize,
        column_idx: usize,
        parquet_meta_data: &ParquetMetaData,
        column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        let Some(file_decryptor) = parquet_meta_data.file_decryptor() else {
            return Ok(self);
        };
        let Some(crypto_metadata) = column_chunk_metadata.crypto_metadata() else {
            return Ok(self);
        };
        let crypto_context =
            CryptoContext::for_column(file_decryptor, crypto_metadata, rg_idx, column_idx)?;
        self.context.crypto_context = Some(Arc::new(crypto_context));
        Ok(self)
    }

    /// Creates a page reader with the given reader properties. With page
    /// locations the reader seeks pages directly (`Pages` state); otherwise it
    /// scans headers sequentially over the chunk's byte range (`Values` state).
    pub fn new_with_properties(
        reader: Arc<R>,
        meta: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
        props: ReaderPropertiesPtr,
    ) -> Result<Self> {
        let decompressor = create_codec(meta.compression(), props.codec_options())?;
        let (start, len) = meta.byte_range();

        let state = match page_locations {
            Some(locations) => {
                // The offset index only lists data pages. If the chunk starts
                // before the first listed page, the gap must be the dictionary
                // page; synthesize a location for it.
                let dictionary_page = match locations.first() {
                    Some(dict_offset) if dict_offset.offset as u64 != start => Some(PageLocation {
                        offset: start as i64,
                        compressed_page_size: (dict_offset.offset as u64 - start) as i32,
                        first_row_index: 0,
                    }),
                    _ => None,
                };

                SerializedPageReaderState::Pages {
                    page_locations: locations.into(),
                    dictionary_page,
                    total_rows,
                    page_index: 0,
                }
            }
            None => SerializedPageReaderState::Values {
                offset: start,
                remaining_bytes: len,
                next_page_header: None,
                page_index: 0,
                // Expect a leading dictionary page only if the metadata says so.
                require_dictionary: meta.dictionary_page_offset().is_some(),
            },
        };
        Ok(Self {
            reader,
            decompressor,
            state,
            physical_type: meta.column_type(),
            context: Default::default(),
        })
    }

    /// Test-only helper: returns the file offset of the next page's data
    /// (i.e. just past its header), or `None` when the chunk is exhausted.
    /// Headers that do not convert to [`PageMetadata`] are skipped.
    #[cfg(test)]
    fn peek_next_page_offset(&mut self) -> Result<Option<u64>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
                            Ok(Some(*offset))
                        } else {
                            // Stale buffered header that cannot be interpreted:
                            // drop it and parse the next one.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset)?;
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        // Account for the header bytes just consumed.
                        *offset += header_len as u64;
                        *remaining_bytes -= header_len as u64;
                        let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
                            Ok(Some(*offset))
                        } else {
                            // Unsupported page type: skip and keep scanning.
                            continue;
                        };
                        // Buffer the header so the next read does not re-parse it.
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                ..
            } => {
                if let Some(page) = dictionary_page {
                    Ok(Some(page.offset as u64))
                } else if let Some(page) = page_locations.front() {
                    Ok(Some(page.offset as u64))
                } else {
                    Ok(None)
                }
            }
        }
    }

    /// Reads a page header from `input`, returning the header together with
    /// the number of bytes consumed from the stream.
    fn read_page_header_len<T: Read>(
        context: &SerializedPageReaderContext,
        input: &mut T,
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<(usize, PageHeader)> {
        /// A wrapper around a [`Read`] that counts the bytes read through it.
        struct TrackedRead<R> {
            inner: R,
            bytes_read: usize,
        }

        impl<R: Read> Read for TrackedRead<R> {
            fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
                let v = self.inner.read(buf)?;
                self.bytes_read += v;
                Ok(v)
            }
        }

        let mut tracked = TrackedRead {
            inner: input,
            bytes_read: 0,
        };
        let header = context.read_page_header(&mut tracked, page_index, dictionary_page)?;
        Ok((tracked.bytes_read, header))
    }

    /// Like `read_page_header_len`, but parses from an in-memory buffer; the
    /// cursor position after parsing gives the header length.
    fn read_page_header_len_from_bytes(
        context: &SerializedPageReaderContext,
        buffer: &[u8],
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<(usize, PageHeader)> {
        let mut input = std::io::Cursor::new(buffer);
        let header = context.read_page_header(&mut input, page_index, dictionary_page)?;
        let header_len = input.position() as usize;
        Ok((header_len, header))
    }
}
725
#[cfg(not(feature = "encryption"))]
impl SerializedPageReaderContext {
    /// Reads a page header from `input` via plain Thrift compact-protocol
    /// decoding; the page identifiers are unused without encryption.
    fn read_page_header<T: Read>(
        &self,
        input: &mut T,
        _page_index: usize,
        _dictionary_page: bool,
    ) -> Result<PageHeader> {
        let mut prot = TCompactInputProtocol::new(input);
        Ok(PageHeader::read_from_in_protocol(&mut prot)?)
    }

    /// Pass-through: without the "encryption" feature page data is never
    /// encrypted, so the buffer is returned unchanged.
    fn decrypt_page_data<T>(
        &self,
        buffer: T,
        _page_index: usize,
        _dictionary_page: bool,
    ) -> Result<T> {
        Ok(buffer)
    }
}
747
#[cfg(feature = "encryption")]
impl SerializedPageReaderContext {
    /// Reads a page header from `input`. For an encrypted column the header
    /// bytes are decrypted (using a page-specific AAD) before Thrift decoding;
    /// otherwise plain compact-protocol decoding is used.
    fn read_page_header<T: Read>(
        &self,
        input: &mut T,
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<PageHeader> {
        match self.page_crypto_context(page_index, dictionary_page) {
            None => {
                let mut prot = TCompactInputProtocol::new(input);
                Ok(PageHeader::read_from_in_protocol(&mut prot)?)
            }
            Some(page_crypto_context) => {
                let data_decryptor = page_crypto_context.data_decryptor();
                let aad = page_crypto_context.create_page_header_aad()?;

                let buf = read_and_decrypt(data_decryptor, input, aad.as_ref()).map_err(|_| {
                    ParquetError::General(format!(
                        "Error decrypting page header for column {}, decryption key may be wrong",
                        page_crypto_context.column_ordinal
                    ))
                })?;

                let mut prot = TCompactSliceInputProtocol::new(buf.as_slice());
                Ok(PageHeader::read_from_in_protocol(&mut prot)?)
            }
        }
    }

    /// Decrypts a page's data buffer when a crypto context is attached;
    /// returns the buffer unchanged for unencrypted columns.
    fn decrypt_page_data<T>(&self, buffer: T, page_index: usize, dictionary_page: bool) -> Result<T>
    where
        T: AsRef<[u8]>,
        T: From<Vec<u8>>,
    {
        let page_crypto_context = self.page_crypto_context(page_index, dictionary_page);
        if let Some(page_crypto_context) = page_crypto_context {
            let decryptor = page_crypto_context.data_decryptor();
            let aad = page_crypto_context.create_page_aad()?;
            let decrypted = decryptor.decrypt(buffer.as_ref(), &aad)?;
            Ok(T::from(decrypted))
        } else {
            Ok(buffer)
        }
    }

    /// Derives the page-specific crypto context: dictionary pages use a
    /// dedicated AAD, data pages an AAD keyed by their ordinal.
    fn page_crypto_context(
        &self,
        page_index: usize,
        dictionary_page: bool,
    ) -> Option<Arc<CryptoContext>> {
        self.crypto_context.as_ref().map(|c| {
            Arc::new(if dictionary_page {
                c.for_dictionary_page()
            } else {
                c.with_page_ordinal(page_index)
            })
        })
    }
}
808
809impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
810 type Item = Result<Page>;
811
812 fn next(&mut self) -> Option<Self::Item> {
813 self.get_next_page().transpose()
814 }
815}
816
817fn verify_page_header_len(header_len: usize, remaining_bytes: u64) -> Result<()> {
818 if header_len as u64 > remaining_bytes {
819 return Err(eof_err!("Invalid page header"));
820 }
821 Ok(())
822}
823
824fn verify_page_size(
825 compressed_size: i32,
826 uncompressed_size: i32,
827 remaining_bytes: u64,
828) -> Result<()> {
829 if compressed_size < 0 || compressed_size as u64 > remaining_bytes || uncompressed_size < 0 {
833 return Err(eof_err!("Invalid page header"));
834 }
835 Ok(())
836}
837
impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
    /// Reads, decrypts (when applicable), and decodes the next page, or
    /// returns `Ok(None)` when the chunk is exhausted. Index pages are
    /// skipped transparently.
    fn get_next_page(&mut self) -> Result<Option<Page>> {
        loop {
            let page = match &mut self.state {
                SerializedPageReaderState::Values {
                    offset,
                    remaining_bytes: remaining,
                    next_page_header,
                    page_index,
                    require_dictionary,
                } => {
                    if *remaining == 0 {
                        return Ok(None);
                    }

                    let mut read = self.reader.get_read(*offset)?;
                    // Use the header buffered by a previous peek, if any;
                    // its length was already deducted from the counters.
                    let header = if let Some(header) = next_page_header.take() {
                        *header
                    } else {
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        verify_page_header_len(header_len, *remaining)?;
                        *offset += header_len as u64;
                        *remaining -= header_len as u64;
                        header
                    };
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining,
                    )?;
                    let data_len = header.compressed_page_size as usize;
                    // Advance past the page body up front so an index page can
                    // simply `continue` below.
                    *offset += data_len as u64;
                    *remaining -= data_len as u64;

                    if header.type_ == PageType::INDEX_PAGE {
                        continue;
                    }

                    let mut buffer = Vec::with_capacity(data_len);
                    let read = read.take(data_len as u64).read_to_end(&mut buffer)?;

                    if read != data_len {
                        return Err(eof_err!(
                            "Expected to read {} bytes of page, read only {}",
                            data_len,
                            read
                        ));
                    }

                    let buffer =
                        self.context
                            .decrypt_page_data(buffer, *page_index, *require_dictionary)?;

                    let page = decode_page(
                        header,
                        Bytes::from(buffer),
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?;
                    // Data pages advance the page ordinal; the (single)
                    // dictionary page clears the expectation flag instead.
                    if page.is_data_page() {
                        *page_index += 1;
                    } else if page.is_dictionary_page() {
                        *require_dictionary = false;
                    }
                    page
                }
                SerializedPageReaderState::Pages {
                    page_locations,
                    dictionary_page,
                    page_index,
                    ..
                } => {
                    // Serve the dictionary page first, then data pages in order.
                    let (front, is_dictionary_page) = match dictionary_page.take() {
                        Some(front) => (front, true),
                        None => match page_locations.pop_front() {
                            Some(front) => (front, false),
                            None => return Ok(None),
                        },
                    };

                    let page_len = usize::try_from(front.compressed_page_size)?;
                    // The location covers header + body; read it in one go.
                    let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;

                    let (offset, header) = Self::read_page_header_len_from_bytes(
                        &self.context,
                        buffer.as_ref(),
                        *page_index,
                        is_dictionary_page,
                    )?;
                    let bytes = buffer.slice(offset..);
                    let bytes =
                        self.context
                            .decrypt_page_data(bytes, *page_index, is_dictionary_page)?;

                    if !is_dictionary_page {
                        *page_index += 1;
                    }
                    decode_page(
                        header,
                        bytes,
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?
                }
            };

            return Ok(Some(page));
        }
    }

    /// Returns metadata describing the next page without consuming it, or
    /// `Ok(None)` when the chunk is exhausted. In `Values` mode the parsed
    /// header is buffered so the following read does not re-parse it.
    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(page_meta) = (&**header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // Stale buffered header that cannot be interpreted:
                            // drop it and parse the next one.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset)?;
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        verify_page_header_len(header_len, *remaining_bytes)?;
                        *offset += header_len as u64;
                        *remaining_bytes -= header_len as u64;
                        let page_meta = if let Ok(page_meta) = (&header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // Unsupported page type: skip and keep scanning.
                            continue;
                        };
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                total_rows,
                page_index: _,
            } => {
                if dictionary_page.is_some() {
                    Ok(Some(PageMetadata {
                        num_rows: None,
                        num_levels: None,
                        is_dict: true,
                    }))
                } else if let Some(page) = page_locations.front() {
                    // Row count is the gap to the next page's first row (or to
                    // the chunk's total for the last page).
                    let next_rows = page_locations
                        .get(1)
                        .map(|x| x.first_row_index as usize)
                        .unwrap_or(*total_rows);

                    Ok(Some(PageMetadata {
                        num_rows: Some(next_rows - page.first_row_index as usize),
                        num_levels: None,
                        is_dict: false,
                    }))
                } else {
                    Ok(None)
                }
            }
        }
    }

    /// Skips the next page without decoding it, advancing offsets (in
    /// `Values` mode) or popping the next location (in `Pages` mode).
    fn skip_next_page(&mut self) -> Result<()> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                if let Some(buffered_header) = next_page_header.take() {
                    verify_page_size(
                        buffered_header.compressed_page_size,
                        buffered_header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    // The header bytes were accounted for when it was buffered;
                    // only the page body remains to be skipped.
                    *offset += buffered_header.compressed_page_size as u64;
                    *remaining_bytes -= buffered_header.compressed_page_size as u64;
                } else {
                    let mut read = self.reader.get_read(*offset)?;
                    let (header_len, header) = Self::read_page_header_len(
                        &self.context,
                        &mut read,
                        *page_index,
                        *require_dictionary,
                    )?;
                    verify_page_header_len(header_len, *remaining_bytes)?;
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    let data_page_size = header.compressed_page_size as u64;
                    *offset += header_len as u64 + data_page_size;
                    *remaining_bytes -= header_len as u64 + data_page_size;
                }
                // Mirror get_next_page's ordinal bookkeeping.
                if *require_dictionary {
                    *require_dictionary = false;
                } else {
                    *page_index += 1;
                }
                Ok(())
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                page_index,
                ..
            } => {
                if dictionary_page.is_some() {
                    // Skipping the dictionary page does not advance the ordinal.
                    dictionary_page.take();
                } else {
                    if page_locations.pop_front().is_some() {
                        *page_index += 1;
                    }
                }

                Ok(())
            }
        }
    }

    /// In `Pages` mode page boundaries are known to be record boundaries; in
    /// `Values` mode that is only certain once the chunk is exhausted.
    fn at_record_boundary(&mut self) -> Result<bool> {
        match &mut self.state {
            SerializedPageReaderState::Values { .. } => Ok(self.peek_next_page()?.is_none()),
            SerializedPageReaderState::Pages { .. } => Ok(true),
        }
    }
}
1097
1098#[cfg(test)]
1099mod tests {
1100 use std::collections::HashSet;
1101
1102 use bytes::Buf;
1103
1104 use crate::file::properties::{EnabledStatistics, WriterProperties};
1105 use crate::format::BoundaryOrder;
1106
1107 use crate::basic::{self, ColumnOrder, SortOrder};
1108 use crate::column::reader::ColumnReader;
1109 use crate::data_type::private::ParquetValueType;
1110 use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
1111 use crate::file::page_index::index::{Index, NativeIndex};
1112 #[allow(deprecated)]
1113 use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
1114 use crate::file::writer::SerializedFileWriter;
1115 use crate::record::RowAccessor;
1116 use crate::schema::parser::parse_message_type;
1117 use crate::util::test_common::file_util::{get_test_file, get_test_path};
1118
1119 use super::*;
1120
1121 #[test]
1122 fn test_cursor_and_file_has_the_same_behaviour() {
1123 let mut buf: Vec<u8> = Vec::new();
1124 get_test_file("alltypes_plain.parquet")
1125 .read_to_end(&mut buf)
1126 .unwrap();
1127 let cursor = Bytes::from(buf);
1128 let read_from_cursor = SerializedFileReader::new(cursor).unwrap();
1129
1130 let test_file = get_test_file("alltypes_plain.parquet");
1131 let read_from_file = SerializedFileReader::new(test_file).unwrap();
1132
1133 let file_iter = read_from_file.get_row_iter(None).unwrap();
1134 let cursor_iter = read_from_cursor.get_row_iter(None).unwrap();
1135
1136 for (a, b) in file_iter.zip(cursor_iter) {
1137 assert_eq!(a.unwrap(), b.unwrap())
1138 }
1139 }
1140
1141 #[test]
1142 fn test_file_reader_try_from() {
1143 let test_file = get_test_file("alltypes_plain.parquet");
1145 let test_path_buf = get_test_path("alltypes_plain.parquet");
1146 let test_path = test_path_buf.as_path();
1147 let test_path_str = test_path.to_str().unwrap();
1148
1149 let reader = SerializedFileReader::try_from(test_file);
1150 assert!(reader.is_ok());
1151
1152 let reader = SerializedFileReader::try_from(test_path);
1153 assert!(reader.is_ok());
1154
1155 let reader = SerializedFileReader::try_from(test_path_str);
1156 assert!(reader.is_ok());
1157
1158 let reader = SerializedFileReader::try_from(test_path_str.to_string());
1159 assert!(reader.is_ok());
1160
1161 let test_path = Path::new("invalid.parquet");
1163 let test_path_str = test_path.to_str().unwrap();
1164
1165 let reader = SerializedFileReader::try_from(test_path);
1166 assert!(reader.is_err());
1167
1168 let reader = SerializedFileReader::try_from(test_path_str);
1169 assert!(reader.is_err());
1170
1171 let reader = SerializedFileReader::try_from(test_path_str.to_string());
1172 assert!(reader.is_err());
1173 }
1174
1175 #[test]
1176 fn test_file_reader_into_iter() {
1177 let path = get_test_path("alltypes_plain.parquet");
1178 let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1179 let iter = reader.into_iter();
1180 let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();
1181
1182 assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
1183 }
1184
1185 #[test]
1186 fn test_file_reader_into_iter_project() {
1187 let path = get_test_path("alltypes_plain.parquet");
1188 let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1189 let schema = "message schema { OPTIONAL INT32 id; }";
1190 let proj = parse_message_type(schema).ok();
1191 let iter = reader.into_iter().project(proj).unwrap();
1192 let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();
1193
1194 assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
1195 }
1196
1197 #[test]
1198 fn test_reuse_file_chunk() {
1199 let test_file = get_test_file("alltypes_plain.parquet");
1203 let reader = SerializedFileReader::new(test_file).unwrap();
1204 let row_group = reader.get_row_group(0).unwrap();
1205
1206 let mut page_readers = Vec::new();
1207 for i in 0..row_group.num_columns() {
1208 page_readers.push(row_group.get_column_page_reader(i).unwrap());
1209 }
1210
1211 for mut page_reader in page_readers {
1214 assert!(page_reader.get_next_page().is_ok());
1215 }
1216 }
1217
    #[test]
    fn test_file_reader() {
        // End-to-end smoke test over the impala-generated `alltypes_plain`
        // file: file metadata, row-group metadata, and the pages of column 0.
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Parquet-level metadata: exactly one row group in this fixture.
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // File metadata written by impala; no key/value metadata, no column orders.
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
        );
        assert!(file_metadata.key_value_metadata().is_none());
        assert_eq!(file_metadata.num_rows(), 8);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        // Row-group metadata for the single group.
        let row_group_metadata = metadata.row_group(0);
        assert_eq!(row_group_metadata.num_columns(), 11);
        assert_eq!(row_group_metadata.num_rows(), 8);
        assert_eq!(row_group_metadata.total_byte_size(), 671);
        // With no column orders in the file, every column order is UNDEFINED.
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // The row-group reader must agree with the metadata it was built from.
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Column 0 is dictionary-encoded: expect one dictionary page followed
        // by one V1 data page, with the exact sizes/encodings pinned below.
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 32);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPage {
                    buf,
                    num_values,
                    encoding,
                    def_level_encoding,
                    rep_level_encoding,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 11);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert_eq!(def_level_encoding, Encoding::RLE);
                    // BIT_PACKED is deprecated but is what this old file uses.
                    #[allow(deprecated)]
                    let expected_rep_level_encoding = Encoding::BIT_PACKED;
                    assert_eq!(rep_level_encoding, expected_rep_level_encoding);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }
1309
    #[test]
    fn test_file_reader_datapage_v2() {
        // Same shape of test as `test_file_reader`, but against a file written
        // with V2 data pages and snappy compression.
        let test_file = get_test_file("datapage_v2.snappy.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Parquet-level metadata: a single row group.
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // File metadata written by parquet-mr, with one key/value entry.
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            1
        );

        assert_eq!(file_metadata.num_rows(), 5);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        let row_group_metadata = metadata.row_group(0);

        // With no column orders in the file, every column order is UNDEFINED.
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // The row-group reader must agree with the metadata it was built from.
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Column 0: one dictionary page followed by one V2 data page.
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 7);
                    assert_eq!(num_values, 1);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 4);
                    assert_eq!(num_values, 5);
                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                    assert_eq!(num_nulls, 1);
                    assert_eq!(num_rows, 5);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_some());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }
1407
    #[test]
    fn test_file_reader_empty_compressed_datapage_v2() {
        // This file has an empty (all-null) compressed V2 data page and an
        // empty dictionary page; both must still round-trip through the reader.
        let test_file = get_test_file("page_v2_empty_compressed.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Parquet-level metadata: a single row group.
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // File metadata written by parquet-cpp-arrow, with one key/value entry.
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-cpp-arrow version 14.0.2"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            1
        );

        assert_eq!(file_metadata.num_rows(), 10);
        assert_eq!(file_metadata.version(), 2);
        // This writer records a TYPE_DEFINED_ORDER for the single column.
        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
        assert_eq!(
            file_metadata.column_orders(),
            Some(vec![expected_order].as_ref())
        );

        let row_group_metadata = metadata.row_group(0);

        // Every column carries the declared column order.
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), expected_order);
        }

        // The row-group reader must agree with the metadata it was built from.
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Column 0: an empty dictionary page followed by an all-null V2 page.
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    // Zero-length dictionary: no non-null values exist.
                    assert_eq!(buf.len(), 0);
                    assert_eq!(num_values, 0);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    // All 10 values are null; only the def levels carry data.
                    assert_eq!(buf.len(), 3);
                    assert_eq!(num_values, 10);
                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                    assert_eq!(num_nulls, 10);
                    assert_eq!(num_rows, 10);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_some());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }
1509
    #[test]
    fn test_file_reader_empty_datapage_v2() {
        // A file containing a single V2 data page whose only value is null.
        let test_file = get_test_file("datapage_v2_empty_datapage.snappy.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Parquet-level metadata: a single row group.
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // File metadata written by parquet-mr, with two key/value entries.
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-mr version 1.13.1 (build db4183109d5b734ec5930d870cdae161e408ddba)"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            2
        );

        assert_eq!(file_metadata.num_rows(), 1);
        assert_eq!(file_metadata.version(), 1);
        // This writer records a TYPE_DEFINED_ORDER for the single column.
        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
        assert_eq!(
            file_metadata.column_orders(),
            Some(vec![expected_order].as_ref())
        );

        let row_group_metadata = metadata.row_group(0);

        // Every column carries the declared column order.
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), expected_order);
        }

        // The row-group reader must agree with the metadata it was built from.
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Column 0: exactly one V2 data page, no dictionary page.
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    // The single value is null; only the def levels carry data.
                    assert_eq!(buf.len(), 2);
                    assert_eq!(num_values, 1);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert_eq!(num_nulls, 1);
                    assert_eq!(num_rows, 1);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 1);
    }
1599
    /// Builds a `SerializedPageReader` for `column` of `row_group`, wiring in
    /// the file's offset index (page locations) when one was loaded.
    fn get_serialized_page_reader<R: ChunkReader>(
        file_reader: &SerializedFileReader<R>,
        row_group: usize,
        column: usize,
    ) -> Result<SerializedPageReader<R>> {
        // Materialize a row-group reader so its chunk reader, metadata and
        // per-column offset index can be borrowed below.
        let row_group = {
            let row_group_metadata = file_reader.metadata.row_group(row_group);
            let props = Arc::clone(&file_reader.props);
            let f = Arc::clone(&file_reader.chunk_reader);
            SerializedRowGroupReader::new(
                f,
                row_group_metadata,
                // Offset index is optional: only present if page index reading
                // was enabled and the file contains one.
                file_reader
                    .metadata
                    .offset_index()
                    .map(|x| x[row_group].as_slice()),
                props,
            )?
        };

        let col = row_group.metadata.column(column);

        // Clone this column's page locations, if an offset index is present.
        let page_locations = row_group
            .offset_index
            .map(|x| x[column].page_locations.clone());

        let props = Arc::clone(&row_group.props);
        SerializedPageReader::new_with_properties(
            Arc::clone(&row_group.chunk_reader),
            col,
            usize::try_from(row_group.metadata.num_rows())?,
            page_locations,
            props,
        )
    }
1635
    #[test]
    fn test_peek_next_page_offset_matches_actual() -> Result<()> {
        // For every column chunk in the file, verify that the offset returned
        // by `peek_next_page_offset` agrees with the reader's internal state,
        // and that each page offset is observed exactly once.
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader = SerializedFileReader::new(test_file)?;

        let mut offset_set = HashSet::new();
        let num_row_groups = reader.metadata.num_row_groups();
        for row_group in 0..num_row_groups {
            let num_columns = reader.metadata.row_group(row_group).num_columns();
            for column in 0..num_columns {
                let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;

                while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
                    match &page_reader.state {
                        SerializedPageReaderState::Pages {
                            page_locations,
                            dictionary_page,
                            ..
                        } => {
                            // Offset-index path: the peeked offset must match the
                            // pending dictionary page, else the next page location.
                            if let Some(page) = dictionary_page {
                                assert_eq!(page.offset as u64, page_offset);
                            } else if let Some(page) = page_locations.front() {
                                assert_eq!(page.offset as u64, page_offset);
                            } else {
                                unreachable!()
                            }
                        }
                        SerializedPageReaderState::Values {
                            offset,
                            next_page_header,
                            ..
                        } => {
                            // Header-scanning path: peeking buffers the next page
                            // header and reports the current byte offset.
                            assert!(next_page_header.is_some());
                            assert_eq!(*offset, page_offset);
                        }
                    }
                    // Consuming the peeked page must succeed, and the same
                    // offset must never be reported twice.
                    let page = page_reader.get_next_page()?;
                    assert!(page.is_some());
                    let newly_inserted = offset_set.insert(page_offset);
                    assert!(newly_inserted);
                }
            }
        }

        Ok(())
    }
1682
1683 #[test]
1684 fn test_page_iterator() {
1685 let file = get_test_file("alltypes_plain.parquet");
1686 let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1687
1688 let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap();
1689
1690 let page = page_iterator.next();
1692 assert!(page.is_some());
1693 assert!(page.unwrap().is_ok());
1694
1695 let page = page_iterator.next();
1697 assert!(page.is_none());
1698
1699 let row_group_indices = Box::new(0..1);
1700 let mut page_iterator =
1701 FilePageIterator::with_row_groups(0, row_group_indices, file_reader).unwrap();
1702
1703 let page = page_iterator.next();
1705 assert!(page.is_some());
1706 assert!(page.unwrap().is_ok());
1707
1708 let page = page_iterator.next();
1710 assert!(page.is_none());
1711 }
1712
1713 #[test]
1714 fn test_file_reader_key_value_metadata() {
1715 let file = get_test_file("binary.parquet");
1716 let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1717
1718 let metadata = file_reader
1719 .metadata
1720 .file_metadata()
1721 .key_value_metadata()
1722 .unwrap();
1723
1724 assert_eq!(metadata.len(), 3);
1725
1726 assert_eq!(metadata[0].key, "parquet.proto.descriptor");
1727
1728 assert_eq!(metadata[1].key, "writer.model.name");
1729 assert_eq!(metadata[1].value, Some("protobuf".to_owned()));
1730
1731 assert_eq!(metadata[2].key, "parquet.proto.class");
1732 assert_eq!(metadata[2].value, Some("foo.baz.Foobaz$Event".to_owned()));
1733 }
1734
1735 #[test]
1736 fn test_file_reader_optional_metadata() {
1737 let file = get_test_file("data_index_bloom_encoding_stats.parquet");
1739 let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1740
1741 let row_group_metadata = file_reader.metadata.row_group(0);
1742 let col0_metadata = row_group_metadata.column(0);
1743
1744 assert_eq!(col0_metadata.bloom_filter_offset().unwrap(), 192);
1746
1747 let page_encoding_stats = &col0_metadata.page_encoding_stats().unwrap()[0];
1749
1750 assert_eq!(page_encoding_stats.page_type, basic::PageType::DATA_PAGE);
1751 assert_eq!(page_encoding_stats.encoding, Encoding::PLAIN);
1752 assert_eq!(page_encoding_stats.count, 1);
1753
1754 assert_eq!(col0_metadata.column_index_offset().unwrap(), 156);
1756 assert_eq!(col0_metadata.column_index_length().unwrap(), 25);
1757
1758 assert_eq!(col0_metadata.offset_index_offset().unwrap(), 181);
1760 assert_eq!(col0_metadata.offset_index_length().unwrap(), 11);
1761 }
1762
1763 #[test]
1764 fn test_file_reader_with_no_filter() -> Result<()> {
1765 let test_file = get_test_file("alltypes_plain.parquet");
1766 let origin_reader = SerializedFileReader::new(test_file)?;
1767 let metadata = origin_reader.metadata();
1769 assert_eq!(metadata.num_row_groups(), 1);
1770 Ok(())
1771 }
1772
1773 #[test]
1774 fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
1775 let test_file = get_test_file("alltypes_plain.parquet");
1776 let read_options = ReadOptionsBuilder::new()
1777 .with_predicate(Box::new(|_, _| false))
1778 .build();
1779 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1780 let metadata = reader.metadata();
1781 assert_eq!(metadata.num_row_groups(), 0);
1782 Ok(())
1783 }
1784
1785 #[test]
1786 fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
1787 let test_file = get_test_file("alltypes_plain.parquet");
1788 let origin_reader = SerializedFileReader::new(test_file)?;
1789 let metadata = origin_reader.metadata();
1791 assert_eq!(metadata.num_row_groups(), 1);
1792 let mid = get_midpoint_offset(metadata.row_group(0));
1793
1794 let test_file = get_test_file("alltypes_plain.parquet");
1795 let read_options = ReadOptionsBuilder::new().with_range(0, mid + 1).build();
1796 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1797 let metadata = reader.metadata();
1798 assert_eq!(metadata.num_row_groups(), 1);
1799
1800 let test_file = get_test_file("alltypes_plain.parquet");
1801 let read_options = ReadOptionsBuilder::new().with_range(0, mid).build();
1802 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1803 let metadata = reader.metadata();
1804 assert_eq!(metadata.num_row_groups(), 0);
1805 Ok(())
1806 }
1807
1808 #[test]
1809 fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
1810 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1811 let origin_reader = SerializedFileReader::new(test_file)?;
1812 let metadata = origin_reader.metadata();
1813 let mid = get_midpoint_offset(metadata.row_group(0));
1814
1815 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1817 let read_options = ReadOptionsBuilder::new()
1818 .with_page_index()
1819 .with_predicate(Box::new(|_, _| true))
1820 .with_range(mid, mid + 1)
1821 .build();
1822 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1823 let metadata = reader.metadata();
1824 assert_eq!(metadata.num_row_groups(), 1);
1825 assert_eq!(metadata.column_index().unwrap().len(), 1);
1826 assert_eq!(metadata.offset_index().unwrap().len(), 1);
1827
1828 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1830 let read_options = ReadOptionsBuilder::new()
1831 .with_page_index()
1832 .with_predicate(Box::new(|_, _| true))
1833 .with_range(0, mid)
1834 .build();
1835 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1836 let metadata = reader.metadata();
1837 assert_eq!(metadata.num_row_groups(), 0);
1838 assert!(metadata.column_index().is_none());
1839 assert!(metadata.offset_index().is_none());
1840
1841 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1843 let read_options = ReadOptionsBuilder::new()
1844 .with_page_index()
1845 .with_predicate(Box::new(|_, _| false))
1846 .with_range(mid, mid + 1)
1847 .build();
1848 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1849 let metadata = reader.metadata();
1850 assert_eq!(metadata.num_row_groups(), 0);
1851 assert!(metadata.column_index().is_none());
1852 assert!(metadata.offset_index().is_none());
1853
1854 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1856 let read_options = ReadOptionsBuilder::new()
1857 .with_page_index()
1858 .with_predicate(Box::new(|_, _| false))
1859 .with_range(0, mid)
1860 .build();
1861 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1862 let metadata = reader.metadata();
1863 assert_eq!(metadata.num_row_groups(), 0);
1864 assert!(metadata.column_index().is_none());
1865 assert!(metadata.offset_index().is_none());
1866 Ok(())
1867 }
1868
1869 #[test]
1870 fn test_file_reader_invalid_metadata() {
1871 let data = [
1872 255, 172, 1, 0, 50, 82, 65, 73, 1, 0, 0, 0, 169, 168, 168, 162, 87, 255, 16, 0, 0, 0,
1873 80, 65, 82, 49,
1874 ];
1875 let ret = SerializedFileReader::new(Bytes::copy_from_slice(&data));
1876 assert_eq!(
1877 ret.err().unwrap().to_string(),
1878 "Parquet error: Could not parse metadata: bad data"
1879 );
1880 }
1881
    #[test]
    fn test_page_index_reader() {
        // Verify the column index (min/max, boundary order) and offset index
        // loaded for a single-column, single-page file.
        let test_file = get_test_file("data_index_bloom_encoding_stats.parquet");
        let builder = ReadOptionsBuilder::new();
        // Enable reading of the page index.
        let options = builder.with_page_index().build();
        let reader_result = SerializedFileReader::new_with_options(test_file, options);
        let reader = reader_result.unwrap();

        // Parquet-level metadata: a single row group.
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let column_index = metadata.column_index().unwrap();

        // One entry per row group.
        assert_eq!(column_index.len(), 1);
        // The single column is a BYTE_ARRAY.
        let index = if let Index::BYTE_ARRAY(index) = &column_index[0][0] {
            index
        } else {
            unreachable!()
        };

        assert_eq!(index.boundary_order, BoundaryOrder::ASCENDING);
        let index_in_pages = &index.indexes;

        // Only one page in the chunk.
        assert_eq!(index_in_pages.len(), 1);

        // Min/max of the single page as recorded in the column index.
        let page0 = &index_in_pages[0];
        let min = page0.min.as_ref().unwrap();
        let max = page0.max.as_ref().unwrap();
        assert_eq!(b"Hello", min.as_bytes());
        assert_eq!(b"today", max.as_bytes());

        let offset_indexes = metadata.offset_index().unwrap();
        // One entry per row group.
        assert_eq!(offset_indexes.len(), 1);
        let offset_index = &offset_indexes[0];
        let page_offset = &offset_index[0].page_locations()[0];

        // Location and extent of the single data page within the file.
        assert_eq!(4, page_offset.offset);
        assert_eq!(152, page_offset.compressed_page_size);
        assert_eq!(0, page_offset.first_row_index);
    }
1943
1944 #[test]
1945 #[allow(deprecated)]
1946 fn test_page_index_reader_out_of_order() {
1947 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1948 let options = ReadOptionsBuilder::new().with_page_index().build();
1949 let reader = SerializedFileReader::new_with_options(test_file, options).unwrap();
1950 let metadata = reader.metadata();
1951
1952 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1953 let columns = metadata.row_group(0).columns();
1954 let reversed: Vec<_> = columns.iter().cloned().rev().collect();
1955
1956 let a = read_columns_indexes(&test_file, columns).unwrap().unwrap();
1957 let mut b = read_columns_indexes(&test_file, &reversed)
1958 .unwrap()
1959 .unwrap();
1960 b.reverse();
1961 assert_eq!(a, b);
1962
1963 let a = read_offset_indexes(&test_file, columns).unwrap().unwrap();
1964 let mut b = read_offset_indexes(&test_file, &reversed).unwrap().unwrap();
1965 b.reverse();
1966 assert_eq!(a, b);
1967 }
1968
1969 #[test]
1970 fn test_page_index_reader_all_type() {
1971 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1972 let builder = ReadOptionsBuilder::new();
1973 let options = builder.with_page_index().build();
1975 let reader_result = SerializedFileReader::new_with_options(test_file, options);
1976 let reader = reader_result.unwrap();
1977
1978 let metadata = reader.metadata();
1980 assert_eq!(metadata.num_row_groups(), 1);
1981
1982 let column_index = metadata.column_index().unwrap();
1983 let row_group_offset_indexes = &metadata.offset_index().unwrap()[0];
1984
1985 assert_eq!(column_index.len(), 1);
1987 let row_group_metadata = metadata.row_group(0);
1988
1989 assert!(!&column_index[0][0].is_sorted());
1991 let boundary_order = &column_index[0][0].get_boundary_order();
1992 assert!(boundary_order.is_some());
1993 matches!(boundary_order.unwrap(), BoundaryOrder::UNORDERED);
1994 if let Index::INT32(index) = &column_index[0][0] {
1995 check_native_page_index(
1996 index,
1997 325,
1998 get_row_group_min_max_bytes(row_group_metadata, 0),
1999 BoundaryOrder::UNORDERED,
2000 );
2001 assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325);
2002 } else {
2003 unreachable!()
2004 };
2005 assert!(&column_index[0][1].is_sorted());
2007 if let Index::BOOLEAN(index) = &column_index[0][1] {
2008 assert_eq!(index.indexes.len(), 82);
2009 assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82);
2010 } else {
2011 unreachable!()
2012 };
2013 assert!(&column_index[0][2].is_sorted());
2015 if let Index::INT32(index) = &column_index[0][2] {
2016 check_native_page_index(
2017 index,
2018 325,
2019 get_row_group_min_max_bytes(row_group_metadata, 2),
2020 BoundaryOrder::ASCENDING,
2021 );
2022 assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325);
2023 } else {
2024 unreachable!()
2025 };
2026 assert!(&column_index[0][3].is_sorted());
2028 if let Index::INT32(index) = &column_index[0][3] {
2029 check_native_page_index(
2030 index,
2031 325,
2032 get_row_group_min_max_bytes(row_group_metadata, 3),
2033 BoundaryOrder::ASCENDING,
2034 );
2035 assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325);
2036 } else {
2037 unreachable!()
2038 };
2039 assert!(&column_index[0][4].is_sorted());
2041 if let Index::INT32(index) = &column_index[0][4] {
2042 check_native_page_index(
2043 index,
2044 325,
2045 get_row_group_min_max_bytes(row_group_metadata, 4),
2046 BoundaryOrder::ASCENDING,
2047 );
2048 assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325);
2049 } else {
2050 unreachable!()
2051 };
2052 assert!(!&column_index[0][5].is_sorted());
2054 if let Index::INT64(index) = &column_index[0][5] {
2055 check_native_page_index(
2056 index,
2057 528,
2058 get_row_group_min_max_bytes(row_group_metadata, 5),
2059 BoundaryOrder::UNORDERED,
2060 );
2061 assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528);
2062 } else {
2063 unreachable!()
2064 };
2065 assert!(&column_index[0][6].is_sorted());
2067 if let Index::FLOAT(index) = &column_index[0][6] {
2068 check_native_page_index(
2069 index,
2070 325,
2071 get_row_group_min_max_bytes(row_group_metadata, 6),
2072 BoundaryOrder::ASCENDING,
2073 );
2074 assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325);
2075 } else {
2076 unreachable!()
2077 };
2078 assert!(!&column_index[0][7].is_sorted());
2080 if let Index::DOUBLE(index) = &column_index[0][7] {
2081 check_native_page_index(
2082 index,
2083 528,
2084 get_row_group_min_max_bytes(row_group_metadata, 7),
2085 BoundaryOrder::UNORDERED,
2086 );
2087 assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528);
2088 } else {
2089 unreachable!()
2090 };
2091 assert!(!&column_index[0][8].is_sorted());
2093 if let Index::BYTE_ARRAY(index) = &column_index[0][8] {
2094 check_native_page_index(
2095 index,
2096 974,
2097 get_row_group_min_max_bytes(row_group_metadata, 8),
2098 BoundaryOrder::UNORDERED,
2099 );
2100 assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974);
2101 } else {
2102 unreachable!()
2103 };
2104 assert!(&column_index[0][9].is_sorted());
2106 if let Index::BYTE_ARRAY(index) = &column_index[0][9] {
2107 check_native_page_index(
2108 index,
2109 352,
2110 get_row_group_min_max_bytes(row_group_metadata, 9),
2111 BoundaryOrder::ASCENDING,
2112 );
2113 assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352);
2114 } else {
2115 unreachable!()
2116 };
2117 assert!(!&column_index[0][10].is_sorted());
2120 if let Index::NONE = &column_index[0][10] {
2121 assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974);
2122 } else {
2123 unreachable!()
2124 };
2125 assert!(&column_index[0][11].is_sorted());
2127 if let Index::INT32(index) = &column_index[0][11] {
2128 check_native_page_index(
2129 index,
2130 325,
2131 get_row_group_min_max_bytes(row_group_metadata, 11),
2132 BoundaryOrder::ASCENDING,
2133 );
2134 assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325);
2135 } else {
2136 unreachable!()
2137 };
2138 assert!(!&column_index[0][12].is_sorted());
2140 if let Index::INT32(index) = &column_index[0][12] {
2141 check_native_page_index(
2142 index,
2143 325,
2144 get_row_group_min_max_bytes(row_group_metadata, 12),
2145 BoundaryOrder::UNORDERED,
2146 );
2147 assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325);
2148 } else {
2149 unreachable!()
2150 };
2151 }
2152
2153 fn check_native_page_index<T: ParquetValueType>(
2154 row_group_index: &NativeIndex<T>,
2155 page_size: usize,
2156 min_max: (&[u8], &[u8]),
2157 boundary_order: BoundaryOrder,
2158 ) {
2159 assert_eq!(row_group_index.indexes.len(), page_size);
2160 assert_eq!(row_group_index.boundary_order, boundary_order);
2161 row_group_index.indexes.iter().all(|x| {
2162 x.min.as_ref().unwrap() >= &T::try_from_le_slice(min_max.0).unwrap()
2163 && x.max.as_ref().unwrap() <= &T::try_from_le_slice(min_max.1).unwrap()
2164 });
2165 }
2166
2167 fn get_row_group_min_max_bytes(r: &RowGroupMetaData, col_num: usize) -> (&[u8], &[u8]) {
2168 let statistics = r.column(col_num).statistics().unwrap();
2169 (
2170 statistics.min_bytes_opt().unwrap_or_default(),
2171 statistics.max_bytes_opt().unwrap_or_default(),
2172 )
2173 }
2174
2175 #[test]
2176 fn test_skip_next_page_with_dictionary_page() {
2177 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2178 let builder = ReadOptionsBuilder::new();
2179 let options = builder.with_page_index().build();
2181 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2182 let reader = reader_result.unwrap();
2183
2184 let row_group_reader = reader.get_row_group(0).unwrap();
2185
2186 let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2188
2189 let mut vec = vec![];
2190
2191 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2193 assert!(meta.is_dict);
2194
2195 column_page_reader.skip_next_page().unwrap();
2197
2198 let page = column_page_reader.get_next_page().unwrap().unwrap();
2200 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2201
2202 for _i in 0..351 {
2204 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2206 assert!(!meta.is_dict); vec.push(meta);
2208
2209 let page = column_page_reader.get_next_page().unwrap().unwrap();
2210 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2211 }
2212
2213 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2215 assert!(column_page_reader.get_next_page().unwrap().is_none());
2216
2217 assert_eq!(vec.len(), 351);
2219 }
2220
2221 #[test]
2222 fn test_skip_page_with_offset_index() {
2223 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2224 let builder = ReadOptionsBuilder::new();
2225 let options = builder.with_page_index().build();
2227 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2228 let reader = reader_result.unwrap();
2229
2230 let row_group_reader = reader.get_row_group(0).unwrap();
2231
2232 let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2234
2235 let mut vec = vec![];
2236
2237 for i in 0..325 {
2238 if i % 2 == 0 {
2239 vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2240 } else {
2241 column_page_reader.skip_next_page().unwrap();
2242 }
2243 }
2244 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2246 assert!(column_page_reader.get_next_page().unwrap().is_none());
2247
2248 assert_eq!(vec.len(), 163);
2249 }
2250
2251 #[test]
2252 fn test_skip_page_without_offset_index() {
2253 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2254
2255 let reader_result = SerializedFileReader::new(test_file);
2257 let reader = reader_result.unwrap();
2258
2259 let row_group_reader = reader.get_row_group(0).unwrap();
2260
2261 let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2263
2264 let mut vec = vec![];
2265
2266 for i in 0..325 {
2267 if i % 2 == 0 {
2268 vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2269 } else {
2270 column_page_reader.peek_next_page().unwrap().unwrap();
2271 column_page_reader.skip_next_page().unwrap();
2272 }
2273 }
2274 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2276 assert!(column_page_reader.get_next_page().unwrap().is_none());
2277
2278 assert_eq!(vec.len(), 163);
2279 }
2280
2281 #[test]
2282 fn test_peek_page_with_dictionary_page() {
2283 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2284 let builder = ReadOptionsBuilder::new();
2285 let options = builder.with_page_index().build();
2287 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2288 let reader = reader_result.unwrap();
2289 let row_group_reader = reader.get_row_group(0).unwrap();
2290
2291 let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2293
2294 let mut vec = vec![];
2295
2296 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2297 assert!(meta.is_dict);
2298 let page = column_page_reader.get_next_page().unwrap().unwrap();
2299 assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2300
2301 for i in 0..352 {
2302 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2303 if i != 351 {
2306 assert!((meta.num_rows == Some(21)) || (meta.num_rows == Some(20)));
2307 } else {
2308 assert_eq!(meta.num_rows, Some(10));
2311 }
2312 assert!(!meta.is_dict);
2313 vec.push(meta);
2314 let page = column_page_reader.get_next_page().unwrap().unwrap();
2315 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2316 }
2317
2318 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2320 assert!(column_page_reader.get_next_page().unwrap().is_none());
2321
2322 assert_eq!(vec.len(), 352);
2323 }
2324
2325 #[test]
2326 fn test_peek_page_with_dictionary_page_without_offset_index() {
2327 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2328
2329 let reader_result = SerializedFileReader::new(test_file);
2330 let reader = reader_result.unwrap();
2331 let row_group_reader = reader.get_row_group(0).unwrap();
2332
2333 let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2335
2336 let mut vec = vec![];
2337
2338 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2339 assert!(meta.is_dict);
2340 let page = column_page_reader.get_next_page().unwrap().unwrap();
2341 assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2342
2343 for i in 0..352 {
2344 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2345 if i != 351 {
2348 assert!((meta.num_levels == Some(21)) || (meta.num_levels == Some(20)));
2349 } else {
2350 assert_eq!(meta.num_levels, Some(10));
2353 }
2354 assert!(!meta.is_dict);
2355 vec.push(meta);
2356 let page = column_page_reader.get_next_page().unwrap().unwrap();
2357 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2358 }
2359
2360 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2362 assert!(column_page_reader.get_next_page().unwrap().is_none());
2363
2364 assert_eq!(vec.len(), 352);
2365 }
2366
2367 #[test]
2368 fn test_fixed_length_index() {
2369 let message_type = "
2370 message test_schema {
2371 OPTIONAL FIXED_LEN_BYTE_ARRAY (11) value (DECIMAL(25,2));
2372 }
2373 ";
2374
2375 let schema = parse_message_type(message_type).unwrap();
2376 let mut out = Vec::with_capacity(1024);
2377 let mut writer =
2378 SerializedFileWriter::new(&mut out, Arc::new(schema), Default::default()).unwrap();
2379
2380 let mut r = writer.next_row_group().unwrap();
2381 let mut c = r.next_column().unwrap().unwrap();
2382 c.typed::<FixedLenByteArrayType>()
2383 .write_batch(
2384 &[vec![0; 11].into(), vec![5; 11].into(), vec![3; 11].into()],
2385 Some(&[1, 1, 0, 1]),
2386 None,
2387 )
2388 .unwrap();
2389 c.close().unwrap();
2390 r.close().unwrap();
2391 writer.close().unwrap();
2392
2393 let b = Bytes::from(out);
2394 let options = ReadOptionsBuilder::new().with_page_index().build();
2395 let reader = SerializedFileReader::new_with_options(b, options).unwrap();
2396 let index = reader.metadata().column_index().unwrap();
2397
2398 assert_eq!(index.len(), 1);
2400 let c = &index[0];
2401 assert_eq!(c.len(), 1);
2403
2404 match &c[0] {
2405 Index::FIXED_LEN_BYTE_ARRAY(v) => {
2406 assert_eq!(v.indexes.len(), 1);
2407 let page_idx = &v.indexes[0];
2408 assert_eq!(page_idx.null_count.unwrap(), 1);
2409 assert_eq!(page_idx.min.as_ref().unwrap().as_ref(), &[0; 11]);
2410 assert_eq!(page_idx.max.as_ref().unwrap().as_ref(), &[5; 11]);
2411 }
2412 _ => unreachable!(),
2413 }
2414 }
2415
2416 #[test]
2417 fn test_multi_gz() {
2418 let file = get_test_file("concatenated_gzip_members.parquet");
2419 let reader = SerializedFileReader::new(file).unwrap();
2420 let row_group_reader = reader.get_row_group(0).unwrap();
2421 match row_group_reader.get_column_reader(0).unwrap() {
2422 ColumnReader::Int64ColumnReader(mut reader) => {
2423 let mut buffer = Vec::with_capacity(1024);
2424 let mut def_levels = Vec::with_capacity(1024);
2425 let (num_records, num_values, num_levels) = reader
2426 .read_records(1024, Some(&mut def_levels), None, &mut buffer)
2427 .unwrap();
2428
2429 assert_eq!(num_records, 513);
2430 assert_eq!(num_values, 513);
2431 assert_eq!(num_levels, 513);
2432
2433 let expected: Vec<i64> = (1..514).collect();
2434 assert_eq!(&buffer, &expected);
2435 }
2436 _ => unreachable!(),
2437 }
2438 }
2439
2440 #[test]
2441 fn test_byte_stream_split_extended() {
2442 let path = format!(
2443 "{}/byte_stream_split_extended.gzip.parquet",
2444 arrow::util::test_util::parquet_test_data(),
2445 );
2446 let file = File::open(path).unwrap();
2447 let reader = Box::new(SerializedFileReader::new(file).expect("Failed to create reader"));
2448
2449 let mut iter = reader
2451 .get_row_iter(None)
2452 .expect("Failed to create row iterator");
2453
2454 let mut start = 0;
2455 let end = reader.metadata().file_metadata().num_rows();
2456
2457 let check_row = |row: Result<Row, ParquetError>| {
2458 assert!(row.is_ok());
2459 let r = row.unwrap();
2460 assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
2461 assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
2462 assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
2463 assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
2464 assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
2465 assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
2466 assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
2467 };
2468
2469 while start < end {
2470 match iter.next() {
2471 Some(row) => check_row(row),
2472 None => break,
2473 };
2474 start += 1;
2475 }
2476 }
2477
2478 #[test]
2479 fn test_filtered_rowgroup_metadata() {
2480 let message_type = "
2481 message test_schema {
2482 REQUIRED INT32 a;
2483 }
2484 ";
2485 let schema = Arc::new(parse_message_type(message_type).unwrap());
2486 let props = Arc::new(
2487 WriterProperties::builder()
2488 .set_statistics_enabled(EnabledStatistics::Page)
2489 .build(),
2490 );
2491 let mut file: File = tempfile::tempfile().unwrap();
2492 let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
2493 let data = [1, 2, 3, 4, 5];
2494
2495 for idx in 0..5 {
2497 let data_i: Vec<i32> = data.iter().map(|x| x * (idx + 1)).collect();
2498 let mut row_group_writer = file_writer.next_row_group().unwrap();
2499 if let Some(mut writer) = row_group_writer.next_column().unwrap() {
2500 writer
2501 .typed::<Int32Type>()
2502 .write_batch(data_i.as_slice(), None, None)
2503 .unwrap();
2504 writer.close().unwrap();
2505 }
2506 row_group_writer.close().unwrap();
2507 file_writer.flushed_row_groups();
2508 }
2509 let file_metadata = file_writer.close().unwrap();
2510
2511 assert_eq!(file_metadata.num_rows, 25);
2512 assert_eq!(file_metadata.row_groups.len(), 5);
2513
2514 let read_options = ReadOptionsBuilder::new()
2516 .with_page_index()
2517 .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2))
2518 .build();
2519 let reader =
2520 SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
2521 .unwrap();
2522 let metadata = reader.metadata();
2523
2524 assert_eq!(metadata.num_row_groups(), 1);
2526 assert_eq!(metadata.row_group(0).ordinal(), Some(2));
2527
2528 assert!(metadata.column_index().is_some());
2530 assert!(metadata.offset_index().is_some());
2531 assert_eq!(metadata.column_index().unwrap().len(), 1);
2532 assert_eq!(metadata.offset_index().unwrap().len(), 1);
2533 let col_idx = metadata.column_index().unwrap();
2534 let off_idx = metadata.offset_index().unwrap();
2535 let col_stats = metadata.row_group(0).column(0).statistics().unwrap();
2536 let pg_idx = &col_idx[0][0];
2537 let off_idx_i = &off_idx[0][0];
2538
2539 match pg_idx {
2541 Index::INT32(int_idx) => {
2542 let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
2543 let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
2544 assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
2545 assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
2546 }
2547 _ => panic!("wrong stats type"),
2548 }
2549
2550 assert_eq!(
2552 off_idx_i.page_locations[0].offset,
2553 metadata.row_group(0).column(0).data_page_offset()
2554 );
2555
2556 let read_options = ReadOptionsBuilder::new()
2558 .with_page_index()
2559 .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1))
2560 .build();
2561 let reader =
2562 SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
2563 .unwrap();
2564 let metadata = reader.metadata();
2565
2566 assert_eq!(metadata.num_row_groups(), 2);
2568 assert_eq!(metadata.row_group(0).ordinal(), Some(1));
2569 assert_eq!(metadata.row_group(1).ordinal(), Some(3));
2570
2571 assert!(metadata.column_index().is_some());
2573 assert!(metadata.offset_index().is_some());
2574 assert_eq!(metadata.column_index().unwrap().len(), 2);
2575 assert_eq!(metadata.offset_index().unwrap().len(), 2);
2576 let col_idx = metadata.column_index().unwrap();
2577 let off_idx = metadata.offset_index().unwrap();
2578
2579 for (i, col_idx_i) in col_idx.iter().enumerate().take(metadata.num_row_groups()) {
2580 let col_stats = metadata.row_group(i).column(0).statistics().unwrap();
2581 let pg_idx = &col_idx_i[0];
2582 let off_idx_i = &off_idx[i][0];
2583
2584 match pg_idx {
2586 Index::INT32(int_idx) => {
2587 let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
2588 let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
2589 assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
2590 assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
2591 }
2592 _ => panic!("wrong stats type"),
2593 }
2594
2595 assert_eq!(
2597 off_idx_i.page_locations[0].offset,
2598 metadata.row_group(i).column(0).data_page_offset()
2599 );
2600 }
2601 }
2602}