1use crate::basic::{Encoding, Type};
22use crate::bloom_filter::Sbbf;
23use crate::column::page::{Page, PageMetadata, PageReader};
24use crate::compression::{create_codec, Codec};
25#[cfg(feature = "encryption")]
26use crate::encryption::decrypt::{read_and_decrypt, CryptoContext};
27use crate::errors::{ParquetError, Result};
28use crate::file::page_index::offset_index::OffsetIndexMetaData;
29use crate::file::{
30 metadata::*,
31 properties::{ReaderProperties, ReaderPropertiesPtr},
32 reader::*,
33 statistics,
34};
35use crate::format::{PageHeader, PageLocation, PageType};
36use crate::record::reader::RowIter;
37use crate::record::Row;
38use crate::schema::types::Type as SchemaType;
39#[cfg(feature = "encryption")]
40use crate::thrift::TCompactSliceInputProtocol;
41use crate::thrift::TSerializable;
42use bytes::Bytes;
43use std::collections::VecDeque;
44use std::iter;
45use std::{fs::File, io::Read, path::Path, sync::Arc};
46use thrift::protocol::TCompactInputProtocol;
47
48impl TryFrom<File> for SerializedFileReader<File> {
49 type Error = ParquetError;
50
51 fn try_from(file: File) -> Result<Self> {
52 Self::new(file)
53 }
54}
55
56impl TryFrom<&Path> for SerializedFileReader<File> {
57 type Error = ParquetError;
58
59 fn try_from(path: &Path) -> Result<Self> {
60 let file = File::open(path)?;
61 Self::try_from(file)
62 }
63}
64
65impl TryFrom<String> for SerializedFileReader<File> {
66 type Error = ParquetError;
67
68 fn try_from(path: String) -> Result<Self> {
69 Self::try_from(Path::new(&path))
70 }
71}
72
73impl TryFrom<&str> for SerializedFileReader<File> {
74 type Error = ParquetError;
75
76 fn try_from(path: &str) -> Result<Self> {
77 Self::try_from(Path::new(&path))
78 }
79}
80
/// Converts the reader into an owning iterator over the file's rows.
impl IntoIterator for SerializedFileReader<File> {
    type Item = Result<Row>;
    type IntoIter = RowIter<'static>;

