//! Contains implementations of the reader traits [`FileReader`], [`RowGroupReader`]
//! and [`PageReader`] that deserialize Parquet data from any [`ChunkReader`].

use crate::basic::{Encoding, Type};
use crate::bloom_filter::Sbbf;
use crate::column::page::{Page, PageMetadata, PageReader};
use crate::compression::{create_codec, Codec};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::{read_and_decrypt, CryptoContext};
use crate::errors::{ParquetError, Result};
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::{
    metadata::*,
    properties::{ReaderProperties, ReaderPropertiesPtr},
    reader::*,
    statistics,
};
use crate::format::{PageHeader, PageLocation, PageType};
use crate::record::reader::RowIter;
use crate::record::Row;
use crate::schema::types::Type as SchemaType;
use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
use bytes::Bytes;
use std::collections::VecDeque;
use std::iter;
use std::{fs::File, io::Read, path::Path, sync::Arc};
use thrift::protocol::TCompactInputProtocol;

impl TryFrom<File> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(file: File) -> Result<Self> {
        Self::new(file)
    }
}

impl TryFrom<&Path> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(path: &Path) -> Result<Self> {
        let file = File::open(path)?;
        Self::try_from(file)
    }
}

impl TryFrom<String> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(path: String) -> Result<Self> {
        Self::try_from(Path::new(&path))
    }
}

impl TryFrom<&str> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(path: &str) -> Result<Self> {
        Self::try_from(Path::new(&path))
    }
}
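
// A minimal sketch of the conversions above (the file path is illustrative and
// error handling is elided):
//
//     use parquet::file::reader::{FileReader, SerializedFileReader};
//
//     let reader = SerializedFileReader::try_from("data.parquet")?;
//     println!("row groups: {}", reader.num_row_groups());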

impl IntoIterator for SerializedFileReader<File> {
    type Item = Result<Row>;
    type IntoIter = RowIter<'static>;

    fn into_iter(self) -> Self::IntoIter {
        RowIter::from_file_into(Box::new(self))
    }
}

/// A serialized implementation for Parquet [`FileReader`].
pub struct SerializedFileReader<R: ChunkReader> {
    chunk_reader: Arc<R>,
    metadata: Arc<ParquetMetaData>,
    props: ReaderPropertiesPtr,
}

/// A predicate for filtering row groups, invoked with the metadata and index
/// of each row group in the file. Only row groups for which the predicate
/// evaluates to `true` will be scanned.
pub type ReadGroupPredicate = Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>;

/// A builder for [`ReadOptions`].
/// All predicates added to the builder are chained with 'AND' when filtering
/// row groups.
#[derive(Default)]
pub struct ReadOptionsBuilder {
    predicates: Vec<ReadGroupPredicate>,
    enable_page_index: bool,
    props: Option<ReaderProperties>,
}

impl ReadOptionsBuilder {
    /// New builder
    pub fn new() -> Self {
        Self::default()
    }

    /// Add a predicate on row group metadata to the reading option.
    /// Only row groups that match the predicate criteria will be scanned.
    pub fn with_predicate(mut self, predicate: ReadGroupPredicate) -> Self {
        self.predicates.push(predicate);
        self
    }

    /// Add a range predicate that filters row groups by their byte range in the
    /// file: only row groups whose midpoint offset falls in `[start, end)` are kept.
    pub fn with_range(mut self, start: i64, end: i64) -> Self {
        assert!(start < end);
        let predicate = move |rg: &RowGroupMetaData, _: usize| {
            let mid = get_midpoint_offset(rg);
            mid >= start && mid < end
        };
        self.predicates.push(Box::new(predicate));
        self
    }

    /// Enable reading the page index structures described in
    /// "Column Index Layout to Support Page Skipping"
    pub fn with_page_index(mut self) -> Self {
        self.enable_page_index = true;
        self
    }

    /// Set the [`ReaderProperties`] configuration.
    pub fn with_reader_properties(mut self, properties: ReaderProperties) -> Self {
        self.props = Some(properties);
        self
    }

    /// Seal the builder and return the read options
    pub fn build(self) -> ReadOptions {
        let props = self
            .props
            .unwrap_or_else(|| ReaderProperties::builder().build());
        ReadOptions {
            predicates: self.predicates,
            enable_page_index: self.enable_page_index,
            props,
        }
    }
}
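
// A minimal usage sketch for the builder (the values are illustrative): keep
// only row groups whose midpoint offset lies in the first 4 MiB of the file,
// and also load the page index.
//
//     let options = ReadOptionsBuilder::new()
//         .with_range(0, 4 * 1024 * 1024)
//         .with_page_index()
//         .build();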

/// A collection of options for reading a Parquet file.
/// Currently, only predicates on row group metadata are supported;
/// all predicates are chained with 'AND' when filtering row groups.
pub struct ReadOptions {
    predicates: Vec<ReadGroupPredicate>,
    enable_page_index: bool,
    props: ReaderProperties,
}

impl<R: 'static + ChunkReader> SerializedFileReader<R> {
    /// Creates file reader from a Parquet file.
    /// Returns an error if the Parquet file does not exist or is corrupt.
    pub fn new(chunk_reader: R) -> Result<Self> {
        let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
        let props = Arc::new(ReaderProperties::builder().build());
        Ok(Self {
            chunk_reader: Arc::new(chunk_reader),
            metadata: Arc::new(metadata),
            props,
        })
    }

    /// Creates file reader from a Parquet file with read options.
    /// Returns an error if the Parquet file does not exist or is corrupt.
    pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
        let mut metadata_builder = ParquetMetaDataReader::new()
            .parse_and_finish(&chunk_reader)?
            .into_builder();
        let mut predicates = options.predicates;

        // Keep only the row groups that satisfy every predicate
        for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
            let mut keep = true;
            for predicate in &mut predicates {
                if !predicate(&rg_meta, i) {
                    keep = false;
                    break;
                }
            }
            if keep {
                metadata_builder = metadata_builder.add_row_group(rg_meta);
            }
        }

        let mut metadata = metadata_builder.build();

        // If page indexes are requested, load them for the retained row groups
        if options.enable_page_index {
            let mut reader =
                ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true);
            reader.read_page_indexes(&chunk_reader)?;
            metadata = reader.finish()?;
        }

        Ok(Self {
            chunk_reader: Arc::new(chunk_reader),
            metadata: Arc::new(metadata),
            props: Arc::new(options.props),
        })
    }
}
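
// A minimal sketch combining the pieces above (the path is illustrative; the
// final assertion assumes the file actually contains a page index):
//
//     use std::fs::File;
//
//     let file = File::open("data.parquet")?;
//     let options = ReadOptionsBuilder::new().with_page_index().build();
//     let reader = SerializedFileReader::new_with_options(file, options)?;
//     assert!(reader.metadata().offset_index().is_some());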

/// Returns the midpoint offset of a row group: the position of its first byte of
/// column data (the dictionary page if present, else the first data page) plus
/// half of its compressed size.
fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
    let col = meta.column(0);
    let mut offset = col.data_page_offset();
    if let Some(dic_offset) = col.dictionary_page_offset() {
        if offset > dic_offset {
            offset = dic_offset
        }
    };
    offset + meta.compressed_size() / 2
}
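
// Worked example (values are illustrative): with a dictionary page at offset 4,
// a first data page at offset 100, and a compressed size of 200 bytes, the
// midpoint is min(4, 100) + 200 / 2 = 104. A row group is then kept by
// `with_range(start, end)` iff start <= 104 < end.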

impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
    fn metadata(&self) -> &ParquetMetaData {
        &self.metadata
    }

    fn num_row_groups(&self) -> usize {
        self.metadata.num_row_groups()
    }

    fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> {
        let row_group_metadata = self.metadata.row_group(i);
        // Row groups should be processed sequentially.
        let props = Arc::clone(&self.props);
        let f = Arc::clone(&self.chunk_reader);
        Ok(Box::new(SerializedRowGroupReader::new(
            f,
            row_group_metadata,
            self.metadata.offset_index().map(|x| x[i].as_slice()),
            props,
        )?))
    }

    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
        RowIter::from_file(projection, self)
    }
}
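
// A minimal sketch of record iteration through the trait above (`reader` is
// assumed to be a `SerializedFileReader`; `None` means no projection):
//
//     for row in reader.get_row_iter(None)? {
//         println!("{}", row?);
//     }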

/// A serialized implementation for Parquet [`RowGroupReader`].
pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
    chunk_reader: Arc<R>,
    metadata: &'a RowGroupMetaData,
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    props: ReaderPropertiesPtr,
    bloom_filters: Vec<Option<Sbbf>>,
}

impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
    /// Creates a new row group reader from a file, row group metadata and custom config.
    pub fn new(
        chunk_reader: Arc<R>,
        metadata: &'a RowGroupMetaData,
        offset_index: Option<&'a [OffsetIndexMetaData]>,
        props: ReaderPropertiesPtr,
    ) -> Result<Self> {
        // Eagerly load the bloom filter for every column if requested,
        // otherwise store a `None` placeholder per column
        let bloom_filters = if props.read_bloom_filter() {
            metadata
                .columns()
                .iter()
                .map(|col| Sbbf::read_from_column_chunk(col, &*chunk_reader))
                .collect::<Result<Vec<_>>>()?
        } else {
            iter::repeat(None).take(metadata.columns().len()).collect()
        };
        Ok(Self {
            chunk_reader,
            metadata,
            offset_index,
            props,
            bloom_filters,
        })
    }
}
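
// A minimal sketch of a bloom-filter membership probe (reading bloom filters
// must be enabled via `ReaderProperties`; the column and probe value are
// illustrative):
//
//     let row_group = reader.get_row_group(0)?;
//     if let Some(sbbf) = row_group.get_column_bloom_filter(0) {
//         // `false` means definitely absent; `true` means "maybe present"
//         let maybe_present = sbbf.check(&42_i32);
//     }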

impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R> {
    fn metadata(&self) -> &RowGroupMetaData {
        self.metadata
    }

    fn num_columns(&self) -> usize {
        self.metadata.num_columns()
    }

    fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
        let col = self.metadata.column(i);

        let page_locations = self.offset_index.map(|x| x[i].page_locations.clone());

        let props = Arc::clone(&self.props);
        Ok(Box::new(SerializedPageReader::new_with_properties(
            Arc::clone(&self.chunk_reader),
            col,
            usize::try_from(self.metadata.num_rows())?,
            page_locations,
            props,
        )?))
    }

    /// Get the bloom filter for the `i`th column
    fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
        self.bloom_filters[i].as_ref()
    }

    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
        RowIter::from_row_group(projection, self)
    }
}

/// Reads a [`PageHeader`] from the provided [`Read`].
pub(crate) fn read_page_header<T: Read>(input: &mut T) -> Result<PageHeader> {
    let mut prot = TCompactInputProtocol::new(input);
    Ok(PageHeader::read_from_in_protocol(&mut prot)?)
}

/// Reads and decrypts a [`PageHeader`] from the provided [`Read`].
#[cfg(feature = "encryption")]
pub(crate) fn read_encrypted_page_header<T: Read>(
    input: &mut T,
    crypto_context: Arc<CryptoContext>,
) -> Result<PageHeader> {
    let data_decryptor = crypto_context.data_decryptor();
    let aad = crypto_context.create_page_header_aad()?;

    let buf = read_and_decrypt(data_decryptor, input, aad.as_ref()).map_err(|_| {
        ParquetError::General(format!(
            "Error decrypting column {}, decryptor may be wrong or missing",
            crypto_context.column_ordinal
        ))
    })?;

    let mut prot = TCompactSliceInputProtocol::new(buf.as_slice());
    Ok(PageHeader::read_from_in_protocol(&mut prot)?)
}

/// Reads and decrypts a [`PageHeader`] from the provided [`Read`], returning the
/// number of bytes consumed from `input` along with the header.
#[cfg(feature = "encryption")]
fn read_encrypted_page_header_len<T: Read>(
    input: &mut T,
    crypto_context: Option<Arc<CryptoContext>>,
) -> Result<(usize, PageHeader)> {
    /// A wrapper around a [`Read`] that keeps track of the bytes read
    struct TrackedRead<R> {
        inner: R,
        bytes_read: usize,
    }

    impl<R: Read> Read for TrackedRead<R> {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            let v = self.inner.read(buf)?;
            self.bytes_read += v;
            Ok(v)
        }
    }

    let mut tracked = TrackedRead {
        inner: input,
        bytes_read: 0,
    };
    let header = read_encrypted_page_header(&mut tracked, crypto_context.unwrap())?;
    Ok((tracked.bytes_read, header))
}

/// Reads a [`PageHeader`] from the provided [`Read`], returning the number of
/// bytes consumed from `input` along with the header. The Thrift-encoded header
/// has no length prefix, so the consumed byte count must be measured.
fn read_page_header_len<T: Read>(input: &mut T) -> Result<(usize, PageHeader)> {
    /// A wrapper around a [`Read`] that keeps track of the bytes read
    struct TrackedRead<R> {
        inner: R,
        bytes_read: usize,
    }

    impl<R: Read> Read for TrackedRead<R> {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            let v = self.inner.read(buf)?;
            self.bytes_read += v;
            Ok(v)
        }
    }

    let mut tracked = TrackedRead {
        inner: input,
        bytes_read: 0,
    };
    let header = read_page_header(&mut tracked)?;
    Ok((tracked.bytes_read, header))
}
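
// The returned length is what lets callers advance a byte cursor past a header
// of unknown encoded size. A minimal sketch (reader and offset are illustrative):
//
//     let mut read = chunk_reader.get_read(offset as u64)?;
//     let (header_len, header) = read_page_header_len(&mut read)?;
//     offset += header_len; // now positioned at the start of the page payload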

/// Decodes a [`Page`] from the provided `buffer`, decompressing the page payload
/// with `decompressor` if one is provided and the page is compressed.
pub(crate) fn decode_page(
    page_header: PageHeader,
    buffer: Bytes,
    physical_type: Type,
    decompressor: Option<&mut Box<dyn Codec>>,
) -> Result<Page> {
    // Verify the 32-bit CRC checksum of the page, if present
    #[cfg(feature = "crc")]
    if let Some(expected_crc) = page_header.crc {
        let crc = crc32fast::hash(&buffer);
        if crc != expected_crc as u32 {
            return Err(general_err!("Page CRC checksum mismatch"));
        }
    }

    let mut offset: usize = 0;
    let mut can_decompress = true;

    // For DataPage v2 the definition and repetition levels at the front of the
    // payload are never compressed; only the values section may be. Validate the
    // level lengths and compute the offset where the values section begins.
    if let Some(ref header_v2) = page_header.data_page_header_v2 {
        if header_v2.definition_levels_byte_length < 0
            || header_v2.repetition_levels_byte_length < 0
            || header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length
                > page_header.uncompressed_page_size
        {
            return Err(general_err!(
                "DataPage v2 header contains implausible values \
                    for definition_levels_byte_length ({}) \
                    and repetition_levels_byte_length ({}) \
                    given DataPage header provides uncompressed_page_size ({})",
                header_v2.definition_levels_byte_length,
                header_v2.repetition_levels_byte_length,
                page_header.uncompressed_page_size
            ));
        }
        offset = usize::try_from(
            header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length,
        )?;
        // When the is_compressed flag is missing the page is considered compressed
        can_decompress = header_v2.is_compressed.unwrap_or(true);
    }

    // TODO: page header could be huge because of statistics. We should set a
    // maximum page header size and abort if that is exceeded.
    let buffer = match decompressor {
        Some(decompressor) if can_decompress => {
            let uncompressed_page_size = usize::try_from(page_header.uncompressed_page_size)?;
            let decompressed_size = uncompressed_page_size - offset;
            let mut decompressed = Vec::with_capacity(uncompressed_page_size);
            decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
            if decompressed_size > 0 {
                let compressed = &buffer.as_ref()[offset..];
                decompressor.decompress(compressed, &mut decompressed, Some(decompressed_size))?;
            }

            if decompressed.len() != uncompressed_page_size {
                return Err(general_err!(
                    "Actual decompressed size doesn't match the expected one ({} vs {})",
                    decompressed.len(),
                    uncompressed_page_size
                ));
            }

            Bytes::from(decompressed)
        }
        _ => buffer,
    };

    let result = match page_header.type_ {
        PageType::DICTIONARY_PAGE => {
            let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| {
                ParquetError::General("Missing dictionary page header".to_string())
            })?;
            let is_sorted = dict_header.is_sorted.unwrap_or(false);
            Page::DictionaryPage {
                buf: buffer,
                num_values: dict_header.num_values.try_into()?,
                encoding: Encoding::try_from(dict_header.encoding)?,
                is_sorted,
            }
        }
        PageType::DATA_PAGE => {
            let header = page_header
                .data_page_header
                .ok_or_else(|| ParquetError::General("Missing V1 data page header".to_string()))?;
            Page::DataPage {
                buf: buffer,
                num_values: header.num_values.try_into()?,
                encoding: Encoding::try_from(header.encoding)?,
                def_level_encoding: Encoding::try_from(header.definition_level_encoding)?,
                rep_level_encoding: Encoding::try_from(header.repetition_level_encoding)?,
                statistics: statistics::from_thrift(physical_type, header.statistics)?,
            }
        }
        PageType::DATA_PAGE_V2 => {
            let header = page_header
                .data_page_header_v2
                .ok_or_else(|| ParquetError::General("Missing V2 data page header".to_string()))?;
            let is_compressed = header.is_compressed.unwrap_or(true);
            Page::DataPageV2 {
                buf: buffer,
                num_values: header.num_values.try_into()?,
                encoding: Encoding::try_from(header.encoding)?,
                num_nulls: header.num_nulls.try_into()?,
                num_rows: header.num_rows.try_into()?,
                def_levels_byte_len: header.definition_levels_byte_length.try_into()?,
                rep_levels_byte_len: header.repetition_levels_byte_length.try_into()?,
                is_compressed,
                statistics: statistics::from_thrift(physical_type, header.statistics)?,
            }
        }
        _ => {
            // For unknown page types (e.g., INDEX_PAGE), fail loudly
            unimplemented!("Page type {:?} is not supported", page_header.type_)
        }
    };

    Ok(result)
}
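
// A crate-internal usage sketch of `decode_page` (inputs are illustrative;
// `page_bytes` is assumed to hold exactly `header.compressed_page_size` bytes):
//
//     let mut decompressor = create_codec(meta.compression(), props.codec_options())?;
//     let header = read_page_header(&mut read)?;
//     let page = decode_page(header, page_bytes, meta.column_type(), decompressor.as_mut())?;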

enum SerializedPageReaderState {
    Values {
        /// The current byte offset in the reader
        offset: usize,

        /// The remaining bytes of the column chunk to read
        remaining_bytes: usize,

        /// If the next page header has already been "peeked", it is cached here
        next_page_header: Option<Box<PageHeader>>,

        /// The index of the data page within this column chunk
        page_ordinal: usize,

        /// Whether the next page is expected to be a dictionary page
        require_dictionary: bool,
    },
    Pages {
        /// Remaining page locations
        page_locations: VecDeque<PageLocation>,
        /// Remaining dictionary location if any
        dictionary_page: Option<PageLocation>,
        /// The total number of rows in this column chunk
        total_rows: usize,
    },
}

/// A serialized implementation for Parquet [`PageReader`].
pub struct SerializedPageReader<R: ChunkReader> {
    /// The chunk reader
    reader: Arc<R>,

    /// The compression codec for this column chunk, if any
    decompressor: Option<Box<dyn Codec>>,

    /// The physical type of this column chunk
    physical_type: Type,

    state: SerializedPageReaderState,

    /// Crypto context carrying objects required for decryption
    #[cfg(feature = "encryption")]
    crypto_context: Option<Arc<CryptoContext>>,
}

impl<R: ChunkReader> SerializedPageReader<R> {
    /// Creates a new serialized page reader from a chunk reader and column chunk metadata
    pub fn new(
        reader: Arc<R>,
        column_chunk_metadata: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
    ) -> Result<Self> {
        let props = Arc::new(ReaderProperties::builder().build());
        SerializedPageReader::new_with_properties(
            reader,
            column_chunk_metadata,
            total_rows,
            page_locations,
            props,
        )
    }

    /// No-op stub used when the `encryption` feature is disabled
    #[cfg(all(feature = "arrow", not(feature = "encryption")))]
    pub(crate) fn add_crypto_context(
        self,
        _rg_idx: usize,
        _column_idx: usize,
        _parquet_meta_data: &ParquetMetaData,
        _column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        Ok(self)
    }

    /// Adds the crypto context required to decrypt this column chunk, if any
    #[cfg(feature = "encryption")]
    pub(crate) fn add_crypto_context(
        mut self,
        rg_idx: usize,
        column_idx: usize,
        parquet_meta_data: &ParquetMetaData,
        column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        let Some(file_decryptor) = parquet_meta_data.file_decryptor() else {
            return Ok(self);
        };
        let Some(crypto_metadata) = column_chunk_metadata.crypto_metadata() else {
            return Ok(self);
        };
        let crypto_context =
            CryptoContext::for_column(file_decryptor, crypto_metadata, rg_idx, column_idx)?;
        self.crypto_context = Some(Arc::new(crypto_context));
        Ok(self)
    }

    /// Creates a new serialized page reader from a chunk reader, column chunk
    /// metadata and custom reader properties.
    pub fn new_with_properties(
        reader: Arc<R>,
        meta: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
        props: ReaderPropertiesPtr,
    ) -> Result<Self> {
        let decompressor = create_codec(meta.compression(), props.codec_options())?;
        let (start, len) = meta.byte_range();

        let state = match page_locations {
            Some(locations) => {
                // If the first indexed page does not start at the column chunk's
                // first byte, the gap must hold the dictionary page, so synthesize
                // a location for it.
                let dictionary_page = match locations.first() {
                    Some(dict_offset) if dict_offset.offset as u64 != start => Some(PageLocation {
                        offset: start as i64,
                        compressed_page_size: (dict_offset.offset as u64 - start) as i32,
                        first_row_index: 0,
                    }),
                    _ => None,
                };

                SerializedPageReaderState::Pages {
                    page_locations: locations.into(),
                    dictionary_page,
                    total_rows,
                }
            }
            None => SerializedPageReaderState::Values {
                offset: usize::try_from(start)?,
                remaining_bytes: usize::try_from(len)?,
                next_page_header: None,
                page_ordinal: 0,
                require_dictionary: meta.dictionary_page_offset().is_some(),
            },
        };
        Ok(Self {
            reader,
            decompressor,
            state,
            physical_type: meta.column_type(),
            #[cfg(feature = "encryption")]
            crypto_context: None,
        })
    }

    /// Similar to `peek_next_page`, but returns the offset of the next page instead
    /// of the page metadata. Unlike page metadata, an offset can uniquely identify a page.
    ///
    /// This is used when reading parquet with a row filter, to avoid decompressing
    /// a page twice by checking whether the next page has been cached or read before.
    #[cfg(test)]
    fn peek_next_page_offset(&mut self) -> Result<Option<usize>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                ..
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
                            Ok(Some(*offset))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset as u64)?;
                        let (header_len, header) = read_page_header_len(&mut read)?;
                        *offset += header_len;
                        *remaining_bytes -= header_len;
                        let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
                            Ok(Some(*offset))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            continue;
                        };
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                ..
            } => {
                if let Some(page) = dictionary_page {
                    Ok(Some(usize::try_from(page.offset)?))
                } else if let Some(page) = page_locations.front() {
                    Ok(Some(usize::try_from(page.offset)?))
                } else {
                    Ok(None)
                }
            }
        }
    }
}
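
// A minimal sketch of standalone page reading (the metadata lookups are
// illustrative; `SerializedPageReader` also implements `Iterator`):
//
//     let meta = reader.metadata().row_group(0).column(0);
//     let total_rows = reader.metadata().row_group(0).num_rows() as usize;
//     let page_reader = SerializedPageReader::new(chunk_reader, meta, total_rows, None)?;
//     for page in page_reader {
//         println!("page type: {:?}", page?.page_type());
//     }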

impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
    type Item = Result<Page>;

    fn next(&mut self) -> Option<Self::Item> {
        self.get_next_page().transpose()
    }
}

fn verify_page_header_len(header_len: usize, remaining_bytes: usize) -> Result<()> {
    if header_len > remaining_bytes {
        return Err(eof_err!("Invalid page header"));
    }
    Ok(())
}

fn verify_page_size(
    compressed_size: i32,
    uncompressed_size: i32,
    remaining_bytes: usize,
) -> Result<()> {
    // The page's compressed size should not exceed the remaining bytes that are
    // available to read. The page's uncompressed size is the expected size after
    // decompression, which can never be negative.
    if compressed_size < 0 || compressed_size as usize > remaining_bytes || uncompressed_size < 0 {
        return Err(eof_err!("Invalid page header"));
    }
    Ok(())
}

impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
    fn get_next_page(&mut self) -> Result<Option<Page>> {
        loop {
            let page = match &mut self.state {
                SerializedPageReaderState::Values {
                    offset,
                    remaining_bytes: remaining,
                    next_page_header,
                    page_ordinal,
                    require_dictionary,
                } => {
                    if *remaining == 0 {
                        return Ok(None);
                    }

                    let mut read = self.reader.get_read(*offset as u64)?;
                    let header = if let Some(header) = next_page_header.take() {
                        *header
                    } else {
                        #[cfg(feature = "encryption")]
                        let (header_len, header) = if self.crypto_context.is_some() {
                            let crypto_context = page_crypto_context(
                                &self.crypto_context,
                                *page_ordinal,
                                *require_dictionary,
                            )?;
                            read_encrypted_page_header_len(&mut read, crypto_context)?
                        } else {
                            read_page_header_len(&mut read)?
                        };

                        #[cfg(not(feature = "encryption"))]
                        let (header_len, header) = read_page_header_len(&mut read)?;

                        verify_page_header_len(header_len, *remaining)?;
                        *offset += header_len;
                        *remaining -= header_len;
                        header
                    };
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining,
                    )?;
                    let data_len = header.compressed_page_size as usize;
                    *offset += data_len;
                    *remaining -= data_len;

                    if header.type_ == PageType::INDEX_PAGE {
                        continue;
                    }

                    let mut buffer = Vec::with_capacity(data_len);
                    let read = read.take(data_len as u64).read_to_end(&mut buffer)?;

                    if read != data_len {
                        return Err(eof_err!(
                            "Expected to read {} bytes of page, read only {}",
                            data_len,
                            read
                        ));
                    }

                    #[cfg(feature = "encryption")]
                    let crypto_context = page_crypto_context(
                        &self.crypto_context,
                        *page_ordinal,
                        *require_dictionary,
                    )?;
                    #[cfg(feature = "encryption")]
                    let buffer: Vec<u8> = if let Some(crypto_context) = crypto_context {
                        let decryptor = crypto_context.data_decryptor();
                        let aad = crypto_context.create_page_aad()?;
                        decryptor.decrypt(buffer.as_ref(), &aad)?
                    } else {
                        buffer
                    };

                    let page = decode_page(
                        header,
                        Bytes::from(buffer),
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?;
                    if page.is_data_page() {
                        *page_ordinal += 1;
                    } else if page.is_dictionary_page() {
                        *require_dictionary = false;
                    }
                    page
                }
                SerializedPageReaderState::Pages {
                    page_locations,
                    dictionary_page,
                    ..
                } => {
                    // Serve the synthesized dictionary page first, then the page
                    // locations from the offset index in order
                    let front = match dictionary_page
                        .take()
                        .or_else(|| page_locations.pop_front())
                    {
                        Some(front) => front,
                        None => return Ok(None),
                    };

                    let page_len = usize::try_from(front.compressed_page_size)?;

                    let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;

                    let mut prot = TCompactSliceInputProtocol::new(buffer.as_ref());
                    let header = PageHeader::read_from_in_protocol(&mut prot)?;
                    let offset = buffer.len() - prot.as_slice().len();

                    let bytes = buffer.slice(offset..);
                    decode_page(
                        header,
                        bytes,
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?
                }
            };

            return Ok(Some(page));
        }
    }

    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                ..
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(page_meta) = (&**header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset as u64)?;
                        let (header_len, header) = read_page_header_len(&mut read)?;
                        verify_page_header_len(header_len, *remaining_bytes)?;
                        *offset += header_len;
                        *remaining_bytes -= header_len;
                        let page_meta = if let Ok(page_meta) = (&header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            continue;
                        };
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                total_rows,
            } => {
                if dictionary_page.is_some() {
                    Ok(Some(PageMetadata {
                        num_rows: None,
                        num_levels: None,
                        is_dict: true,
                    }))
                } else if let Some(page) = page_locations.front() {
                    // A page's row count is the gap between its first row index and
                    // that of the next page (or the chunk's total row count)
                    let next_rows = page_locations
                        .get(1)
                        .map(|x| x.first_row_index as usize)
                        .unwrap_or(*total_rows);

                    Ok(Some(PageMetadata {
                        num_rows: Some(next_rows - page.first_row_index as usize),
                        num_levels: None,
                        is_dict: false,
                    }))
                } else {
                    Ok(None)
                }
            }
        }
    }

    fn skip_next_page(&mut self) -> Result<()> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                ..
            } => {
                if let Some(buffered_header) = next_page_header.take() {
                    verify_page_size(
                        buffered_header.compressed_page_size,
                        buffered_header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    // The next page header has already been peeked, so just advance the offset
                    *offset += buffered_header.compressed_page_size as usize;
                    *remaining_bytes -= buffered_header.compressed_page_size as usize;
                } else {
                    let mut read = self.reader.get_read(*offset as u64)?;
                    let (header_len, header) = read_page_header_len(&mut read)?;
                    verify_page_header_len(header_len, *remaining_bytes)?;
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    let data_page_size = header.compressed_page_size as usize;
                    *offset += header_len + data_page_size;
                    *remaining_bytes -= header_len + data_page_size;
                }
                Ok(())
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                ..
            } => {
                if dictionary_page.is_some() {
                    // If a dictionary page exists, consume it by taking it
                    dictionary_page.take();
                } else {
                    // Else the next page is a data page, so pop it from the queue
                    page_locations.pop_front();
                }

                Ok(())
            }
        }
    }

    fn at_record_boundary(&mut self) -> Result<bool> {
        match &mut self.state {
            SerializedPageReaderState::Values { .. } => Ok(self.peek_next_page()?.is_none()),
            SerializedPageReaderState::Pages { .. } => Ok(true),
        }
    }
}
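
// A minimal sketch of selective page consumption via the trait methods above,
// mirroring what the tests at the bottom of this file do (column choice is
// illustrative):
//
//     let mut pages = row_group_reader.get_column_page_reader(0)?;
//     while let Some(meta) = pages.peek_next_page()? {
//         if meta.is_dict {
//             pages.skip_next_page()?; // cheap: no decompression or decoding
//         } else {
//             let page = pages.get_next_page()?.unwrap();
//         }
//     }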

#[cfg(feature = "encryption")]
fn page_crypto_context(
    crypto_context: &Option<Arc<CryptoContext>>,
    page_ordinal: usize,
    dictionary_page: bool,
) -> Result<Option<Arc<CryptoContext>>> {
    Ok(crypto_context.as_ref().map(|c| {
        Arc::new(if dictionary_page {
            c.for_dictionary_page()
        } else {
            c.with_page_ordinal(page_ordinal)
        })
    }))
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use bytes::Buf;

    use crate::file::properties::{EnabledStatistics, WriterProperties};
    use crate::format::BoundaryOrder;

    use crate::basic::{self, ColumnOrder, SortOrder};
    use crate::column::reader::ColumnReader;
    use crate::data_type::private::ParquetValueType;
    use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
    use crate::file::page_index::index::{Index, NativeIndex};
    use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
    use crate::file::writer::SerializedFileWriter;
    use crate::record::RowAccessor;
    use crate::schema::parser::parse_message_type;
    use crate::util::test_common::file_util::{get_test_file, get_test_path};

    use super::*;

    #[test]
    fn test_cursor_and_file_has_the_same_behaviour() {
        let mut buf: Vec<u8> = Vec::new();
        get_test_file("alltypes_plain.parquet")
            .read_to_end(&mut buf)
            .unwrap();
        let cursor = Bytes::from(buf);
        let read_from_cursor = SerializedFileReader::new(cursor).unwrap();

        let test_file = get_test_file("alltypes_plain.parquet");
        let read_from_file = SerializedFileReader::new(test_file).unwrap();

        let file_iter = read_from_file.get_row_iter(None).unwrap();
        let cursor_iter = read_from_cursor.get_row_iter(None).unwrap();

        for (a, b) in file_iter.zip(cursor_iter) {
            assert_eq!(a.unwrap(), b.unwrap())
        }
    }

    #[test]
    fn test_file_reader_try_from() {
        // Valid file path
        let test_file = get_test_file("alltypes_plain.parquet");
        let test_path_buf = get_test_path("alltypes_plain.parquet");
        let test_path = test_path_buf.as_path();
        let test_path_str = test_path.to_str().unwrap();

        let reader = SerializedFileReader::try_from(test_file);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path_str);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path_str.to_string());
        assert!(reader.is_ok());

        // Invalid file path
        let test_path = Path::new("invalid.parquet");
        let test_path_str = test_path.to_str().unwrap();

        let reader = SerializedFileReader::try_from(test_path);
        assert!(reader.is_err());

        let reader = SerializedFileReader::try_from(test_path_str);
        assert!(reader.is_err());

        let reader = SerializedFileReader::try_from(test_path_str.to_string());
        assert!(reader.is_err());
    }

    #[test]
    fn test_file_reader_into_iter() {
        let path = get_test_path("alltypes_plain.parquet");
        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
        let iter = reader.into_iter();
        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();

        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
    }

    #[test]
    fn test_file_reader_into_iter_project() {
        let path = get_test_path("alltypes_plain.parquet");
        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
        let schema = "message schema { OPTIONAL INT32 id; }";
        let proj = parse_message_type(schema).ok();
        let iter = reader.into_iter().project(proj).unwrap();
        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();

        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
    }

    #[test]
    fn test_reuse_file_chunk() {
        // This test covers the case of maintaining the correct start position in a
        // file stream for each column reader after initializing and moving to the
        // next one (without necessarily reading the entire column).
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader = SerializedFileReader::new(test_file).unwrap();
        let row_group = reader.get_row_group(0).unwrap();

        let mut page_readers = Vec::new();
        for i in 0..row_group.num_columns() {
            page_readers.push(row_group.get_column_page_reader(i).unwrap());
        }

        // Try to read the first page of each column; each reader must start from
        // the correct position regardless of the order the readers were created in.
        for mut page_reader in page_readers {
            assert!(page_reader.get_next_page().is_ok());
        }
    }

    #[test]
    fn test_file_reader() {
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
        );
        assert!(file_metadata.key_value_metadata().is_none());
        assert_eq!(file_metadata.num_rows(), 8);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        // Test contents in row group metadata
        let row_group_metadata = metadata.row_group(0);
        assert_eq!(row_group_metadata.num_columns(), 11);
        assert_eq!(row_group_metadata.num_rows(), 8);
        assert_eq!(row_group_metadata.total_byte_size(), 671);
        // Check each column order
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Test row group reader
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Test page reader
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 32);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPage {
                    buf,
                    num_values,
                    encoding,
                    def_level_encoding,
                    rep_level_encoding,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 11);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert_eq!(def_level_encoding, Encoding::RLE);
                    #[allow(deprecated)]
                    let expected_rep_level_encoding = Encoding::BIT_PACKED;
                    assert_eq!(rep_level_encoding, expected_rep_level_encoding);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }

    #[test]
    fn test_file_reader_datapage_v2() {
        let test_file = get_test_file("datapage_v2.snappy.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            1
        );

        assert_eq!(file_metadata.num_rows(), 5);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        let row_group_metadata = metadata.row_group(0);

        // Check each column order
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Test row group reader
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Test page reader
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 7);
                    assert_eq!(num_values, 1);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 4);
                    assert_eq!(num_values, 5);
                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                    assert_eq!(num_nulls, 1);
                    assert_eq!(num_rows, 5);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_some());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }

    #[test]
    fn test_file_reader_empty_compressed_datapage_v2() {
        // File with a DataPage v2 whose compressed values section is empty
        let test_file = get_test_file("page_v2_empty_compressed.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-cpp-arrow version 14.0.2"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            1
        );

        assert_eq!(file_metadata.num_rows(), 10);
        assert_eq!(file_metadata.version(), 2);
        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
        assert_eq!(
            file_metadata.column_orders(),
            Some(vec![expected_order].as_ref())
        );

        let row_group_metadata = metadata.row_group(0);

        // Check each column order
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), expected_order);
        }

        // Test row group reader
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Test page reader
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 0);
                    assert_eq!(num_values, 0);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 3);
                    assert_eq!(num_values, 10);
                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                    assert_eq!(num_nulls, 10);
                    assert_eq!(num_rows, 10);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_some());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }

    #[test]
    fn test_file_reader_empty_datapage_v2() {
        // File with a single DataPage v2 holding one null value
        let test_file = get_test_file("datapage_v2_empty_datapage.snappy.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-mr version 1.13.1 (build db4183109d5b734ec5930d870cdae161e408ddba)"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            2
        );

        assert_eq!(file_metadata.num_rows(), 1);
        assert_eq!(file_metadata.version(), 1);
        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
        assert_eq!(
            file_metadata.column_orders(),
            Some(vec![expected_order].as_ref())
        );

        let row_group_metadata = metadata.row_group(0);

        // Check each column order
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), expected_order);
        }

        // Test row group reader
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Test page reader
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 2);
                    assert_eq!(num_values, 1);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert_eq!(num_nulls, 1);
                    assert_eq!(num_rows, 1);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 1);
    }

    fn get_serialized_page_reader<R: ChunkReader>(
        file_reader: &SerializedFileReader<R>,
        row_group: usize,
        column: usize,
    ) -> Result<SerializedPageReader<R>> {
        let row_group = {
            let row_group_metadata = file_reader.metadata.row_group(row_group);
            let props = Arc::clone(&file_reader.props);
            let f = Arc::clone(&file_reader.chunk_reader);
            SerializedRowGroupReader::new(
                f,
                row_group_metadata,
                file_reader
                    .metadata
                    .offset_index()
                    .map(|x| x[row_group].as_slice()),
                props,
            )?
        };

        let col = row_group.metadata.column(column);

        let page_locations = row_group
            .offset_index
            .map(|x| x[column].page_locations.clone());

        let props = Arc::clone(&row_group.props);
        SerializedPageReader::new_with_properties(
            Arc::clone(&row_group.chunk_reader),
            col,
            usize::try_from(row_group.metadata.num_rows())?,
            page_locations,
            props,
        )
    }

    #[test]
    fn test_peek_next_page_offset_matches_actual() -> Result<()> {
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader = SerializedFileReader::new(test_file)?;

        let mut offset_set = HashSet::new();
        let num_row_groups = reader.metadata.num_row_groups();
        for row_group in 0..num_row_groups {
            let num_columns = reader.metadata.row_group(row_group).num_columns();
            for column in 0..num_columns {
                let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;

                while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
                    match &page_reader.state {
                        SerializedPageReaderState::Pages {
                            page_locations,
                            dictionary_page,
                            ..
                        } => {
                            if let Some(page) = dictionary_page {
                                assert_eq!(page.offset as usize, page_offset);
                            } else if let Some(page) = page_locations.front() {
                                assert_eq!(page.offset as usize, page_offset);
                            } else {
                                unreachable!()
                            }
                        }
                        SerializedPageReaderState::Values {
                            offset,
                            next_page_header,
                            ..
                        } => {
                            assert!(next_page_header.is_some());
                            assert_eq!(*offset, page_offset);
                        }
                    }
                    let page = page_reader.get_next_page()?;
                    assert!(page.is_some());
                    let newly_inserted = offset_set.insert(page_offset);
                    assert!(newly_inserted);
                }
            }
        }

        Ok(())
    }

    #[test]
    fn test_page_iterator() {
        let file = get_test_file("alltypes_plain.parquet");
        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());

        let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap();

        // read first page
        let page = page_iterator.next();
        assert!(page.is_some());
        assert!(page.unwrap().is_ok());

        // reach end of file
        let page = page_iterator.next();
        assert!(page.is_none());

        let row_group_indices = Box::new(0..1);
        let mut page_iterator =
            FilePageIterator::with_row_groups(0, row_group_indices, file_reader).unwrap();

        // read first page
        let page = page_iterator.next();
        assert!(page.is_some());
        assert!(page.unwrap().is_ok());

        // reach end of file
        let page = page_iterator.next();
        assert!(page.is_none());
    }

    #[test]
    fn test_file_reader_key_value_metadata() {
        let file = get_test_file("binary.parquet");
        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());

        let metadata = file_reader
            .metadata
            .file_metadata()
            .key_value_metadata()
            .unwrap();

        assert_eq!(metadata.len(), 3);

        assert_eq!(metadata[0].key, "parquet.proto.descriptor");

        assert_eq!(metadata[1].key, "writer.model.name");
        assert_eq!(metadata[1].value, Some("protobuf".to_owned()));

        assert_eq!(metadata[2].key, "parquet.proto.class");
        assert_eq!(metadata[2].value, Some("foo.baz.Foobaz$Event".to_owned()));
    }

    #[test]
    fn test_file_reader_optional_metadata() {
        // file with optional metadata: bloom filters, encoding stats, column index and offset index
        let file = get_test_file("data_index_bloom_encoding_stats.parquet");
        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());

        let row_group_metadata = file_reader.metadata.row_group(0);
        let col0_metadata = row_group_metadata.column(0);

        // test optional bloom filter offset
        assert_eq!(col0_metadata.bloom_filter_offset().unwrap(), 192);

        // test page encoding stats
        let page_encoding_stats = &col0_metadata.page_encoding_stats().unwrap()[0];

        assert_eq!(page_encoding_stats.page_type, basic::PageType::DATA_PAGE);
        assert_eq!(page_encoding_stats.encoding, Encoding::PLAIN);
        assert_eq!(page_encoding_stats.count, 1);

        // test optional column index offset and length
        assert_eq!(col0_metadata.column_index_offset().unwrap(), 156);
        assert_eq!(col0_metadata.column_index_length().unwrap(), 25);

        // test optional offset index offset and length
        assert_eq!(col0_metadata.offset_index_offset().unwrap(), 181);
        assert_eq!(col0_metadata.offset_index_length().unwrap(), 11);
    }

    #[test]
    fn test_file_reader_with_no_filter() -> Result<()> {
        let test_file = get_test_file("alltypes_plain.parquet");
        let origin_reader = SerializedFileReader::new(test_file)?;
        // test initial number of row groups
        let metadata = origin_reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);
        Ok(())
    }

    #[test]
    fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
        let test_file = get_test_file("alltypes_plain.parquet");
        let read_options = ReadOptionsBuilder::new()
            .with_predicate(Box::new(|_, _| false))
            .build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 0);
        Ok(())
    }

    #[test]
    fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
        let test_file = get_test_file("alltypes_plain.parquet");
        let origin_reader = SerializedFileReader::new(test_file)?;
        // test initial number of row groups
        let metadata = origin_reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);
        let mid = get_midpoint_offset(metadata.row_group(0));

        // the row group is kept: its midpoint lies within [0, mid + 1)
        let test_file = get_test_file("alltypes_plain.parquet");
        let read_options = ReadOptionsBuilder::new().with_range(0, mid + 1).build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // the row group is filtered out: its midpoint lies outside [0, mid)
        let test_file = get_test_file("alltypes_plain.parquet");
        let read_options = ReadOptionsBuilder::new().with_range(0, mid).build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 0);
        Ok(())
    }

    #[test]
    fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let origin_reader = SerializedFileReader::new(test_file)?;
        let metadata = origin_reader.metadata();
        let mid = get_midpoint_offset(metadata.row_group(0));

        // true, true predicate
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|_, _| true))
            .with_range(mid, mid + 1)
            .build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);
        assert_eq!(metadata.column_index().unwrap().len(), 1);
        assert_eq!(metadata.offset_index().unwrap().len(), 1);

        // true, false predicate
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|_, _| true))
            .with_range(0, mid)
            .build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 0);
        assert!(metadata.column_index().is_none());
        assert!(metadata.offset_index().is_none());

        // false, true predicate
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|_, _| false))
            .with_range(mid, mid + 1)
            .build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 0);
        assert!(metadata.column_index().is_none());
        assert!(metadata.offset_index().is_none());

        // false, false predicate
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|_, _| false))
            .with_range(0, mid)
            .build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 0);
        assert!(metadata.column_index().is_none());
        assert!(metadata.offset_index().is_none());
        Ok(())
    }

    #[test]
    fn test_file_reader_invalid_metadata() {
        let data = [
            255, 172, 1, 0, 50, 82, 65, 73, 1, 0, 0, 0, 169, 168, 168, 162, 87, 255, 16, 0, 0, 0,
            80, 65, 82, 49,
        ];
        let ret = SerializedFileReader::new(Bytes::copy_from_slice(&data));
        assert_eq!(
            ret.err().unwrap().to_string(),
            "Parquet error: Could not parse metadata: bad data"
        );
    }

    #[test]
    // The expected page index values below were produced with java parquet-tools:
    //
    //   parquet-tools column-index ./data_index_bloom_encoding_stats.parquet
    //   row group 0:
    //   column index for column String:
    //   Boundary order: ASCENDING
    //   page-0  :
    //   null count                 min                                  max
    //   0                          Hello                                today
    //
    //   offset index for column String:
    //   page-0   :
    //   offset   compressed size       first row index
    //   4               152                     0
    fn test_page_index_reader() {
        let test_file = get_test_file("data_index_bloom_encoding_stats.parquet");
        let builder = ReadOptionsBuilder::new();
        // enable reading the page index
        let options = builder.with_page_index().build();
        let reader_result = SerializedFileReader::new_with_options(test_file, options);
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let column_index = metadata.column_index().unwrap();

        // only one row group
        assert_eq!(column_index.len(), 1);
        let index = if let Index::BYTE_ARRAY(index) = &column_index[0][0] {
            index
        } else {
            unreachable!()
        };

        assert_eq!(index.boundary_order, BoundaryOrder::ASCENDING);
        let index_in_pages = &index.indexes;

        // only one page for the column
        assert_eq!(index_in_pages.len(), 1);

        let page0 = &index_in_pages[0];
        let min = page0.min.as_ref().unwrap();
        let max = page0.max.as_ref().unwrap();
        assert_eq!(b"Hello", min.as_bytes());
        assert_eq!(b"today", max.as_bytes());

        let offset_indexes = metadata.offset_index().unwrap();
        // only one row group
        assert_eq!(offset_indexes.len(), 1);
        let offset_index = &offset_indexes[0];
        let page_offset = &offset_index[0].page_locations()[0];

        assert_eq!(4, page_offset.offset);
        assert_eq!(152, page_offset.compressed_page_size);
        assert_eq!(0, page_offset.first_row_index);
    }

    #[test]
    fn test_page_index_reader_out_of_order() {
        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(test_file, options).unwrap();
        let metadata = reader.metadata();

        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
        let columns = metadata.row_group(0).columns();
        let reversed: Vec<_> = columns.iter().cloned().rev().collect();

        let a = read_columns_indexes(&test_file, columns).unwrap().unwrap();
        let mut b = read_columns_indexes(&test_file, &reversed)
            .unwrap()
            .unwrap();
        b.reverse();
        assert_eq!(a, b);

        let a = read_offset_indexes(&test_file, columns).unwrap().unwrap();
        let mut b = read_offset_indexes(&test_file, &reversed).unwrap().unwrap();
        b.reverse();
        assert_eq!(a, b);
    }

    #[test]
    fn test_page_index_reader_all_type() {
        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
        let builder = ReadOptionsBuilder::new();
        // enable reading the page index
        let options = builder.with_page_index().build();
        let reader_result = SerializedFileReader::new_with_options(test_file, options);
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let column_index = metadata.column_index().unwrap();
        let row_group_offset_indexes = &metadata.offset_index().unwrap()[0];

        // only one row group
        assert_eq!(column_index.len(), 1);
        let row_group_metadata = metadata.row_group(0);

        // col0 (`id`): INT32, unordered
        assert!(!column_index[0][0].is_sorted());
        let boundary_order = &column_index[0][0].get_boundary_order();
        assert!(boundary_order.is_some());
        assert!(matches!(boundary_order.unwrap(), BoundaryOrder::UNORDERED));
        if let Index::INT32(index) = &column_index[0][0] {
            check_native_page_index(
                index,
                325,
                get_row_group_min_max_bytes(row_group_metadata, 0),
                BoundaryOrder::UNORDERED,
            );
            assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325);
        } else {
            unreachable!()
        };
        // col1 (`bool_col`): BOOLEAN, sorted
        assert!(column_index[0][1].is_sorted());
        if let Index::BOOLEAN(index) = &column_index[0][1] {
            assert_eq!(index.indexes.len(), 82);
            assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82);
        } else {
            unreachable!()
        };
        // col2 (`tinyint_col`): INT32, sorted
        assert!(column_index[0][2].is_sorted());
        if let Index::INT32(index) = &column_index[0][2] {
            check_native_page_index(
                index,
                325,
                get_row_group_min_max_bytes(row_group_metadata, 2),
                BoundaryOrder::ASCENDING,
            );
            assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325);
        } else {
            unreachable!()
        };
        // col3 (`smallint_col`): INT32, sorted
        assert!(column_index[0][3].is_sorted());
        if let Index::INT32(index) = &column_index[0][3] {
            check_native_page_index(
                index,
                325,
                get_row_group_min_max_bytes(row_group_metadata, 3),
                BoundaryOrder::ASCENDING,
            );
            assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325);
        } else {
            unreachable!()
        };
        // col4 (`int_col`): INT32, sorted
        assert!(column_index[0][4].is_sorted());
        if let Index::INT32(index) = &column_index[0][4] {
            check_native_page_index(
                index,
                325,
                get_row_group_min_max_bytes(row_group_metadata, 4),
                BoundaryOrder::ASCENDING,
            );
            assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325);
        } else {
            unreachable!()
        };
        // col5 (`bigint_col`): INT64, unordered
        assert!(!column_index[0][5].is_sorted());
        if let Index::INT64(index) = &column_index[0][5] {
            check_native_page_index(
                index,
                528,
                get_row_group_min_max_bytes(row_group_metadata, 5),
                BoundaryOrder::UNORDERED,
            );
            assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528);
        } else {
            unreachable!()
        };
        // col6 (`float_col`): FLOAT, sorted
        assert!(column_index[0][6].is_sorted());
        if let Index::FLOAT(index) = &column_index[0][6] {
            check_native_page_index(
                index,
                325,
                get_row_group_min_max_bytes(row_group_metadata, 6),
                BoundaryOrder::ASCENDING,
            );
            assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325);
        } else {
            unreachable!()
        };
        // col7 (`double_col`): DOUBLE, unordered
        assert!(!column_index[0][7].is_sorted());
        if let Index::DOUBLE(index) = &column_index[0][7] {
            check_native_page_index(
                index,
                528,
                get_row_group_min_max_bytes(row_group_metadata, 7),
                BoundaryOrder::UNORDERED,
            );
            assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528);
        } else {
            unreachable!()
        };
        // col8 (`date_string_col`): BYTE_ARRAY, unordered
        assert!(!column_index[0][8].is_sorted());
        if let Index::BYTE_ARRAY(index) = &column_index[0][8] {
            check_native_page_index(
                index,
                974,
                get_row_group_min_max_bytes(row_group_metadata, 8),
                BoundaryOrder::UNORDERED,
            );
            assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974);
        } else {
            unreachable!()
        };
        // col9 (`string_col`): BYTE_ARRAY, sorted
        assert!(column_index[0][9].is_sorted());
        if let Index::BYTE_ARRAY(index) = &column_index[0][9] {
            check_native_page_index(
                index,
                352,
                get_row_group_min_max_bytes(row_group_metadata, 9),
                BoundaryOrder::ASCENDING,
            );
            assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352);
        } else {
            unreachable!()
        };
        // col10 (`timestamp_col`): INT96. No column index is written for INT96,
        // so per-page min/max values do not exist for this column.
        assert!(!column_index[0][10].is_sorted());
        if let Index::NONE = &column_index[0][10] {
            assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974);
        } else {
            unreachable!()
        };
        // col11 (`year`): INT32, sorted
        assert!(column_index[0][11].is_sorted());
        if let Index::INT32(index) = &column_index[0][11] {
            check_native_page_index(
                index,
                325,
                get_row_group_min_max_bytes(row_group_metadata, 11),
                BoundaryOrder::ASCENDING,
            );
            assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325);
        } else {
            unreachable!()
        };
        // col12 (`month`): INT32, unordered
        assert!(!column_index[0][12].is_sorted());
        if let Index::INT32(index) = &column_index[0][12] {
            check_native_page_index(
                index,
                325,
                get_row_group_min_max_bytes(row_group_metadata, 12),
                BoundaryOrder::UNORDERED,
            );
            assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325);
        } else {
            unreachable!()
        };
    }
2092
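    // Checks that a native page index has the expected number of pages and
    // boundary order, and that each page's min/max lies within the row
    // group's overall min/max statistics.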
    fn check_native_page_index<T: ParquetValueType>(
        row_group_index: &NativeIndex<T>,
        page_size: usize,
        min_max: (&[u8], &[u8]),
        boundary_order: BoundaryOrder,
    ) {
        assert_eq!(row_group_index.indexes.len(), page_size);
        assert_eq!(row_group_index.boundary_order, boundary_order);
        // Assert on the result of `all`; a discarded boolean would let
        // out-of-bounds min/max values pass unnoticed.
        assert!(row_group_index.indexes.iter().all(|x| {
            x.min.as_ref().unwrap() >= &T::try_from_le_slice(min_max.0).unwrap()
                && x.max.as_ref().unwrap() <= &T::try_from_le_slice(min_max.1).unwrap()
        }));
    }

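    // Returns the raw little-endian min/max statistics bytes for the given
    // column of a row group, defaulting to empty slices when absent.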
    fn get_row_group_min_max_bytes(r: &RowGroupMetaData, col_num: usize) -> (&[u8], &[u8]) {
        let statistics = r.column(col_num).statistics().unwrap();
        (
            statistics.min_bytes_opt().unwrap_or_default(),
            statistics.max_bytes_opt().unwrap_or_default(),
        )
    }

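    // Skipping the dictionary page must leave the reader positioned at the
    // first data page of the column chunk.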
    #[test]
    fn test_skip_next_page_with_dictionary_page() {
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let builder = ReadOptionsBuilder::new();
        let options = builder.with_page_index().build();
        let reader_result = SerializedFileReader::new_with_options(test_file, options);
        let reader = reader_result.unwrap();

        let row_group_reader = reader.get_row_group(0).unwrap();

        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();

        let mut vec = vec![];

        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
        assert!(meta.is_dict);

        column_page_reader.skip_next_page().unwrap();

        let page = column_page_reader.get_next_page().unwrap().unwrap();
        assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));

        for _i in 0..351 {
            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
            assert!(!meta.is_dict);
            vec.push(meta);

            let page = column_page_reader.get_next_page().unwrap().unwrap();
            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
        }

        assert!(column_page_reader.peek_next_page().unwrap().is_none());
        assert!(column_page_reader.get_next_page().unwrap().is_none());

        assert_eq!(vec.len(), 351);
    }

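    // With the page index loaded, the page reader can skip pages using the
    // recorded page locations; alternate between reading and skipping.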
    #[test]
    fn test_skip_page_with_offset_index() {
        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
        let builder = ReadOptionsBuilder::new();
        let options = builder.with_page_index().build();
        let reader_result = SerializedFileReader::new_with_options(test_file, options);
        let reader = reader_result.unwrap();

        let row_group_reader = reader.get_row_group(0).unwrap();

        let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();

        let mut vec = vec![];

        for i in 0..325 {
            if i % 2 == 0 {
                vec.push(column_page_reader.get_next_page().unwrap().unwrap());
            } else {
                column_page_reader.skip_next_page().unwrap();
            }
        }
        assert!(column_page_reader.peek_next_page().unwrap().is_none());
        assert!(column_page_reader.get_next_page().unwrap().is_none());

        assert_eq!(vec.len(), 163);
    }

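    // Same alternating read/skip pattern, but with no offset index the
    // reader must parse each page header to find page boundaries, so the
    // test peeks before each skip.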
    #[test]
    fn test_skip_page_without_offset_index() {
        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");

        // Use the default reader, which does not load the offset index.
        let reader_result = SerializedFileReader::new(test_file);
        let reader = reader_result.unwrap();

        let row_group_reader = reader.get_row_group(0).unwrap();

        let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();

        let mut vec = vec![];

        for i in 0..325 {
            if i % 2 == 0 {
                vec.push(column_page_reader.get_next_page().unwrap().unwrap());
            } else {
                column_page_reader.peek_next_page().unwrap().unwrap();
                column_page_reader.skip_next_page().unwrap();
            }
        }
        assert!(column_page_reader.peek_next_page().unwrap().is_none());
        assert!(column_page_reader.get_next_page().unwrap().is_none());

        assert_eq!(vec.len(), 163);
    }

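    // peek_next_page must report accurate metadata (dictionary flag and row
    // counts) without consuming the page.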
    #[test]
    fn test_peek_page_with_dictionary_page() {
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let builder = ReadOptionsBuilder::new();
        let options = builder.with_page_index().build();
        let reader_result = SerializedFileReader::new_with_options(test_file, options);
        let reader = reader_result.unwrap();
        let row_group_reader = reader.get_row_group(0).unwrap();

        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();

        let mut vec = vec![];

        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
        assert!(meta.is_dict);
        let page = column_page_reader.get_next_page().unwrap().unwrap();
        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));

        for i in 0..352 {
            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
            // The last page holds the remaining 10 rows; every other page
            // holds 20 or 21.
            if i != 351 {
                assert!((meta.num_rows == Some(21)) || (meta.num_rows == Some(20)));
            } else {
                assert_eq!(meta.num_rows, Some(10));
            }
            assert!(!meta.is_dict);
            vec.push(meta);
            let page = column_page_reader.get_next_page().unwrap().unwrap();
            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
        }

        assert!(column_page_reader.peek_next_page().unwrap().is_none());
        assert!(column_page_reader.get_next_page().unwrap().is_none());

        assert_eq!(vec.len(), 352);
    }

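    // Same as above, but without the offset index peek_next_page can only
    // report level counts taken from the page headers.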
    #[test]
    fn test_peek_page_with_dictionary_page_without_offset_index() {
        let test_file = get_test_file("alltypes_tiny_pages.parquet");

        let reader_result = SerializedFileReader::new(test_file);
        let reader = reader_result.unwrap();
        let row_group_reader = reader.get_row_group(0).unwrap();

        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();

        let mut vec = vec![];

        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
        assert!(meta.is_dict);
        let page = column_page_reader.get_next_page().unwrap().unwrap();
        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));

        for i in 0..352 {
            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
            // Without the offset index, the page header supplies level counts
            // rather than row counts.
            if i != 351 {
                assert!((meta.num_levels == Some(21)) || (meta.num_levels == Some(20)));
            } else {
                assert_eq!(meta.num_levels, Some(10));
            }
            assert!(!meta.is_dict);
            vec.push(meta);
            let page = column_page_reader.get_next_page().unwrap().unwrap();
            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
        }

        assert!(column_page_reader.peek_next_page().unwrap().is_none());
        assert!(column_page_reader.get_next_page().unwrap().is_none());

        assert_eq!(vec.len(), 352);
    }

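    // Round-trips a FIXED_LEN_BYTE_ARRAY column and verifies the written
    // column index records the correct null count and min/max values.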
    #[test]
    fn test_fixed_length_index() {
        let message_type = "
        message test_schema {
            OPTIONAL FIXED_LEN_BYTE_ARRAY (11) value (DECIMAL(25,2));
        }
        ";

        let schema = parse_message_type(message_type).unwrap();
        let mut out = Vec::with_capacity(1024);
        let mut writer =
            SerializedFileWriter::new(&mut out, Arc::new(schema), Default::default()).unwrap();

        let mut r = writer.next_row_group().unwrap();
        let mut c = r.next_column().unwrap().unwrap();
        c.typed::<FixedLenByteArrayType>()
            .write_batch(
                &[vec![0; 11].into(), vec![5; 11].into(), vec![3; 11].into()],
                Some(&[1, 1, 0, 1]),
                None,
            )
            .unwrap();
        c.close().unwrap();
        r.close().unwrap();
        writer.close().unwrap();

        let b = Bytes::from(out);
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(b, options).unwrap();
        let index = reader.metadata().column_index().unwrap();

        assert_eq!(index.len(), 1);
        let c = &index[0];
        assert_eq!(c.len(), 1);

        match &c[0] {
            Index::FIXED_LEN_BYTE_ARRAY(v) => {
                assert_eq!(v.indexes.len(), 1);
                let page_idx = &v.indexes[0];
                assert_eq!(page_idx.null_count.unwrap(), 1);
                assert_eq!(page_idx.min.as_ref().unwrap().as_ref(), &[0; 11]);
                assert_eq!(page_idx.max.as_ref().unwrap().as_ref(), &[5; 11]);
            }
            _ => unreachable!(),
        }
    }

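    // The GZIP codec must decode all concatenated gzip members, not just the
    // first one.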
    #[test]
    fn test_multi_gz() {
        let file = get_test_file("concatenated_gzip_members.parquet");
        let reader = SerializedFileReader::new(file).unwrap();
        let row_group_reader = reader.get_row_group(0).unwrap();
        match row_group_reader.get_column_reader(0).unwrap() {
            ColumnReader::Int64ColumnReader(mut reader) => {
                let mut buffer = Vec::with_capacity(1024);
                let mut def_levels = Vec::with_capacity(1024);
                let (num_records, num_values, num_levels) = reader
                    .read_records(1024, Some(&mut def_levels), None, &mut buffer)
                    .unwrap();

                assert_eq!(num_records, 513);
                assert_eq!(num_values, 513);
                assert_eq!(num_levels, 513);

                let expected: Vec<i64> = (1..514).collect();
                assert_eq!(&buffer, &expected);
            }
            _ => unreachable!(),
        }
    }

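    // Reads a file whose values are stored in column pairs, one PLAIN-encoded
    // and one BYTE_STREAM_SPLIT-encoded, and verifies both decode identically.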
    #[test]
    fn test_byte_stream_split_extended() {
        let path = format!(
            "{}/byte_stream_split_extended.gzip.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();
        let reader = Box::new(SerializedFileReader::new(file).expect("Failed to create reader"));

        let mut iter = reader
            .get_row_iter(None)
            .expect("Failed to create row iterator");

        let mut start = 0;
        let end = reader.metadata().file_metadata().num_rows();

        // Each column pair must decode to the same values regardless of
        // encoding.
        let check_row = |row: Result<Row, ParquetError>| {
            assert!(row.is_ok());
            let r = row.unwrap();
            assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
            assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
            assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
            assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
            assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
            assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
            assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
        };

        while start < end {
            match iter.next() {
                Some(row) => check_row(row),
                None => break,
            };
            start += 1;
        }
    }

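    // Verifies that row-group predicates also filter the column and offset
    // indexes so they stay aligned with the retained row groups.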
    #[test]
    fn test_filtered_rowgroup_metadata() {
        let message_type = "
        message test_schema {
            REQUIRED INT32 a;
        }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .build(),
        );
        let mut file: File = tempfile::tempfile().unwrap();
        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
        let data = [1, 2, 3, 4, 5];

        // Write five row groups, each holding a scaled copy of `data`.
        for idx in 0..5 {
            let data_i: Vec<i32> = data.iter().map(|x| x * (idx + 1)).collect();
            let mut row_group_writer = file_writer.next_row_group().unwrap();
            if let Some(mut writer) = row_group_writer.next_column().unwrap() {
                writer
                    .typed::<Int32Type>()
                    .write_batch(data_i.as_slice(), None, None)
                    .unwrap();
                writer.close().unwrap();
            }
            row_group_writer.close().unwrap();
            file_writer.flushed_row_groups();
        }
        let file_metadata = file_writer.close().unwrap();

        assert_eq!(file_metadata.num_rows, 25);
        assert_eq!(file_metadata.row_groups.len(), 5);

        // Select only the row group with ordinal 2.
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2))
            .build();
        let reader =
            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
                .unwrap();
        let metadata = reader.metadata();

        assert_eq!(metadata.num_row_groups(), 1);
        assert_eq!(metadata.row_group(0).ordinal(), Some(2));

        // The page indexes must be filtered down to the selected row group.
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
        assert_eq!(metadata.column_index().unwrap().len(), 1);
        assert_eq!(metadata.offset_index().unwrap().len(), 1);
        let col_idx = metadata.column_index().unwrap();
        let off_idx = metadata.offset_index().unwrap();
        let col_stats = metadata.row_group(0).column(0).statistics().unwrap();
        let pg_idx = &col_idx[0][0];
        let off_idx_i = &off_idx[0][0];

        match pg_idx {
            Index::INT32(int_idx) => {
                let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
                let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
                assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
                assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
            }
            _ => panic!("wrong stats type"),
        }

        assert_eq!(
            off_idx_i.page_locations[0].offset,
            metadata.row_group(0).column(0).data_page_offset()
        );

        // Select the odd-numbered row groups (ordinals 1 and 3).
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1))
            .build();
        let reader =
            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
                .unwrap();
        let metadata = reader.metadata();

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).ordinal(), Some(1));
        assert_eq!(metadata.row_group(1).ordinal(), Some(3));

        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
        assert_eq!(metadata.column_index().unwrap().len(), 2);
        assert_eq!(metadata.offset_index().unwrap().len(), 2);
        let col_idx = metadata.column_index().unwrap();
        let off_idx = metadata.offset_index().unwrap();

        for (i, col_idx_i) in col_idx.iter().enumerate().take(metadata.num_row_groups()) {
            let col_stats = metadata.row_group(i).column(0).statistics().unwrap();
            let pg_idx = &col_idx_i[0];
            let off_idx_i = &off_idx[i][0];

            match pg_idx {
                Index::INT32(int_idx) => {
                    let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
                    let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
                    assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
                    assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
                }
                _ => panic!("wrong stats type"),
            }

            assert_eq!(
                off_idx_i.page_locations[0].offset,
                metadata.row_group(i).column(0).data_page_offset()
            );
        }
    }
}