    fn into_iter(self) -> Self::IntoIter {
        // Boxing the reader gives the iterator ownership, hence the 'static
        // lifetime on `RowIter`.
        RowIter::from_file_into(Box::new(self))
    }
}
91
/// A serialized implementation of the Parquet [`FileReader`] backed by any
/// [`ChunkReader`] (e.g. a file or an in-memory buffer).
pub struct SerializedFileReader<R: ChunkReader> {
    // Shared handle to the underlying byte source
    chunk_reader: Arc<R>,
    // Parsed footer metadata for the whole file
    metadata: Arc<ParquetMetaData>,
    // Reader configuration shared with row group / page readers
    props: ReaderPropertiesPtr,
}
101
/// A predicate for filtering row groups: invoked with each row group's
/// metadata and its index in the file; only row groups for which it returns
/// `true` are kept.
pub type ReadGroupPredicate = Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>;
106
/// A builder for [`ReadOptions`].
#[derive(Default)]
pub struct ReadOptionsBuilder {
    // Row group predicates; a row group is kept only when all of them pass
    predicates: Vec<ReadGroupPredicate>,
    // Whether to also load the page index structures
    enable_page_index: bool,
    // Optional custom reader properties; defaults are used when `None`
    props: Option<ReaderProperties>,
}
116
117impl ReadOptionsBuilder {
118 pub fn new() -> Self {
120 Self::default()
121 }
122
123 pub fn with_predicate(mut self, predicate: ReadGroupPredicate) -> Self {
126 self.predicates.push(predicate);
127 self
128 }
129
130 pub fn with_range(mut self, start: i64, end: i64) -> Self {
133 assert!(start < end);
134 let predicate = move |rg: &RowGroupMetaData, _: usize| {
135 let mid = get_midpoint_offset(rg);
136 mid >= start && mid < end
137 };
138 self.predicates.push(Box::new(predicate));
139 self
140 }
141
142 pub fn with_page_index(mut self) -> Self {
147 self.enable_page_index = true;
148 self
149 }
150
151 pub fn with_reader_properties(mut self, properties: ReaderProperties) -> Self {
153 self.props = Some(properties);
154 self
155 }
156
157 pub fn build(self) -> ReadOptions {
159 let props = self
160 .props
161 .unwrap_or_else(|| ReaderProperties::builder().build());
162 ReadOptions {
163 predicates: self.predicates,
164 enable_page_index: self.enable_page_index,
165 props,
166 }
167 }
168}
169
/// Options for reading a Parquet file, created via [`ReadOptionsBuilder`].
pub struct ReadOptions {
    // Row group predicates applied when opening the file
    predicates: Vec<ReadGroupPredicate>,
    // Whether to also load the page index structures
    enable_page_index: bool,
    // Reader properties to use for this file
    props: ReaderProperties,
}
179
180impl<R: 'static + ChunkReader> SerializedFileReader<R> {
181 pub fn new(chunk_reader: R) -> Result<Self> {
184 let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
185 let props = Arc::new(ReaderProperties::builder().build());
186 Ok(Self {
187 chunk_reader: Arc::new(chunk_reader),
188 metadata: Arc::new(metadata),
189 props,
190 })
191 }
192
193 pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
196 let mut metadata_builder = ParquetMetaDataReader::new()
197 .parse_and_finish(&chunk_reader)?
198 .into_builder();
199 let mut predicates = options.predicates;
200
201 for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
203 let mut keep = true;
204 for predicate in &mut predicates {
205 if !predicate(&rg_meta, i) {
206 keep = false;
207 break;
208 }
209 }
210 if keep {
211 metadata_builder = metadata_builder.add_row_group(rg_meta);
212 }
213 }
214
215 let mut metadata = metadata_builder.build();
216
217 if options.enable_page_index {
219 let mut reader =
220 ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true);
221 reader.read_page_indexes(&chunk_reader)?;
222 metadata = reader.finish()?;
223 }
224
225 Ok(Self {
226 chunk_reader: Arc::new(chunk_reader),
227 metadata: Arc::new(metadata),
228 props: Arc::new(options.props),
229 })
230 }
231}
232
233fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
235 let col = meta.column(0);
236 let mut offset = col.data_page_offset();
237 if let Some(dic_offset) = col.dictionary_page_offset() {
238 if offset > dic_offset {
239 offset = dic_offset
240 }
241 };
242 offset + meta.compressed_size() / 2
243}
244
245impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
246 fn metadata(&self) -> &ParquetMetaData {
247 &self.metadata
248 }
249
250 fn num_row_groups(&self) -> usize {
251 self.metadata.num_row_groups()
252 }
253
254 fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> {
255 let row_group_metadata = self.metadata.row_group(i);
256 let props = Arc::clone(&self.props);
258 let f = Arc::clone(&self.chunk_reader);
259 Ok(Box::new(SerializedRowGroupReader::new(
260 f,
261 row_group_metadata,
262 self.metadata.offset_index().map(|x| x[i].as_slice()),
263 props,
264 )?))
265 }
266
267 fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
268 RowIter::from_file(projection, self)
269 }
270}
271
/// A serialized implementation of the Parquet [`RowGroupReader`].
pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
    // Shared handle to the underlying byte source
    chunk_reader: Arc<R>,
    // Metadata of the row group being read
    metadata: &'a RowGroupMetaData,
    // Per-column offset index for this row group, when loaded
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    // Reader configuration shared with page readers
    props: ReaderPropertiesPtr,
    // One entry per column; `None` when no bloom filter was read for it
    bloom_filters: Vec<Option<Sbbf>>,
}
280
281impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
282 pub fn new(
284 chunk_reader: Arc<R>,
285 metadata: &'a RowGroupMetaData,
286 offset_index: Option<&'a [OffsetIndexMetaData]>,
287 props: ReaderPropertiesPtr,
288 ) -> Result<Self> {
289 let bloom_filters = if props.read_bloom_filter() {
290 metadata
291 .columns()
292 .iter()
293 .map(|col| Sbbf::read_from_column_chunk(col, &*chunk_reader))
294 .collect::<Result<Vec<_>>>()?
295 } else {
296 iter::repeat(None).take(metadata.columns().len()).collect()
297 };
298 Ok(Self {
299 chunk_reader,
300 metadata,
301 offset_index,
302 props,
303 bloom_filters,
304 })
305 }
306}
307
308impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R> {
309 fn metadata(&self) -> &RowGroupMetaData {
310 self.metadata
311 }
312
313 fn num_columns(&self) -> usize {
314 self.metadata.num_columns()
315 }
316
317 fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
319 let col = self.metadata.column(i);
320
321 let page_locations = self.offset_index.map(|x| x[i].page_locations.clone());
322
323 let props = Arc::clone(&self.props);
324 Ok(Box::new(SerializedPageReader::new_with_properties(
325 Arc::clone(&self.chunk_reader),
326 col,
327 usize::try_from(self.metadata.num_rows())?,
328 page_locations,
329 props,
330 )?))
331 }
332
333 fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
335 self.bloom_filters[i].as_ref()
336 }
337
338 fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
339 RowIter::from_row_group(projection, self)
340 }
341}
342
/// Decodes a [`Page`] from the raw page body read from a Parquet file.
///
/// `buffer` holds the page body exactly as stored (the header has already
/// been consumed). The body is optionally CRC-checked, decompressed with
/// `decompressor` when one is provided and the page is compressed, and then
/// converted into the in-memory [`Page`] representation.
pub(crate) fn decode_page(
    page_header: PageHeader,
    buffer: Bytes,
    physical_type: Type,
    decompressor: Option<&mut Box<dyn Codec>>,
) -> Result<Page> {
    // When the `crc` feature is enabled, verify the optional page checksum
    // before doing any further work.
    #[cfg(feature = "crc")]
    if let Some(expected_crc) = page_header.crc {
        let crc = crc32fast::hash(&buffer);
        if crc != expected_crc as u32 {
            return Err(general_err!("Page CRC checksum mismatch"));
        }
    }

    // For DataPage v2 the repetition/definition levels are stored
    // uncompressed at the front of the buffer; `offset` marks where the
    // (possibly compressed) values section begins.
    let mut offset: usize = 0;
    let mut can_decompress = true;

    if let Some(ref header_v2) = page_header.data_page_header_v2 {
        // Validate the level lengths before using them as a slice offset
        // below (negative or larger-than-page values would panic/underflow).
        if header_v2.definition_levels_byte_length < 0
            || header_v2.repetition_levels_byte_length < 0
            || header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length
                > page_header.uncompressed_page_size
        {
            return Err(general_err!(
                "DataPage v2 header contains implausible values \
                for definition_levels_byte_length ({}) \
                and repetition_levels_byte_length ({}) \
                given DataPage header provides uncompressed_page_size ({})",
                header_v2.definition_levels_byte_length,
                header_v2.repetition_levels_byte_length,
                page_header.uncompressed_page_size
            ));
        }
        offset = usize::try_from(
            header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length,
        )?;
        // V2 pages may be stored uncompressed even when the column uses a codec.
        can_decompress = header_v2.is_compressed.unwrap_or(true);
    }

    // Decompress the values section if a codec is available and the page is
    // actually compressed; the level bytes (`..offset`) are copied through
    // unchanged.
    let buffer = match decompressor {
        Some(decompressor) if can_decompress => {
            let uncompressed_page_size = usize::try_from(page_header.uncompressed_page_size)?;
            let decompressed_size = uncompressed_page_size - offset;
            let mut decompressed = Vec::with_capacity(uncompressed_page_size);
            decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
            if decompressed_size > 0 {
                let compressed = &buffer.as_ref()[offset..];
                decompressor.decompress(compressed, &mut decompressed, Some(decompressed_size))?;
            }

            // Guard against codecs that produce the wrong output length.
            if decompressed.len() != uncompressed_page_size {
                return Err(general_err!(
                    "Actual decompressed size doesn't match the expected one ({} vs {})",
                    decompressed.len(),
                    uncompressed_page_size
                ));
            }

            Bytes::from(decompressed)
        }
        _ => buffer,
    };

    // Translate the thrift header + decoded body into the in-memory page.
    let result = match page_header.type_ {
        PageType::DICTIONARY_PAGE => {
            let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| {
                ParquetError::General("Missing dictionary page header".to_string())
            })?;
            let is_sorted = dict_header.is_sorted.unwrap_or(false);
            Page::DictionaryPage {
                buf: buffer,
                num_values: dict_header.num_values.try_into()?,
                encoding: Encoding::try_from(dict_header.encoding)?,
                is_sorted,
            }
        }
        PageType::DATA_PAGE => {
            let header = page_header
                .data_page_header
                .ok_or_else(|| ParquetError::General("Missing V1 data page header".to_string()))?;
            Page::DataPage {
                buf: buffer,
                num_values: header.num_values.try_into()?,
                encoding: Encoding::try_from(header.encoding)?,
                def_level_encoding: Encoding::try_from(header.definition_level_encoding)?,
                rep_level_encoding: Encoding::try_from(header.repetition_level_encoding)?,
                statistics: statistics::from_thrift(physical_type, header.statistics)?,
            }
        }
        PageType::DATA_PAGE_V2 => {
            let header = page_header
                .data_page_header_v2
                .ok_or_else(|| ParquetError::General("Missing V2 data page header".to_string()))?;
            let is_compressed = header.is_compressed.unwrap_or(true);
            Page::DataPageV2 {
                buf: buffer,
                num_values: header.num_values.try_into()?,
                encoding: Encoding::try_from(header.encoding)?,
                num_nulls: header.num_nulls.try_into()?,
                num_rows: header.num_rows.try_into()?,
                def_levels_byte_len: header.definition_levels_byte_length.try_into()?,
                rep_levels_byte_len: header.repetition_levels_byte_length.try_into()?,
                is_compressed,
                statistics: statistics::from_thrift(physical_type, header.statistics)?,
            }
        }
        _ => {
            // Any other page type (e.g. INDEX_PAGE) is expected to be skipped
            // by callers before reaching this function.
            unimplemented!("Page type {:?} is not supported", page_header.type_)
        }
    };

    Ok(result)
}
468
/// Internal iteration state of a [`SerializedPageReader`].
enum SerializedPageReaderState {
    /// Sequentially scan the column chunk, decoding each page header as it
    /// is encountered (used when no offset index is available).
    Values {
        /// The current byte offset in the reader
        offset: usize,
        /// The number of bytes of this chunk remaining to be read
        remaining_bytes: usize,
        /// A page header that has been peeked but not yet consumed
        next_page_header: Option<Box<PageHeader>>,
        /// The index of the current data page within this column chunk
        page_index: usize,
        /// Whether a dictionary page is still expected at the front of the chunk
        require_dictionary: bool,
    },
    /// Address pages directly using locations from the offset index.
    Pages {
        /// Remaining data page locations, in file order
        page_locations: VecDeque<PageLocation>,
        /// Location of the dictionary page, if one precedes the first data page
        dictionary_page: Option<PageLocation>,
        /// The total number of rows in this column chunk
        total_rows: usize,
        /// The index of the current data page within this column chunk
        page_index: usize,
    },
}
497
/// Auxiliary context used while decoding pages; carries decryption state
/// when the `encryption` feature is enabled (otherwise it is empty).
#[derive(Default)]
struct SerializedPageReaderContext {
    /// Crypto context for decrypting the pages of an encrypted column
    #[cfg(feature = "encryption")]
    crypto_context: Option<Arc<CryptoContext>>,
}
504
/// Reads pages from a single column chunk.
pub struct SerializedPageReader<R: ChunkReader> {
    /// The underlying byte source
    reader: Arc<R>,
    /// The decompression codec for this column chunk, if any
    decompressor: Option<Box<dyn Codec>>,
    /// The physical type of this column
    physical_type: Type,
    /// Current iteration state (sequential scan or offset-index driven)
    state: SerializedPageReaderState,
    /// Decryption context (empty unless the `encryption` feature is on)
    context: SerializedPageReaderContext,
}
520
impl<R: ChunkReader> SerializedPageReader<R> {
    /// Creates a new serialized page reader from a chunk reader and column
    /// chunk metadata, using default reader properties.
    pub fn new(
        reader: Arc<R>,
        column_chunk_metadata: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
    ) -> Result<Self> {
        let props = Arc::new(ReaderProperties::builder().build());
        SerializedPageReader::new_with_properties(
            reader,
            column_chunk_metadata,
            total_rows,
            page_locations,
            props,
        )
    }

    /// No-op variant used when the `encryption` feature is disabled:
    /// returns the reader unchanged.
    #[cfg(all(feature = "arrow", not(feature = "encryption")))]
    pub(crate) fn add_crypto_context(
        self,
        _rg_idx: usize,
        _column_idx: usize,
        _parquet_meta_data: &ParquetMetaData,
        _column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        Ok(self)
    }

    /// Attaches the decryption context for this column chunk when the file
    /// and column are encrypted; otherwise returns the reader unchanged.
    #[cfg(feature = "encryption")]
    pub(crate) fn add_crypto_context(
        mut self,
        rg_idx: usize,
        column_idx: usize,
        parquet_meta_data: &ParquetMetaData,
        column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        // No file decryptor => file is not encrypted; nothing to do.
        let Some(file_decryptor) = parquet_meta_data.file_decryptor() else {
            return Ok(self);
        };
        // No column crypto metadata => this column is not encrypted.
        let Some(crypto_metadata) = column_chunk_metadata.crypto_metadata() else {
            return Ok(self);
        };
        let crypto_context =
            CryptoContext::for_column(file_decryptor, crypto_metadata, rg_idx, column_idx)?;
        self.context.crypto_context = Some(Arc::new(crypto_context));
        Ok(self)
    }

    /// Creates a new serialized page reader from a chunk reader, column
    /// chunk metadata and explicit reader properties.
    pub fn new_with_properties(
        reader: Arc<R>,
        meta: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
        props: ReaderPropertiesPtr,
    ) -> Result<Self> {
        let decompressor = create_codec(meta.compression(), props.codec_options())?;
        let (start, len) = meta.byte_range();

        let state = match page_locations {
            Some(locations) => {
                // The offset index only records data pages. If the first
                // recorded page does not start at the beginning of the chunk's
                // byte range, the gap must hold the dictionary page.
                let dictionary_page = match locations.first() {
                    Some(dict_offset) if dict_offset.offset as u64 != start => Some(PageLocation {
                        offset: start as i64,
                        compressed_page_size: (dict_offset.offset as u64 - start) as i32,
                        first_row_index: 0,
                    }),
                    _ => None,
                };

                SerializedPageReaderState::Pages {
                    page_locations: locations.into(),
                    dictionary_page,
                    total_rows,
                    page_index: 0,
                }
            }
            // No offset index: fall back to sequentially scanning the chunk.
            None => SerializedPageReaderState::Values {
                offset: usize::try_from(start)?,
                remaining_bytes: usize::try_from(len)?,
                next_page_header: None,
                page_index: 0,
                require_dictionary: meta.dictionary_page_offset().is_some(),
            },
        };
        Ok(Self {
            reader,
            decompressor,
            state,
            physical_type: meta.column_type(),
            context: Default::default(),
        })
    }

    /// Returns the file offset of the next page, or `None` when the chunk is
    /// exhausted. Headers that cannot be converted to [`PageMetadata`]
    /// (unsupported page types) are skipped.
    #[cfg(test)]
    fn peek_next_page_offset(&mut self) -> Result<Option<usize>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
                            Ok(Some(*offset))
                        } else {
                            // Unsupported page type: drop the buffered header
                            // and keep scanning.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset as u64)?;
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        *offset += header_len;
                        *remaining_bytes -= header_len;
                        let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
                            Ok(Some(*offset))
                        } else {
                            // Unsupported page type: skip it and keep scanning.
                            continue;
                        };
                        // Buffer the decoded header for the next read.
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                ..
            } => {
                if let Some(page) = dictionary_page {
                    Ok(Some(usize::try_from(page.offset)?))
                } else if let Some(page) = page_locations.front() {
                    Ok(Some(usize::try_from(page.offset)?))
                } else {
                    Ok(None)
                }
            }
        }
    }

    /// Reads a page header from `input`, returning the header together with
    /// the number of bytes it occupied in the stream.
    fn read_page_header_len<T: Read>(
        context: &SerializedPageReaderContext,
        input: &mut T,
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<(usize, PageHeader)> {
        /// A wrapper around a [`Read`] that counts the bytes read through it.
        struct TrackedRead<R> {
            inner: R,
            bytes_read: usize,
        }

        impl<R: Read> Read for TrackedRead<R> {
            fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
                let v = self.inner.read(buf)?;
                self.bytes_read += v;
                Ok(v)
            }
        }

        let mut tracked = TrackedRead {
            inner: input,
            bytes_read: 0,
        };
        let header = context.read_page_header(&mut tracked, page_index, dictionary_page)?;
        Ok((tracked.bytes_read, header))
    }

    /// Reads a page header from the front of `buffer`, returning the header
    /// and its encoded length in bytes.
    fn read_page_header_len_from_bytes(
        context: &SerializedPageReaderContext,
        buffer: &[u8],
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<(usize, PageHeader)> {
        let mut input = std::io::Cursor::new(buffer);
        let header = context.read_page_header(&mut input, page_index, dictionary_page)?;
        // The cursor position after decoding equals the header's length.
        let header_len = input.position() as usize;
        Ok((header_len, header))
    }
}
724
725#[cfg(not(feature = "encryption"))]
726impl SerializedPageReaderContext {
727 fn read_page_header<T: Read>(
728 &self,
729 input: &mut T,
730 _page_index: usize,
731 _dictionary_page: bool,
732 ) -> Result<PageHeader> {
733 let mut prot = TCompactInputProtocol::new(input);
734 Ok(PageHeader::read_from_in_protocol(&mut prot)?)
735 }
736
737 fn decrypt_page_data<T>(
738 &self,
739 buffer: T,
740 _page_index: usize,
741 _dictionary_page: bool,
742 ) -> Result<T> {
743 Ok(buffer)
744 }
745}
746
#[cfg(feature = "encryption")]
impl SerializedPageReaderContext {
    /// Decodes a page header from `input`, decrypting it first when this
    /// column has a crypto context configured.
    fn read_page_header<T: Read>(
        &self,
        input: &mut T,
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<PageHeader> {
        match self.page_crypto_context(page_index, dictionary_page) {
            // Column not encrypted: decode the header directly.
            None => {
                let mut prot = TCompactInputProtocol::new(input);
                Ok(PageHeader::read_from_in_protocol(&mut prot)?)
            }
            Some(page_crypto_context) => {
                let data_decryptor = page_crypto_context.data_decryptor();
                let aad = page_crypto_context.create_page_header_aad()?;

                let buf = read_and_decrypt(data_decryptor, input, aad.as_ref()).map_err(|_| {
                    ParquetError::General(format!(
                        "Error decrypting page header for column {}, decryption key may be wrong",
                        page_crypto_context.column_ordinal
                    ))
                })?;

                // Decode the header from the decrypted plaintext.
                let mut prot = TCompactSliceInputProtocol::new(buf.as_slice());
                Ok(PageHeader::read_from_in_protocol(&mut prot)?)
            }
        }
    }

    /// Decrypts a page body, returning the plaintext; when no crypto context
    /// is configured the buffer is returned unchanged.
    fn decrypt_page_data<T>(&self, buffer: T, page_index: usize, dictionary_page: bool) -> Result<T>
    where
        T: AsRef<[u8]>,
        T: From<Vec<u8>>,
    {
        let page_crypto_context = self.page_crypto_context(page_index, dictionary_page);
        if let Some(page_crypto_context) = page_crypto_context {
            let decryptor = page_crypto_context.data_decryptor();
            let aad = page_crypto_context.create_page_aad()?;
            let decrypted = decryptor.decrypt(buffer.as_ref(), &aad)?;
            Ok(T::from(decrypted))
        } else {
            Ok(buffer)
        }
    }

    /// Returns the crypto context specialized for the given page (dictionary
    /// and data pages use different AADs), or `None` when the column is not
    /// encrypted.
    fn page_crypto_context(
        &self,
        page_index: usize,
        dictionary_page: bool,
    ) -> Option<Arc<CryptoContext>> {
        self.crypto_context.as_ref().map(|c| {
            Arc::new(if dictionary_page {
                c.for_dictionary_page()
            } else {
                c.with_page_ordinal(page_index)
            })
        })
    }
}
807
808impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
809 type Item = Result<Page>;
810
811 fn next(&mut self) -> Option<Self::Item> {
812 self.get_next_page().transpose()
813 }
814}
815
816fn verify_page_header_len(header_len: usize, remaining_bytes: usize) -> Result<()> {
817 if header_len > remaining_bytes {
818 return Err(eof_err!("Invalid page header"));
819 }
820 Ok(())
821}
822
823fn verify_page_size(
824 compressed_size: i32,
825 uncompressed_size: i32,
826 remaining_bytes: usize,
827) -> Result<()> {
828 if compressed_size < 0 || compressed_size as usize > remaining_bytes || uncompressed_size < 0 {
832 return Err(eof_err!("Invalid page header"));
833 }
834 Ok(())
835}
836
impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
    fn get_next_page(&mut self) -> Result<Option<Page>> {
        // The loop exists only to skip INDEX_PAGEs in the sequential path;
        // every other iteration returns.
        loop {
            let page = match &mut self.state {
                SerializedPageReaderState::Values {
                    offset,
                    remaining_bytes: remaining,
                    next_page_header,
                    page_index,
                    require_dictionary,
                } => {
                    if *remaining == 0 {
                        return Ok(None);
                    }

                    let mut read = self.reader.get_read(*offset as u64)?;
                    // Use a header buffered by a previous peek if available
                    // (the peek already advanced `offset` past it); otherwise
                    // decode it from the stream now.
                    let header = if let Some(header) = next_page_header.take() {
                        *header
                    } else {
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        verify_page_header_len(header_len, *remaining)?;
                        *offset += header_len;
                        *remaining -= header_len;
                        header
                    };
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining,
                    )?;
                    let data_len = header.compressed_page_size as usize;
                    *offset += data_len;
                    *remaining -= data_len;

                    // Index pages are not exposed through this API; skip them.
                    if header.type_ == PageType::INDEX_PAGE {
                        continue;
                    }

                    let mut buffer = Vec::with_capacity(data_len);
                    let read = read.take(data_len as u64).read_to_end(&mut buffer)?;

                    if read != data_len {
                        return Err(eof_err!(
                            "Expected to read {} bytes of page, read only {}",
                            data_len,
                            read
                        ));
                    }

                    // No-op unless the `encryption` feature is on and the
                    // column is encrypted.
                    let buffer =
                        self.context
                            .decrypt_page_data(buffer, *page_index, *require_dictionary)?;

                    let page = decode_page(
                        header,
                        Bytes::from(buffer),
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?;
                    // Only data pages advance the page index; the dictionary
                    // page (at most one, at the front) clears the flag instead.
                    if page.is_data_page() {
                        *page_index += 1;
                    } else if page.is_dictionary_page() {
                        *require_dictionary = false;
                    }
                    page
                }
                SerializedPageReaderState::Pages {
                    page_locations,
                    dictionary_page,
                    page_index,
                    ..
                } => {
                    // Serve the dictionary page first (if present), then data
                    // pages in offset-index order.
                    let (front, is_dictionary_page) = match dictionary_page.take() {
                        Some(front) => (front, true),
                        None => match page_locations.pop_front() {
                            Some(front) => (front, false),
                            None => return Ok(None),
                        },
                    };

                    let page_len = usize::try_from(front.compressed_page_size)?;
                    let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;

                    // The fetched range contains header + body; split them.
                    let (offset, header) = Self::read_page_header_len_from_bytes(
                        &self.context,
                        buffer.as_ref(),
                        *page_index,
                        is_dictionary_page,
                    )?;
                    let bytes = buffer.slice(offset..);
                    let bytes =
                        self.context
                            .decrypt_page_data(bytes, *page_index, is_dictionary_page)?;

                    if !is_dictionary_page {
                        *page_index += 1;
                    }
                    decode_page(
                        header,
                        bytes,
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?
                }
            };

            return Ok(Some(page));
        }
    }

    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                // Loop to skip headers that don't convert to PageMetadata
                // (unsupported page types).
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(page_meta) = (&**header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // Unsupported page type: drop the buffered header
                            // and keep scanning.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset as u64)?;
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        verify_page_header_len(header_len, *remaining_bytes)?;
                        *offset += header_len;
                        *remaining_bytes -= header_len;
                        let page_meta = if let Ok(page_meta) = (&header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // Unsupported page type: skip it and keep scanning.
                            continue;
                        };
                        // Buffer the header so `get_next_page` doesn't re-read it.
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                total_rows,
                page_index: _,
            } => {
                if dictionary_page.is_some() {
                    Ok(Some(PageMetadata {
                        num_rows: None,
                        num_levels: None,
                        is_dict: true,
                    }))
                } else if let Some(page) = page_locations.front() {
                    // This page's row count is the distance to the next page's
                    // first row (or to the end of the chunk for the last page).
                    let next_rows = page_locations
                        .get(1)
                        .map(|x| x.first_row_index as usize)
                        .unwrap_or(*total_rows);

                    Ok(Some(PageMetadata {
                        num_rows: Some(next_rows - page.first_row_index as usize),
                        num_levels: None,
                        is_dict: false,
                    }))
                } else {
                    Ok(None)
                }
            }
        }
    }

    fn skip_next_page(&mut self) -> Result<()> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                if let Some(buffered_header) = next_page_header.take() {
                    verify_page_size(
                        buffered_header.compressed_page_size,
                        buffered_header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    // The buffered header was already consumed by a peek, so
                    // only the page body needs to be skipped here.
                    *offset += buffered_header.compressed_page_size as usize;
                    *remaining_bytes -= buffered_header.compressed_page_size as usize;
                } else {
                    let mut read = self.reader.get_read(*offset as u64)?;
                    let (header_len, header) = Self::read_page_header_len(
                        &self.context,
                        &mut read,
                        *page_index,
                        *require_dictionary,
                    )?;
                    verify_page_header_len(header_len, *remaining_bytes)?;
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    // Skip both the header and the page body.
                    let data_page_size = header.compressed_page_size as usize;
                    *offset += header_len + data_page_size;
                    *remaining_bytes -= header_len + data_page_size;
                }
                // Skipping the leading dictionary page clears the flag;
                // skipping a data page advances the page index.
                if *require_dictionary {
                    *require_dictionary = false;
                } else {
                    *page_index += 1;
                }
                Ok(())
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                page_index,
                ..
            } => {
                if dictionary_page.is_some() {
                    // Skipping the dictionary page does not advance the index.
                    dictionary_page.take();
                } else {
                    if page_locations.pop_front().is_some() {
                        *page_index += 1;
                    }
                }

                Ok(())
            }
        }
    }

    fn at_record_boundary(&mut self) -> Result<bool> {
        match &mut self.state {
            // Sequential scan: only certain at a record boundary when the
            // chunk is fully consumed.
            SerializedPageReaderState::Values { .. } => Ok(self.peek_next_page()?.is_none()),
            // Offset-index pages always start on record boundaries.
            SerializedPageReaderState::Pages { .. } => Ok(true),
        }
    }
}
1096
1097#[cfg(test)]
1098mod tests {
1099 use std::collections::HashSet;
1100
1101 use bytes::Buf;
1102
1103 use crate::file::properties::{EnabledStatistics, WriterProperties};
1104 use crate::format::BoundaryOrder;
1105
1106 use crate::basic::{self, ColumnOrder, SortOrder};
1107 use crate::column::reader::ColumnReader;
1108 use crate::data_type::private::ParquetValueType;
1109 use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
1110 use crate::file::page_index::index::{Index, NativeIndex};
1111 #[allow(deprecated)]
1112 use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
1113 use crate::file::writer::SerializedFileWriter;
1114 use crate::record::RowAccessor;
1115 use crate::schema::parser::parse_message_type;
1116 use crate::util::test_common::file_util::{get_test_file, get_test_path};
1117
1118 use super::*;
1119
    /// Reading from an in-memory buffer and from a file must yield
    /// identical rows for the same Parquet data.
    #[test]
    fn test_cursor_and_file_has_the_same_behaviour() {
        let mut buf: Vec<u8> = Vec::new();
        get_test_file("alltypes_plain.parquet")
            .read_to_end(&mut buf)
            .unwrap();
        let cursor = Bytes::from(buf);
        let read_from_cursor = SerializedFileReader::new(cursor).unwrap();

        let test_file = get_test_file("alltypes_plain.parquet");
        let read_from_file = SerializedFileReader::new(test_file).unwrap();

        let file_iter = read_from_file.get_row_iter(None).unwrap();
        let cursor_iter = read_from_cursor.get_row_iter(None).unwrap();

        // Compare the two readers row by row.
        for (a, b) in file_iter.zip(cursor_iter) {
            assert_eq!(a.unwrap(), b.unwrap())
        }
    }
1139
    /// All `TryFrom` conversions (File, &Path, &str, String) succeed for an
    /// existing file and fail for a missing one.
    #[test]
    fn test_file_reader_try_from() {
        // Existing file: every conversion succeeds.
        let test_file = get_test_file("alltypes_plain.parquet");
        let test_path_buf = get_test_path("alltypes_plain.parquet");
        let test_path = test_path_buf.as_path();
        let test_path_str = test_path.to_str().unwrap();

        let reader = SerializedFileReader::try_from(test_file);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path_str);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path_str.to_string());
        assert!(reader.is_ok());

        // Nonexistent file: every path-based conversion fails.
        let test_path = Path::new("invalid.parquet");
        let test_path_str = test_path.to_str().unwrap();

        let reader = SerializedFileReader::try_from(test_path);
        assert!(reader.is_err());

        let reader = SerializedFileReader::try_from(test_path_str);
        assert!(reader.is_err());

        let reader = SerializedFileReader::try_from(test_path_str.to_string());
        assert!(reader.is_err());
    }
1173
    /// `IntoIterator` yields every row of the file; checks the int column's
    /// values arrive in file order.
    #[test]
    fn test_file_reader_into_iter() {
        let path = get_test_path("alltypes_plain.parquet");
        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
        let iter = reader.into_iter();
        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();

        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
    }
1183
    /// Projecting the row iterator to a single column still yields the same
    /// values for that column.
    #[test]
    fn test_file_reader_into_iter_project() {
        let path = get_test_path("alltypes_plain.parquet");
        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
        // Project down to just the `id` column.
        let schema = "message schema { OPTIONAL INT32 id; }";
        let proj = parse_message_type(schema).ok();
        let iter = reader.into_iter().project(proj).unwrap();
        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();

        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
    }
1195
    /// Multiple page readers created from the same row group (and hence the
    /// same shared chunk reader) can each read pages independently.
    #[test]
    fn test_reuse_file_chunk() {
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader = SerializedFileReader::new(test_file).unwrap();
        let row_group = reader.get_row_group(0).unwrap();

        // Create a page reader for every column before reading any of them.
        let mut page_readers = Vec::new();
        for i in 0..row_group.num_columns() {
            page_readers.push(row_group.get_column_page_reader(i).unwrap());
        }

        for mut page_reader in page_readers {
            assert!(page_reader.get_next_page().is_ok());
        }
    }
1216
    /// End-to-end check of file metadata, row group metadata, the row group
    /// reader and page-level contents for `alltypes_plain.parquet`.
    #[test]
    fn test_file_reader() {
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // File metadata
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
        );
        assert!(file_metadata.key_value_metadata().is_none());
        assert_eq!(file_metadata.num_rows(), 8);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        // Row group metadata
        let row_group_metadata = metadata.row_group(0);
        assert_eq!(row_group_metadata.num_columns(), 11);
        assert_eq!(row_group_metadata.num_rows(), 8);
        assert_eq!(row_group_metadata.total_byte_size(), 671);
        // With no column orders in the file, every column is UNDEFINED.
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Row group reader
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Page reader for the first column: expect exactly one dictionary
        // page followed by one data page.
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 32);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPage {
                    buf,
                    num_values,
                    encoding,
                    def_level_encoding,
                    rep_level_encoding,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 11);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert_eq!(def_level_encoding, Encoding::RLE);
                    #[allow(deprecated)]
                    let expected_rep_level_encoding = Encoding::BIT_PACKED;
                    assert_eq!(rep_level_encoding, expected_rep_level_encoding);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }
1308
    #[test]
    fn test_file_reader_datapage_v2() {
        // Same shape of checks as `test_file_reader`, but against a file
        // written with DataPage v2 and Snappy compression. The literal values
        // asserted below come from the fixture's contents.
        let test_file = get_test_file("datapage_v2.snappy.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata.
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata.
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            1
        );

        assert_eq!(file_metadata.num_rows(), 5);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        let row_group_metadata = metadata.row_group(0);

        // No column orders recorded: every column reports UNDEFINED.
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Test row group reader's metadata agrees with file-level metadata.
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Column 0 holds one dictionary page and one v2 data page.
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 7);
                    assert_eq!(num_values, 1);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 4);
                    assert_eq!(num_values, 5);
                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                    assert_eq!(num_nulls, 1);
                    assert_eq!(num_rows, 5);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_some());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }
1406
    #[test]
    fn test_file_reader_empty_compressed_datapage_v2() {
        // This file has a compressed datapage v2 that is all nulls, so its
        // values buffer is tiny and the dictionary page is empty. Asserted
        // values below are fixture-specific.
        let test_file = get_test_file("page_v2_empty_compressed.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata.
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata.
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-cpp-arrow version 14.0.2"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            1
        );

        assert_eq!(file_metadata.num_rows(), 10);
        assert_eq!(file_metadata.version(), 2);
        // Unlike the older fixtures, this writer records a column order.
        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
        assert_eq!(
            file_metadata.column_orders(),
            Some(vec![expected_order].as_ref())
        );

        let row_group_metadata = metadata.row_group(0);

        // Test column order for each column.
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), expected_order);
        }

        // Test row group reader's metadata agrees with file-level metadata.
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Column 0: an empty dictionary page followed by one all-null v2 data page.
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 0);
                    assert_eq!(num_values, 0);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 3);
                    assert_eq!(num_values, 10);
                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                    // All ten values are null.
                    assert_eq!(num_nulls, 10);
                    assert_eq!(num_rows, 10);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_some());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }
1508
    #[test]
    fn test_file_reader_empty_datapage_v2() {
        // Single-row file whose only value is null, written as one v2 data
        // page with no dictionary page. Asserted values are fixture-specific.
        let test_file = get_test_file("datapage_v2_empty_datapage.snappy.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata.
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata.
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-mr version 1.13.1 (build db4183109d5b734ec5930d870cdae161e408ddba)"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            2
        );

        assert_eq!(file_metadata.num_rows(), 1);
        assert_eq!(file_metadata.version(), 1);
        // This writer records a signed type-defined column order.
        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
        assert_eq!(
            file_metadata.column_orders(),
            Some(vec![expected_order].as_ref())
        );

        let row_group_metadata = metadata.row_group(0);

        // Test column order for each column.
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), expected_order);
        }

        // Test row group reader's metadata agrees with file-level metadata.
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Column 0: exactly one v2 data page holding a single null value.
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 2);
                    assert_eq!(num_values, 1);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert_eq!(num_nulls, 1);
                    assert_eq!(num_rows, 1);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 1);
    }
1598
    /// Builds a concrete `SerializedPageReader` for one column chunk of one
    /// row group, going through `SerializedRowGroupReader` so the reader is
    /// constructed the same way production code constructs it (including the
    /// optional offset-index page locations).
    fn get_serialized_page_reader<R: ChunkReader>(
        file_reader: &SerializedFileReader<R>,
        row_group: usize,
        column: usize,
    ) -> Result<SerializedPageReader<R>> {
        // Build the row-group reader first; it owns clones of the chunk
        // reader, properties, and the (optional) offset index slice.
        let row_group = {
            let row_group_metadata = file_reader.metadata.row_group(row_group);
            let props = Arc::clone(&file_reader.props);
            let f = Arc::clone(&file_reader.chunk_reader);
            SerializedRowGroupReader::new(
                f,
                row_group_metadata,
                file_reader
                    .metadata
                    .offset_index()
                    .map(|x| x[row_group].as_slice()),
                props,
            )?
        };

        let col = row_group.metadata.column(column);

        // Page locations are only available when the file was opened with a
        // page index; `None` forces the header-parsing code path.
        let page_locations = row_group
            .offset_index
            .map(|x| x[column].page_locations.clone());

        let props = Arc::clone(&row_group.props);
        SerializedPageReader::new_with_properties(
            Arc::clone(&row_group.chunk_reader),
            col,
            usize::try_from(row_group.metadata.num_rows())?,
            page_locations,
            props,
        )
    }
1634
    #[test]
    fn test_peek_next_page_offset_matches_actual() -> Result<()> {
        // For every column of every row group, verify that the offset
        // returned by `peek_next_page_offset` agrees with the page reader's
        // internal state, and that each page offset is seen exactly once.
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader = SerializedFileReader::new(test_file)?;

        let mut offset_set = HashSet::new();
        let num_row_groups = reader.metadata.num_row_groups();
        for row_group in 0..num_row_groups {
            let num_columns = reader.metadata.row_group(row_group).num_columns();
            for column in 0..num_columns {
                let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;

                while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
                    // Cross-check the peeked offset against whichever internal
                    // state variant the reader is currently in.
                    match &page_reader.state {
                        SerializedPageReaderState::Pages {
                            page_locations,
                            dictionary_page,
                            ..
                        } => {
                            // A pending dictionary page takes priority over
                            // the next data page location.
                            if let Some(page) = dictionary_page {
                                assert_eq!(page.offset as usize, page_offset);
                            } else if let Some(page) = page_locations.front() {
                                assert_eq!(page.offset as usize, page_offset);
                            } else {
                                unreachable!()
                            }
                        }
                        SerializedPageReaderState::Values {
                            offset,
                            next_page_header,
                            ..
                        } => {
                            // Peeking must have parsed (and cached) the header.
                            assert!(next_page_header.is_some());
                            assert_eq!(*offset, page_offset);
                        }
                    }
                    let page = page_reader.get_next_page()?;
                    assert!(page.is_some());
                    // Offsets must be unique across all pages of the file.
                    let newly_inserted = offset_set.insert(page_offset);
                    assert!(newly_inserted);
                }
            }
        }

        Ok(())
    }
1681
1682 #[test]
1683 fn test_page_iterator() {
1684 let file = get_test_file("alltypes_plain.parquet");
1685 let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1686
1687 let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap();
1688
1689 let page = page_iterator.next();
1691 assert!(page.is_some());
1692 assert!(page.unwrap().is_ok());
1693
1694 let page = page_iterator.next();
1696 assert!(page.is_none());
1697
1698 let row_group_indices = Box::new(0..1);
1699 let mut page_iterator =
1700 FilePageIterator::with_row_groups(0, row_group_indices, file_reader).unwrap();
1701
1702 let page = page_iterator.next();
1704 assert!(page.is_some());
1705 assert!(page.unwrap().is_ok());
1706
1707 let page = page_iterator.next();
1709 assert!(page.is_none());
1710 }
1711
1712 #[test]
1713 fn test_file_reader_key_value_metadata() {
1714 let file = get_test_file("binary.parquet");
1715 let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1716
1717 let metadata = file_reader
1718 .metadata
1719 .file_metadata()
1720 .key_value_metadata()
1721 .unwrap();
1722
1723 assert_eq!(metadata.len(), 3);
1724
1725 assert_eq!(metadata[0].key, "parquet.proto.descriptor");
1726
1727 assert_eq!(metadata[1].key, "writer.model.name");
1728 assert_eq!(metadata[1].value, Some("protobuf".to_owned()));
1729
1730 assert_eq!(metadata[2].key, "parquet.proto.class");
1731 assert_eq!(metadata[2].value, Some("foo.baz.Foobaz$Event".to_owned()));
1732 }
1733
1734 #[test]
1735 fn test_file_reader_optional_metadata() {
1736 let file = get_test_file("data_index_bloom_encoding_stats.parquet");
1738 let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1739
1740 let row_group_metadata = file_reader.metadata.row_group(0);
1741 let col0_metadata = row_group_metadata.column(0);
1742
1743 assert_eq!(col0_metadata.bloom_filter_offset().unwrap(), 192);
1745
1746 let page_encoding_stats = &col0_metadata.page_encoding_stats().unwrap()[0];
1748
1749 assert_eq!(page_encoding_stats.page_type, basic::PageType::DATA_PAGE);
1750 assert_eq!(page_encoding_stats.encoding, Encoding::PLAIN);
1751 assert_eq!(page_encoding_stats.count, 1);
1752
1753 assert_eq!(col0_metadata.column_index_offset().unwrap(), 156);
1755 assert_eq!(col0_metadata.column_index_length().unwrap(), 25);
1756
1757 assert_eq!(col0_metadata.offset_index_offset().unwrap(), 181);
1759 assert_eq!(col0_metadata.offset_index_length().unwrap(), 11);
1760 }
1761
1762 #[test]
1763 fn test_file_reader_with_no_filter() -> Result<()> {
1764 let test_file = get_test_file("alltypes_plain.parquet");
1765 let origin_reader = SerializedFileReader::new(test_file)?;
1766 let metadata = origin_reader.metadata();
1768 assert_eq!(metadata.num_row_groups(), 1);
1769 Ok(())
1770 }
1771
1772 #[test]
1773 fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
1774 let test_file = get_test_file("alltypes_plain.parquet");
1775 let read_options = ReadOptionsBuilder::new()
1776 .with_predicate(Box::new(|_, _| false))
1777 .build();
1778 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1779 let metadata = reader.metadata();
1780 assert_eq!(metadata.num_row_groups(), 0);
1781 Ok(())
1782 }
1783
1784 #[test]
1785 fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
1786 let test_file = get_test_file("alltypes_plain.parquet");
1787 let origin_reader = SerializedFileReader::new(test_file)?;
1788 let metadata = origin_reader.metadata();
1790 assert_eq!(metadata.num_row_groups(), 1);
1791 let mid = get_midpoint_offset(metadata.row_group(0));
1792
1793 let test_file = get_test_file("alltypes_plain.parquet");
1794 let read_options = ReadOptionsBuilder::new().with_range(0, mid + 1).build();
1795 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1796 let metadata = reader.metadata();
1797 assert_eq!(metadata.num_row_groups(), 1);
1798
1799 let test_file = get_test_file("alltypes_plain.parquet");
1800 let read_options = ReadOptionsBuilder::new().with_range(0, mid).build();
1801 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1802 let metadata = reader.metadata();
1803 assert_eq!(metadata.num_row_groups(), 0);
1804 Ok(())
1805 }
1806
1807 #[test]
1808 fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
1809 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1810 let origin_reader = SerializedFileReader::new(test_file)?;
1811 let metadata = origin_reader.metadata();
1812 let mid = get_midpoint_offset(metadata.row_group(0));
1813
1814 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1816 let read_options = ReadOptionsBuilder::new()
1817 .with_page_index()
1818 .with_predicate(Box::new(|_, _| true))
1819 .with_range(mid, mid + 1)
1820 .build();
1821 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1822 let metadata = reader.metadata();
1823 assert_eq!(metadata.num_row_groups(), 1);
1824 assert_eq!(metadata.column_index().unwrap().len(), 1);
1825 assert_eq!(metadata.offset_index().unwrap().len(), 1);
1826
1827 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1829 let read_options = ReadOptionsBuilder::new()
1830 .with_page_index()
1831 .with_predicate(Box::new(|_, _| true))
1832 .with_range(0, mid)
1833 .build();
1834 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1835 let metadata = reader.metadata();
1836 assert_eq!(metadata.num_row_groups(), 0);
1837 assert!(metadata.column_index().is_none());
1838 assert!(metadata.offset_index().is_none());
1839
1840 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1842 let read_options = ReadOptionsBuilder::new()
1843 .with_page_index()
1844 .with_predicate(Box::new(|_, _| false))
1845 .with_range(mid, mid + 1)
1846 .build();
1847 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1848 let metadata = reader.metadata();
1849 assert_eq!(metadata.num_row_groups(), 0);
1850 assert!(metadata.column_index().is_none());
1851 assert!(metadata.offset_index().is_none());
1852
1853 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1855 let read_options = ReadOptionsBuilder::new()
1856 .with_page_index()
1857 .with_predicate(Box::new(|_, _| false))
1858 .with_range(0, mid)
1859 .build();
1860 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1861 let metadata = reader.metadata();
1862 assert_eq!(metadata.num_row_groups(), 0);
1863 assert!(metadata.column_index().is_none());
1864 assert!(metadata.offset_index().is_none());
1865 Ok(())
1866 }
1867
1868 #[test]
1869 fn test_file_reader_invalid_metadata() {
1870 let data = [
1871 255, 172, 1, 0, 50, 82, 65, 73, 1, 0, 0, 0, 169, 168, 168, 162, 87, 255, 16, 0, 0, 0,
1872 80, 65, 82, 49,
1873 ];
1874 let ret = SerializedFileReader::new(Bytes::copy_from_slice(&data));
1875 assert_eq!(
1876 ret.err().unwrap().to_string(),
1877 "Parquet error: Could not parse metadata: bad data"
1878 );
1879 }
1880
    #[test]
    fn test_page_index_reader() {
        // Opens a file with the page index enabled and checks the column
        // index (min/max per page) and offset index (page locations) of its
        // single BYTE_ARRAY column. Literal values are fixture-specific.
        let test_file = get_test_file("data_index_bloom_encoding_stats.parquet");
        let builder = ReadOptionsBuilder::new();
        let options = builder.with_page_index().build();
        let reader_result = SerializedFileReader::new_with_options(test_file, options);
        let reader = reader_result.unwrap();

        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let column_index = metadata.column_index().unwrap();

        // One row group, whose only column is a BYTE_ARRAY.
        assert_eq!(column_index.len(), 1);
        let index = if let Index::BYTE_ARRAY(index) = &column_index[0][0] {
            index
        } else {
            unreachable!()
        };

        assert_eq!(index.boundary_order, BoundaryOrder::ASCENDING);
        let index_in_pages = &index.indexes;

        // The column chunk contains a single page.
        assert_eq!(index_in_pages.len(), 1);

        let page0 = &index_in_pages[0];
        let min = page0.min.as_ref().unwrap();
        let max = page0.max.as_ref().unwrap();
        assert_eq!(b"Hello", min.as_bytes());
        assert_eq!(b"today", max.as_bytes());

        // Offset index: one row group, one page at the expected location.
        let offset_indexes = metadata.offset_index().unwrap();
        assert_eq!(offset_indexes.len(), 1);
        let offset_index = &offset_indexes[0];
        let page_offset = &offset_index[0].page_locations()[0];

        assert_eq!(4, page_offset.offset);
        assert_eq!(152, page_offset.compressed_page_size);
        assert_eq!(0, page_offset.first_row_index);
    }
1942
1943 #[test]
1944 #[allow(deprecated)]
1945 fn test_page_index_reader_out_of_order() {
1946 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1947 let options = ReadOptionsBuilder::new().with_page_index().build();
1948 let reader = SerializedFileReader::new_with_options(test_file, options).unwrap();
1949 let metadata = reader.metadata();
1950
1951 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1952 let columns = metadata.row_group(0).columns();
1953 let reversed: Vec<_> = columns.iter().cloned().rev().collect();
1954
1955 let a = read_columns_indexes(&test_file, columns).unwrap().unwrap();
1956 let mut b = read_columns_indexes(&test_file, &reversed)
1957 .unwrap()
1958 .unwrap();
1959 b.reverse();
1960 assert_eq!(a, b);
1961
1962 let a = read_offset_indexes(&test_file, columns).unwrap().unwrap();
1963 let mut b = read_offset_indexes(&test_file, &reversed).unwrap().unwrap();
1964 b.reverse();
1965 assert_eq!(a, b);
1966 }
1967
1968 #[test]
1969 fn test_page_index_reader_all_type() {
1970 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1971 let builder = ReadOptionsBuilder::new();
1972 let options = builder.with_page_index().build();
1974 let reader_result = SerializedFileReader::new_with_options(test_file, options);
1975 let reader = reader_result.unwrap();
1976
1977 let metadata = reader.metadata();
1979 assert_eq!(metadata.num_row_groups(), 1);
1980
1981 let column_index = metadata.column_index().unwrap();
1982 let row_group_offset_indexes = &metadata.offset_index().unwrap()[0];
1983
1984 assert_eq!(column_index.len(), 1);
1986 let row_group_metadata = metadata.row_group(0);
1987
1988 assert!(!&column_index[0][0].is_sorted());
1990 let boundary_order = &column_index[0][0].get_boundary_order();
1991 assert!(boundary_order.is_some());
1992 matches!(boundary_order.unwrap(), BoundaryOrder::UNORDERED);
1993 if let Index::INT32(index) = &column_index[0][0] {
1994 check_native_page_index(
1995 index,
1996 325,
1997 get_row_group_min_max_bytes(row_group_metadata, 0),
1998 BoundaryOrder::UNORDERED,
1999 );
2000 assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325);
2001 } else {
2002 unreachable!()
2003 };
2004 assert!(&column_index[0][1].is_sorted());
2006 if let Index::BOOLEAN(index) = &column_index[0][1] {
2007 assert_eq!(index.indexes.len(), 82);
2008 assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82);
2009 } else {
2010 unreachable!()
2011 };
2012 assert!(&column_index[0][2].is_sorted());
2014 if let Index::INT32(index) = &column_index[0][2] {
2015 check_native_page_index(
2016 index,
2017 325,
2018 get_row_group_min_max_bytes(row_group_metadata, 2),
2019 BoundaryOrder::ASCENDING,
2020 );
2021 assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325);
2022 } else {
2023 unreachable!()
2024 };
2025 assert!(&column_index[0][3].is_sorted());
2027 if let Index::INT32(index) = &column_index[0][3] {
2028 check_native_page_index(
2029 index,
2030 325,
2031 get_row_group_min_max_bytes(row_group_metadata, 3),
2032 BoundaryOrder::ASCENDING,
2033 );
2034 assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325);
2035 } else {
2036 unreachable!()
2037 };
2038 assert!(&column_index[0][4].is_sorted());
2040 if let Index::INT32(index) = &column_index[0][4] {
2041 check_native_page_index(
2042 index,
2043 325,
2044 get_row_group_min_max_bytes(row_group_metadata, 4),
2045 BoundaryOrder::ASCENDING,
2046 );
2047 assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325);
2048 } else {
2049 unreachable!()
2050 };
2051 assert!(!&column_index[0][5].is_sorted());
2053 if let Index::INT64(index) = &column_index[0][5] {
2054 check_native_page_index(
2055 index,
2056 528,
2057 get_row_group_min_max_bytes(row_group_metadata, 5),
2058 BoundaryOrder::UNORDERED,
2059 );
2060 assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528);
2061 } else {
2062 unreachable!()
2063 };
2064 assert!(&column_index[0][6].is_sorted());
2066 if let Index::FLOAT(index) = &column_index[0][6] {
2067 check_native_page_index(
2068 index,
2069 325,
2070 get_row_group_min_max_bytes(row_group_metadata, 6),
2071 BoundaryOrder::ASCENDING,
2072 );
2073 assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325);
2074 } else {
2075 unreachable!()
2076 };
2077 assert!(!&column_index[0][7].is_sorted());
2079 if let Index::DOUBLE(index) = &column_index[0][7] {
2080 check_native_page_index(
2081 index,
2082 528,
2083 get_row_group_min_max_bytes(row_group_metadata, 7),
2084 BoundaryOrder::UNORDERED,
2085 );
2086 assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528);
2087 } else {
2088 unreachable!()
2089 };
2090 assert!(!&column_index[0][8].is_sorted());
2092 if let Index::BYTE_ARRAY(index) = &column_index[0][8] {
2093 check_native_page_index(
2094 index,
2095 974,
2096 get_row_group_min_max_bytes(row_group_metadata, 8),
2097 BoundaryOrder::UNORDERED,
2098 );
2099 assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974);
2100 } else {
2101 unreachable!()
2102 };
2103 assert!(&column_index[0][9].is_sorted());
2105 if let Index::BYTE_ARRAY(index) = &column_index[0][9] {
2106 check_native_page_index(
2107 index,
2108 352,
2109 get_row_group_min_max_bytes(row_group_metadata, 9),
2110 BoundaryOrder::ASCENDING,
2111 );
2112 assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352);
2113 } else {
2114 unreachable!()
2115 };
2116 assert!(!&column_index[0][10].is_sorted());
2119 if let Index::NONE = &column_index[0][10] {
2120 assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974);
2121 } else {
2122 unreachable!()
2123 };
2124 assert!(&column_index[0][11].is_sorted());
2126 if let Index::INT32(index) = &column_index[0][11] {
2127 check_native_page_index(
2128 index,
2129 325,
2130 get_row_group_min_max_bytes(row_group_metadata, 11),
2131 BoundaryOrder::ASCENDING,
2132 );
2133 assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325);
2134 } else {
2135 unreachable!()
2136 };
2137 assert!(!&column_index[0][12].is_sorted());
2139 if let Index::INT32(index) = &column_index[0][12] {
2140 check_native_page_index(
2141 index,
2142 325,
2143 get_row_group_min_max_bytes(row_group_metadata, 12),
2144 BoundaryOrder::UNORDERED,
2145 );
2146 assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325);
2147 } else {
2148 unreachable!()
2149 };
2150 }
2151
2152 fn check_native_page_index<T: ParquetValueType>(
2153 row_group_index: &NativeIndex<T>,
2154 page_size: usize,
2155 min_max: (&[u8], &[u8]),
2156 boundary_order: BoundaryOrder,
2157 ) {
2158 assert_eq!(row_group_index.indexes.len(), page_size);
2159 assert_eq!(row_group_index.boundary_order, boundary_order);
2160 row_group_index.indexes.iter().all(|x| {
2161 x.min.as_ref().unwrap() >= &T::try_from_le_slice(min_max.0).unwrap()
2162 && x.max.as_ref().unwrap() <= &T::try_from_le_slice(min_max.1).unwrap()
2163 });
2164 }
2165
2166 fn get_row_group_min_max_bytes(r: &RowGroupMetaData, col_num: usize) -> (&[u8], &[u8]) {
2167 let statistics = r.column(col_num).statistics().unwrap();
2168 (
2169 statistics.min_bytes_opt().unwrap_or_default(),
2170 statistics.max_bytes_opt().unwrap_or_default(),
2171 )
2172 }
2173
2174 #[test]
2175 fn test_skip_next_page_with_dictionary_page() {
2176 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2177 let builder = ReadOptionsBuilder::new();
2178 let options = builder.with_page_index().build();
2180 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2181 let reader = reader_result.unwrap();
2182
2183 let row_group_reader = reader.get_row_group(0).unwrap();
2184
2185 let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2187
2188 let mut vec = vec![];
2189
2190 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2192 assert!(meta.is_dict);
2193
2194 column_page_reader.skip_next_page().unwrap();
2196
2197 let page = column_page_reader.get_next_page().unwrap().unwrap();
2199 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2200
2201 for _i in 0..351 {
2203 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2205 assert!(!meta.is_dict); vec.push(meta);
2207
2208 let page = column_page_reader.get_next_page().unwrap().unwrap();
2209 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2210 }
2211
2212 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2214 assert!(column_page_reader.get_next_page().unwrap().is_none());
2215
2216 assert_eq!(vec.len(), 351);
2218 }
2219
2220 #[test]
2221 fn test_skip_page_with_offset_index() {
2222 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2223 let builder = ReadOptionsBuilder::new();
2224 let options = builder.with_page_index().build();
2226 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2227 let reader = reader_result.unwrap();
2228
2229 let row_group_reader = reader.get_row_group(0).unwrap();
2230
2231 let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2233
2234 let mut vec = vec![];
2235
2236 for i in 0..325 {
2237 if i % 2 == 0 {
2238 vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2239 } else {
2240 column_page_reader.skip_next_page().unwrap();
2241 }
2242 }
2243 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2245 assert!(column_page_reader.get_next_page().unwrap().is_none());
2246
2247 assert_eq!(vec.len(), 163);
2248 }
2249
2250 #[test]
2251 fn test_skip_page_without_offset_index() {
2252 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2253
2254 let reader_result = SerializedFileReader::new(test_file);
2256 let reader = reader_result.unwrap();
2257
2258 let row_group_reader = reader.get_row_group(0).unwrap();
2259
2260 let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2262
2263 let mut vec = vec![];
2264
2265 for i in 0..325 {
2266 if i % 2 == 0 {
2267 vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2268 } else {
2269 column_page_reader.peek_next_page().unwrap().unwrap();
2270 column_page_reader.skip_next_page().unwrap();
2271 }
2272 }
2273 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2275 assert!(column_page_reader.get_next_page().unwrap().is_none());
2276
2277 assert_eq!(vec.len(), 163);
2278 }
2279
2280 #[test]
2281 fn test_peek_page_with_dictionary_page() {
2282 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2283 let builder = ReadOptionsBuilder::new();
2284 let options = builder.with_page_index().build();
2286 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2287 let reader = reader_result.unwrap();
2288 let row_group_reader = reader.get_row_group(0).unwrap();
2289
2290 let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2292
2293 let mut vec = vec![];
2294
2295 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2296 assert!(meta.is_dict);
2297 let page = column_page_reader.get_next_page().unwrap().unwrap();
2298 assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2299
2300 for i in 0..352 {
2301 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2302 if i != 351 {
2305 assert!((meta.num_rows == Some(21)) || (meta.num_rows == Some(20)));
2306 } else {
2307 assert_eq!(meta.num_rows, Some(10));
2310 }
2311 assert!(!meta.is_dict);
2312 vec.push(meta);
2313 let page = column_page_reader.get_next_page().unwrap().unwrap();
2314 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2315 }
2316
2317 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2319 assert!(column_page_reader.get_next_page().unwrap().is_none());
2320
2321 assert_eq!(vec.len(), 352);
2322 }
2323
2324 #[test]
2325 fn test_peek_page_with_dictionary_page_without_offset_index() {
2326 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2327
2328 let reader_result = SerializedFileReader::new(test_file);
2329 let reader = reader_result.unwrap();
2330 let row_group_reader = reader.get_row_group(0).unwrap();
2331
2332 let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2334
2335 let mut vec = vec![];
2336
2337 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2338 assert!(meta.is_dict);
2339 let page = column_page_reader.get_next_page().unwrap().unwrap();
2340 assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2341
2342 for i in 0..352 {
2343 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2344 if i != 351 {
2347 assert!((meta.num_levels == Some(21)) || (meta.num_levels == Some(20)));
2348 } else {
2349 assert_eq!(meta.num_levels, Some(10));
2352 }
2353 assert!(!meta.is_dict);
2354 vec.push(meta);
2355 let page = column_page_reader.get_next_page().unwrap().unwrap();
2356 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2357 }
2358
2359 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2361 assert!(column_page_reader.get_next_page().unwrap().is_none());
2362
2363 assert_eq!(vec.len(), 352);
2364 }
2365
2366 #[test]
2367 fn test_fixed_length_index() {
2368 let message_type = "
2369 message test_schema {
2370 OPTIONAL FIXED_LEN_BYTE_ARRAY (11) value (DECIMAL(25,2));
2371 }
2372 ";
2373
2374 let schema = parse_message_type(message_type).unwrap();
2375 let mut out = Vec::with_capacity(1024);
2376 let mut writer =
2377 SerializedFileWriter::new(&mut out, Arc::new(schema), Default::default()).unwrap();
2378
2379 let mut r = writer.next_row_group().unwrap();
2380 let mut c = r.next_column().unwrap().unwrap();
2381 c.typed::<FixedLenByteArrayType>()
2382 .write_batch(
2383 &[vec![0; 11].into(), vec![5; 11].into(), vec![3; 11].into()],
2384 Some(&[1, 1, 0, 1]),
2385 None,
2386 )
2387 .unwrap();
2388 c.close().unwrap();
2389 r.close().unwrap();
2390 writer.close().unwrap();
2391
2392 let b = Bytes::from(out);
2393 let options = ReadOptionsBuilder::new().with_page_index().build();
2394 let reader = SerializedFileReader::new_with_options(b, options).unwrap();
2395 let index = reader.metadata().column_index().unwrap();
2396
2397 assert_eq!(index.len(), 1);
2399 let c = &index[0];
2400 assert_eq!(c.len(), 1);
2402
2403 match &c[0] {
2404 Index::FIXED_LEN_BYTE_ARRAY(v) => {
2405 assert_eq!(v.indexes.len(), 1);
2406 let page_idx = &v.indexes[0];
2407 assert_eq!(page_idx.null_count.unwrap(), 1);
2408 assert_eq!(page_idx.min.as_ref().unwrap().as_ref(), &[0; 11]);
2409 assert_eq!(page_idx.max.as_ref().unwrap().as_ref(), &[5; 11]);
2410 }
2411 _ => unreachable!(),
2412 }
2413 }
2414
2415 #[test]
2416 fn test_multi_gz() {
2417 let file = get_test_file("concatenated_gzip_members.parquet");
2418 let reader = SerializedFileReader::new(file).unwrap();
2419 let row_group_reader = reader.get_row_group(0).unwrap();
2420 match row_group_reader.get_column_reader(0).unwrap() {
2421 ColumnReader::Int64ColumnReader(mut reader) => {
2422 let mut buffer = Vec::with_capacity(1024);
2423 let mut def_levels = Vec::with_capacity(1024);
2424 let (num_records, num_values, num_levels) = reader
2425 .read_records(1024, Some(&mut def_levels), None, &mut buffer)
2426 .unwrap();
2427
2428 assert_eq!(num_records, 513);
2429 assert_eq!(num_values, 513);
2430 assert_eq!(num_levels, 513);
2431
2432 let expected: Vec<i64> = (1..514).collect();
2433 assert_eq!(&buffer, &expected);
2434 }
2435 _ => unreachable!(),
2436 }
2437 }
2438
2439 #[test]
2440 fn test_byte_stream_split_extended() {
2441 let path = format!(
2442 "{}/byte_stream_split_extended.gzip.parquet",
2443 arrow::util::test_util::parquet_test_data(),
2444 );
2445 let file = File::open(path).unwrap();
2446 let reader = Box::new(SerializedFileReader::new(file).expect("Failed to create reader"));
2447
2448 let mut iter = reader
2450 .get_row_iter(None)
2451 .expect("Failed to create row iterator");
2452
2453 let mut start = 0;
2454 let end = reader.metadata().file_metadata().num_rows();
2455
2456 let check_row = |row: Result<Row, ParquetError>| {
2457 assert!(row.is_ok());
2458 let r = row.unwrap();
2459 assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
2460 assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
2461 assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
2462 assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
2463 assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
2464 assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
2465 assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
2466 };
2467
2468 while start < end {
2469 match iter.next() {
2470 Some(row) => check_row(row),
2471 None => break,
2472 };
2473 start += 1;
2474 }
2475 }
2476
2477 #[test]
2478 fn test_filtered_rowgroup_metadata() {
2479 let message_type = "
2480 message test_schema {
2481 REQUIRED INT32 a;
2482 }
2483 ";
2484 let schema = Arc::new(parse_message_type(message_type).unwrap());
2485 let props = Arc::new(
2486 WriterProperties::builder()
2487 .set_statistics_enabled(EnabledStatistics::Page)
2488 .build(),
2489 );
2490 let mut file: File = tempfile::tempfile().unwrap();
2491 let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
2492 let data = [1, 2, 3, 4, 5];
2493
2494 for idx in 0..5 {
2496 let data_i: Vec<i32> = data.iter().map(|x| x * (idx + 1)).collect();
2497 let mut row_group_writer = file_writer.next_row_group().unwrap();
2498 if let Some(mut writer) = row_group_writer.next_column().unwrap() {
2499 writer
2500 .typed::<Int32Type>()
2501 .write_batch(data_i.as_slice(), None, None)
2502 .unwrap();
2503 writer.close().unwrap();
2504 }
2505 row_group_writer.close().unwrap();
2506 file_writer.flushed_row_groups();
2507 }
2508 let file_metadata = file_writer.close().unwrap();
2509
2510 assert_eq!(file_metadata.num_rows, 25);
2511 assert_eq!(file_metadata.row_groups.len(), 5);
2512
2513 let read_options = ReadOptionsBuilder::new()
2515 .with_page_index()
2516 .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2))
2517 .build();
2518 let reader =
2519 SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
2520 .unwrap();
2521 let metadata = reader.metadata();
2522
2523 assert_eq!(metadata.num_row_groups(), 1);
2525 assert_eq!(metadata.row_group(0).ordinal(), Some(2));
2526
2527 assert!(metadata.column_index().is_some());
2529 assert!(metadata.offset_index().is_some());
2530 assert_eq!(metadata.column_index().unwrap().len(), 1);
2531 assert_eq!(metadata.offset_index().unwrap().len(), 1);
2532 let col_idx = metadata.column_index().unwrap();
2533 let off_idx = metadata.offset_index().unwrap();
2534 let col_stats = metadata.row_group(0).column(0).statistics().unwrap();
2535 let pg_idx = &col_idx[0][0];
2536 let off_idx_i = &off_idx[0][0];
2537
2538 match pg_idx {
2540 Index::INT32(int_idx) => {
2541 let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
2542 let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
2543 assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
2544 assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
2545 }
2546 _ => panic!("wrong stats type"),
2547 }
2548
2549 assert_eq!(
2551 off_idx_i.page_locations[0].offset,
2552 metadata.row_group(0).column(0).data_page_offset()
2553 );
2554
2555 let read_options = ReadOptionsBuilder::new()
2557 .with_page_index()
2558 .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1))
2559 .build();
2560 let reader =
2561 SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
2562 .unwrap();
2563 let metadata = reader.metadata();
2564
2565 assert_eq!(metadata.num_row_groups(), 2);
2567 assert_eq!(metadata.row_group(0).ordinal(), Some(1));
2568 assert_eq!(metadata.row_group(1).ordinal(), Some(3));
2569
2570 assert!(metadata.column_index().is_some());
2572 assert!(metadata.offset_index().is_some());
2573 assert_eq!(metadata.column_index().unwrap().len(), 2);
2574 assert_eq!(metadata.offset_index().unwrap().len(), 2);
2575 let col_idx = metadata.column_index().unwrap();
2576 let off_idx = metadata.offset_index().unwrap();
2577
2578 for (i, col_idx_i) in col_idx.iter().enumerate().take(metadata.num_row_groups()) {
2579 let col_stats = metadata.row_group(i).column(0).statistics().unwrap();
2580 let pg_idx = &col_idx_i[0];
2581 let off_idx_i = &off_idx[i][0];
2582
2583 match pg_idx {
2585 Index::INT32(int_idx) => {
2586 let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
2587 let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
2588 assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
2589 assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
2590 }
2591 _ => panic!("wrong stats type"),
2592 }
2593
2594 assert_eq!(
2596 off_idx_i.page_locations[0].offset,
2597 metadata.row_group(i).column(0).data_page_offset()
2598 );
2599 }
2600 }
2601}