1use crate::basic::{Encoding, Type};
22use crate::bloom_filter::Sbbf;
23use crate::column::page::{Page, PageMetadata, PageReader};
24use crate::compression::{create_codec, Codec};
25#[cfg(feature = "encryption")]
26use crate::encryption::decrypt::{read_and_decrypt, CryptoContext};
27use crate::errors::{ParquetError, Result};
28use crate::file::page_index::offset_index::OffsetIndexMetaData;
29use crate::file::{
30 metadata::*,
31 properties::{ReaderProperties, ReaderPropertiesPtr},
32 reader::*,
33 statistics,
34};
35use crate::format::{PageHeader, PageLocation, PageType};
36use crate::record::reader::RowIter;
37use crate::record::Row;
38use crate::schema::types::Type as SchemaType;
39use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
40use bytes::Bytes;
41use std::collections::VecDeque;
42use std::iter;
43use std::{fs::File, io::Read, path::Path, sync::Arc};
44use thrift::protocol::TCompactInputProtocol;
45
46impl TryFrom<File> for SerializedFileReader<File> {
47 type Error = ParquetError;
48
49 fn try_from(file: File) -> Result<Self> {
50 Self::new(file)
51 }
52}
53
54impl TryFrom<&Path> for SerializedFileReader<File> {
55 type Error = ParquetError;
56
57 fn try_from(path: &Path) -> Result<Self> {
58 let file = File::open(path)?;
59 Self::try_from(file)
60 }
61}
62
63impl TryFrom<String> for SerializedFileReader<File> {
64 type Error = ParquetError;
65
66 fn try_from(path: String) -> Result<Self> {
67 Self::try_from(Path::new(&path))
68 }
69}
70
71impl TryFrom<&str> for SerializedFileReader<File> {
72 type Error = ParquetError;
73
74 fn try_from(path: &str) -> Result<Self> {
75 Self::try_from(Path::new(&path))
76 }
77}
78
79impl IntoIterator for SerializedFileReader<File> {
82 type Item = Result<Row>;
83 type IntoIter = RowIter<'static>;
84
85 fn into_iter(self) -> Self::IntoIter {
86 RowIter::from_file_into(Box::new(self))
87 }
88}
89
/// A serialized implementation for a Parquet [`FileReader`], reading the file
/// footer eagerly and pages on demand through a [`ChunkReader`].
pub struct SerializedFileReader<R: ChunkReader> {
    // Shared handle to the underlying byte source (file, in-memory buffer, ...).
    chunk_reader: Arc<R>,
    // Parsed footer metadata for the whole file.
    metadata: Arc<ParquetMetaData>,
    // Reader configuration shared with the row-group and page readers.
    props: ReaderPropertiesPtr,
}
99
/// A predicate deciding, from its metadata and index, whether a row group
/// should be read; registered via [`ReadOptionsBuilder::with_predicate`].
pub type ReadGroupPredicate = Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>;
104
/// Builder for [`ReadOptions`], which control how a Parquet file is read.
#[derive(Default)]
pub struct ReadOptionsBuilder {
    // A row group is read only if ALL predicates accept it.
    predicates: Vec<ReadGroupPredicate>,
    // When true, the page (column/offset) indexes are loaded as well.
    enable_page_index: bool,
    // Optional custom reader properties; defaults are built when absent.
    props: Option<ReaderProperties>,
}
114
115impl ReadOptionsBuilder {
116 pub fn new() -> Self {
118 Self::default()
119 }
120
121 pub fn with_predicate(mut self, predicate: ReadGroupPredicate) -> Self {
124 self.predicates.push(predicate);
125 self
126 }
127
128 pub fn with_range(mut self, start: i64, end: i64) -> Self {
131 assert!(start < end);
132 let predicate = move |rg: &RowGroupMetaData, _: usize| {
133 let mid = get_midpoint_offset(rg);
134 mid >= start && mid < end
135 };
136 self.predicates.push(Box::new(predicate));
137 self
138 }
139
140 pub fn with_page_index(mut self) -> Self {
145 self.enable_page_index = true;
146 self
147 }
148
149 pub fn with_reader_properties(mut self, properties: ReaderProperties) -> Self {
151 self.props = Some(properties);
152 self
153 }
154
155 pub fn build(self) -> ReadOptions {
157 let props = self
158 .props
159 .unwrap_or_else(|| ReaderProperties::builder().build());
160 ReadOptions {
161 predicates: self.predicates,
162 enable_page_index: self.enable_page_index,
163 props,
164 }
165 }
166}
167
/// Options controlling row-group filtering, page-index loading and reader
/// properties; construct one via [`ReadOptionsBuilder`].
pub struct ReadOptions {
    // A row group is read only if ALL predicates accept it.
    predicates: Vec<ReadGroupPredicate>,
    // When true, the page (column/offset) indexes are loaded after the footer.
    enable_page_index: bool,
    // Reader properties propagated to row-group and page readers.
    props: ReaderProperties,
}
177
178impl<R: 'static + ChunkReader> SerializedFileReader<R> {
179 pub fn new(chunk_reader: R) -> Result<Self> {
182 let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
183 let props = Arc::new(ReaderProperties::builder().build());
184 Ok(Self {
185 chunk_reader: Arc::new(chunk_reader),
186 metadata: Arc::new(metadata),
187 props,
188 })
189 }
190
191 pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
194 let mut metadata_builder = ParquetMetaDataReader::new()
195 .parse_and_finish(&chunk_reader)?
196 .into_builder();
197 let mut predicates = options.predicates;
198
199 for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
201 let mut keep = true;
202 for predicate in &mut predicates {
203 if !predicate(&rg_meta, i) {
204 keep = false;
205 break;
206 }
207 }
208 if keep {
209 metadata_builder = metadata_builder.add_row_group(rg_meta);
210 }
211 }
212
213 let mut metadata = metadata_builder.build();
214
215 if options.enable_page_index {
217 let mut reader =
218 ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true);
219 reader.read_page_indexes(&chunk_reader)?;
220 metadata = reader.finish()?;
221 }
222
223 Ok(Self {
224 chunk_reader: Arc::new(chunk_reader),
225 metadata: Arc::new(metadata),
226 props: Arc::new(options.props),
227 })
228 }
229}
230
231fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
233 let col = meta.column(0);
234 let mut offset = col.data_page_offset();
235 if let Some(dic_offset) = col.dictionary_page_offset() {
236 if offset > dic_offset {
237 offset = dic_offset
238 }
239 };
240 offset + meta.compressed_size() / 2
241}
242
243impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
244 fn metadata(&self) -> &ParquetMetaData {
245 &self.metadata
246 }
247
248 fn num_row_groups(&self) -> usize {
249 self.metadata.num_row_groups()
250 }
251
252 fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> {
253 let row_group_metadata = self.metadata.row_group(i);
254 let props = Arc::clone(&self.props);
256 let f = Arc::clone(&self.chunk_reader);
257 Ok(Box::new(SerializedRowGroupReader::new(
258 f,
259 row_group_metadata,
260 self.metadata.offset_index().map(|x| x[i].as_slice()),
261 props,
262 )?))
263 }
264
265 fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
266 RowIter::from_file(projection, self)
267 }
268}
269
/// A serialized implementation for a Parquet [`RowGroupReader`].
pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
    // Shared handle to the underlying byte source.
    chunk_reader: Arc<R>,
    // Metadata of the row group this reader serves.
    metadata: &'a RowGroupMetaData,
    // This row group's slice of the offset index, when it was loaded.
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    // Reader configuration propagated to page readers.
    props: ReaderPropertiesPtr,
    // One optional bloom filter per column, loaded eagerly in `new` when
    // the properties request it.
    bloom_filters: Vec<Option<Sbbf>>,
}
278
279impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
280 pub fn new(
282 chunk_reader: Arc<R>,
283 metadata: &'a RowGroupMetaData,
284 offset_index: Option<&'a [OffsetIndexMetaData]>,
285 props: ReaderPropertiesPtr,
286 ) -> Result<Self> {
287 let bloom_filters = if props.read_bloom_filter() {
288 metadata
289 .columns()
290 .iter()
291 .map(|col| Sbbf::read_from_column_chunk(col, &*chunk_reader))
292 .collect::<Result<Vec<_>>>()?
293 } else {
294 iter::repeat(None).take(metadata.columns().len()).collect()
295 };
296 Ok(Self {
297 chunk_reader,
298 metadata,
299 offset_index,
300 props,
301 bloom_filters,
302 })
303 }
304}
305
306impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R> {
307 fn metadata(&self) -> &RowGroupMetaData {
308 self.metadata
309 }
310
311 fn num_columns(&self) -> usize {
312 self.metadata.num_columns()
313 }
314
315 fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
317 let col = self.metadata.column(i);
318
319 let page_locations = self.offset_index.map(|x| x[i].page_locations.clone());
320
321 let props = Arc::clone(&self.props);
322 Ok(Box::new(SerializedPageReader::new_with_properties(
323 Arc::clone(&self.chunk_reader),
324 col,
325 self.metadata.num_rows() as usize,
326 page_locations,
327 props,
328 )?))
329 }
330
331 fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
333 self.bloom_filters[i].as_ref()
334 }
335
336 fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
337 RowIter::from_row_group(projection, self)
338 }
339}
340
341pub(crate) fn read_page_header<T: Read>(input: &mut T) -> Result<PageHeader> {
343 let mut prot = TCompactInputProtocol::new(input);
344 Ok(PageHeader::read_from_in_protocol(&mut prot)?)
345}
346
/// Reads and decrypts a thrift-encoded [`PageHeader`] using the column's
/// crypto context.
#[cfg(feature = "encryption")]
pub(crate) fn read_encrypted_page_header<T: Read>(
    input: &mut T,
    crypto_context: Arc<CryptoContext>,
) -> Result<PageHeader> {
    let decryptor = crypto_context.data_decryptor();
    let aad = crypto_context.create_page_header_aad()?;

    // Decryption failure most likely means a wrong or missing key.
    let plaintext = read_and_decrypt(decryptor, input, aad.as_ref()).map_err(|_| {
        ParquetError::General(format!(
            "Error decrypting column {}, decryptor may be wrong or missing",
            crypto_context.column_ordinal
        ))
    })?;

    let mut protocol = TCompactSliceInputProtocol::new(plaintext.as_slice());
    Ok(PageHeader::read_from_in_protocol(&mut protocol)?)
}
365
/// Reads an encrypted page header, additionally returning the number of
/// (encrypted) input bytes consumed while reading it.
///
/// Returns an error when `crypto_context` is `None` or when decryption or
/// decoding fails; previously a missing context panicked via `unwrap()`.
#[cfg(feature = "encryption")]
fn read_encrypted_page_header_len<T: Read>(
    input: &mut T,
    crypto_context: Option<Arc<CryptoContext>>,
) -> Result<(usize, PageHeader)> {
    /// Wraps a reader and counts how many bytes have been read through it.
    struct TrackedRead<R> {
        inner: R,
        bytes_read: usize,
    }

    impl<R: Read> Read for TrackedRead<R> {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            let v = self.inner.read(buf)?;
            self.bytes_read += v;
            Ok(v)
        }
    }

    // A missing crypto context is a caller bug, but it should surface as an
    // error rather than a panic.
    let crypto_context = crypto_context
        .ok_or_else(|| general_err!("Missing crypto context for encrypted page header"))?;

    let mut tracked = TrackedRead {
        inner: input,
        bytes_read: 0,
    };
    let header = read_encrypted_page_header(&mut tracked, crypto_context)?;
    Ok((tracked.bytes_read, header))
}
394
395fn read_page_header_len<T: Read>(input: &mut T) -> Result<(usize, PageHeader)> {
397 struct TrackedRead<R> {
399 inner: R,
400 bytes_read: usize,
401 }
402
403 impl<R: Read> Read for TrackedRead<R> {
404 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
405 let v = self.inner.read(buf)?;
406 self.bytes_read += v;
407 Ok(v)
408 }
409 }
410
411 let mut tracked = TrackedRead {
412 inner: input,
413 bytes_read: 0,
414 };
415 let header = read_page_header(&mut tracked)?;
416 Ok((tracked.bytes_read, header))
417}
418
419pub(crate) fn decode_page(
421 page_header: PageHeader,
422 buffer: Bytes,
423 physical_type: Type,
424 decompressor: Option<&mut Box<dyn Codec>>,
425) -> Result<Page> {
426 #[cfg(feature = "crc")]
428 if let Some(expected_crc) = page_header.crc {
429 let crc = crc32fast::hash(&buffer);
430 if crc != expected_crc as u32 {
431 return Err(general_err!("Page CRC checksum mismatch"));
432 }
433 }
434
435 let mut offset: usize = 0;
442 let mut can_decompress = true;
443
444 if let Some(ref header_v2) = page_header.data_page_header_v2 {
445 offset = (header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length)
446 as usize;
447 can_decompress = header_v2.is_compressed.unwrap_or(true);
449 }
450
451 let buffer = match decompressor {
454 Some(decompressor) if can_decompress => {
455 let uncompressed_size = page_header.uncompressed_page_size as usize;
456 let mut decompressed = Vec::with_capacity(uncompressed_size);
457 let compressed = &buffer.as_ref()[offset..];
458 decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
459 decompressor.decompress(
460 compressed,
461 &mut decompressed,
462 Some(uncompressed_size - offset),
463 )?;
464
465 if decompressed.len() != uncompressed_size {
466 return Err(general_err!(
467 "Actual decompressed size doesn't match the expected one ({} vs {})",
468 decompressed.len(),
469 uncompressed_size
470 ));
471 }
472
473 Bytes::from(decompressed)
474 }
475 _ => buffer,
476 };
477
478 let result = match page_header.type_ {
479 PageType::DICTIONARY_PAGE => {
480 let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| {
481 ParquetError::General("Missing dictionary page header".to_string())
482 })?;
483 let is_sorted = dict_header.is_sorted.unwrap_or(false);
484 Page::DictionaryPage {
485 buf: buffer,
486 num_values: dict_header.num_values.try_into()?,
487 encoding: Encoding::try_from(dict_header.encoding)?,
488 is_sorted,
489 }
490 }
491 PageType::DATA_PAGE => {
492 let header = page_header
493 .data_page_header
494 .ok_or_else(|| ParquetError::General("Missing V1 data page header".to_string()))?;
495 Page::DataPage {
496 buf: buffer,
497 num_values: header.num_values.try_into()?,
498 encoding: Encoding::try_from(header.encoding)?,
499 def_level_encoding: Encoding::try_from(header.definition_level_encoding)?,
500 rep_level_encoding: Encoding::try_from(header.repetition_level_encoding)?,
501 statistics: statistics::from_thrift(physical_type, header.statistics)?,
502 }
503 }
504 PageType::DATA_PAGE_V2 => {
505 let header = page_header
506 .data_page_header_v2
507 .ok_or_else(|| ParquetError::General("Missing V2 data page header".to_string()))?;
508 let is_compressed = header.is_compressed.unwrap_or(true);
509 Page::DataPageV2 {
510 buf: buffer,
511 num_values: header.num_values.try_into()?,
512 encoding: Encoding::try_from(header.encoding)?,
513 num_nulls: header.num_nulls.try_into()?,
514 num_rows: header.num_rows.try_into()?,
515 def_levels_byte_len: header.definition_levels_byte_length.try_into()?,
516 rep_levels_byte_len: header.repetition_levels_byte_length.try_into()?,
517 is_compressed,
518 statistics: statistics::from_thrift(physical_type, header.statistics)?,
519 }
520 }
521 _ => {
522 unimplemented!("Page type {:?} is not supported", page_header.type_)
524 }
525 };
526
527 Ok(result)
528}
529
/// Internal cursor state of a [`SerializedPageReader`].
enum SerializedPageReaderState {
    /// Sequential scan of the column chunk; used when no offset index is
    /// available.
    Values {
        // Byte offset in the file of the next page header to read.
        offset: usize,

        // Bytes of the column chunk not yet consumed.
        remaining_bytes: usize,

        // Header read ahead by `peek_next_page` whose page body has not
        // been consumed yet.
        next_page_header: Option<Box<PageHeader>>,

        // Ordinal of the next data page; incremented after each data page
        // and used to derive the per-page decryption context.
        page_ordinal: usize,

        // True while a dictionary page is still expected at the front of
        // the chunk.
        require_dictionary: bool,
    },
    /// Offset-index-driven access: every page location is known up front.
    Pages {
        // Remaining data-page locations, in file order.
        page_locations: VecDeque<PageLocation>,
        // Location of the dictionary page, if present and not yet read.
        dictionary_page: Option<PageLocation>,
        // Total number of rows in the row group; bounds the last page.
        total_rows: usize,
    },
}
556
/// A serialized implementation for a Parquet [`PageReader`], decoding the
/// pages of one column chunk from a [`ChunkReader`].
pub struct SerializedPageReader<R: ChunkReader> {
    // Underlying byte source shared with the file reader.
    reader: Arc<R>,

    // Codec for this column's compression, if the chunk is compressed.
    decompressor: Option<Box<dyn Codec>>,

    // Physical type of the column, needed to decode page statistics.
    physical_type: Type,

    // Cursor state: sequential (`Values`) or offset-index driven (`Pages`).
    state: SerializedPageReaderState,

    // Decryption context for this column, if the file is encrypted.
    #[cfg(feature = "encryption")]
    crypto_context: Option<Arc<CryptoContext>>,
}
574
impl<R: ChunkReader> SerializedPageReader<R> {
    /// Creates a page reader for the column chunk with default reader
    /// properties.
    pub fn new(
        reader: Arc<R>,
        column_chunk_metadata: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
    ) -> Result<Self> {
        let props = Arc::new(ReaderProperties::builder().build());
        SerializedPageReader::new_with_properties(
            reader,
            column_chunk_metadata,
            total_rows,
            page_locations,
            props,
        )
    }

    /// No-op stand-in used when the `encryption` feature is disabled; keeps
    /// call sites identical across feature configurations.
    #[cfg(all(feature = "arrow", not(feature = "encryption")))]
    pub(crate) fn add_crypto_context(
        self,
        _rg_idx: usize,
        _column_idx: usize,
        _parquet_meta_data: &ParquetMetaData,
        _column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        Ok(self)
    }

    /// Attaches a decryption context for this column when the file metadata
    /// carries a decryptor and the column has crypto metadata; otherwise
    /// returns the reader unchanged.
    #[cfg(feature = "encryption")]
    pub(crate) fn add_crypto_context(
        mut self,
        rg_idx: usize,
        column_idx: usize,
        parquet_meta_data: &ParquetMetaData,
        column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        // File is not encrypted: nothing to attach.
        let Some(file_decryptor) = parquet_meta_data.file_decryptor() else {
            return Ok(self);
        };
        // Column is not encrypted: nothing to attach.
        let Some(crypto_metadata) = column_chunk_metadata.crypto_metadata() else {
            return Ok(self);
        };
        let crypto_context =
            CryptoContext::for_column(file_decryptor, crypto_metadata, rg_idx, column_idx)?;
        self.crypto_context = Some(Arc::new(crypto_context));
        Ok(self)
    }

    /// Creates a page reader for the column chunk described by `meta`.
    ///
    /// When `page_locations` (from the offset index) is provided, pages are
    /// addressed directly; otherwise the chunk is scanned sequentially.
    pub fn new_with_properties(
        reader: Arc<R>,
        meta: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
        props: ReaderPropertiesPtr,
    ) -> Result<Self> {
        let decompressor = create_codec(meta.compression(), props.codec_options())?;
        let (start, len) = meta.byte_range();

        let state = match page_locations {
            Some(locations) => {
                // The offset index lists only data pages. If the first data
                // page does not start at the beginning of the chunk, the gap
                // must hold the dictionary page; synthesize its location.
                let dictionary_page = match locations.first() {
                    Some(dict_offset) if dict_offset.offset as u64 != start => Some(PageLocation {
                        offset: start as i64,
                        compressed_page_size: (dict_offset.offset as u64 - start) as i32,
                        first_row_index: 0,
                    }),
                    _ => None,
                };

                SerializedPageReaderState::Pages {
                    page_locations: locations.into(),
                    dictionary_page,
                    total_rows,
                }
            }
            None => SerializedPageReaderState::Values {
                offset: start as usize,
                remaining_bytes: len as usize,
                next_page_header: None,
                page_ordinal: 0,
                require_dictionary: meta.dictionary_page_offset().is_some(),
            },
        };
        Ok(Self {
            reader,
            decompressor,
            state,
            physical_type: meta.column_type(),
            #[cfg(feature = "encryption")]
            crypto_context: None,
        })
    }

    /// Test helper: returns the file offset of the next page's body.
    ///
    /// Skips headers that do not convert into a [`PageMetadata`], caching
    /// the last header read so `get_next_page` does not re-read it.
    #[cfg(test)]
    fn peek_next_page_offset(&mut self) -> Result<Option<usize>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                ..
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
                            Ok(Some(*offset))
                        } else {
                            // Cached header is not a usable page: drop it and
                            // read the next one.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset as u64)?;
                        let (header_len, header) = read_page_header_len(&mut read)?;
                        *offset += header_len;
                        *remaining_bytes -= header_len;
                        let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
                            Ok(Some(*offset))
                        } else {
                            // Not a data/dictionary page: keep scanning.
                            continue;
                        };
                        // Cache the header so get_next_page can reuse it.
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                ..
            } => {
                // With an offset index the next page's offset is known directly.
                if let Some(page) = dictionary_page {
                    Ok(Some(page.offset as usize))
                } else if let Some(page) = page_locations.front() {
                    Ok(Some(page.offset as usize))
                } else {
                    Ok(None)
                }
            }
        }
    }
}
730
731impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
732 type Item = Result<Page>;
733
734 fn next(&mut self) -> Option<Self::Item> {
735 self.get_next_page().transpose()
736 }
737}
738
739fn verify_page_header_len(header_len: usize, remaining_bytes: usize) -> Result<()> {
740 if header_len > remaining_bytes {
741 return Err(eof_err!("Invalid page header"));
742 }
743 Ok(())
744}
745
746fn verify_page_size(
747 compressed_size: i32,
748 uncompressed_size: i32,
749 remaining_bytes: usize,
750) -> Result<()> {
751 if compressed_size < 0 || compressed_size as usize > remaining_bytes || uncompressed_size < 0 {
755 return Err(eof_err!("Invalid page header"));
756 }
757 Ok(())
758}
759
impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
    /// Reads, (optionally) decrypts, decompresses and decodes the next page,
    /// returning `None` once the chunk is exhausted. Index pages are skipped.
    fn get_next_page(&mut self) -> Result<Option<Page>> {
        loop {
            let page = match &mut self.state {
                SerializedPageReaderState::Values {
                    offset,
                    remaining_bytes: remaining,
                    next_page_header,
                    page_ordinal,
                    require_dictionary,
                } => {
                    // Whole chunk consumed: no more pages.
                    if *remaining == 0 {
                        return Ok(None);
                    }

                    let mut read = self.reader.get_read(*offset as u64)?;
                    // Reuse a header cached by `peek_next_page`; otherwise
                    // read (and, if encrypted, decrypt) the next header.
                    let header = if let Some(header) = next_page_header.take() {
                        *header
                    } else {
                        #[cfg(feature = "encryption")]
                        let (header_len, header) = if self.crypto_context.is_some() {
                            let crypto_context = page_crypto_context(
                                &self.crypto_context,
                                *page_ordinal,
                                *require_dictionary,
                            )?;
                            read_encrypted_page_header_len(&mut read, crypto_context)?
                        } else {
                            read_page_header_len(&mut read)?
                        };

                        #[cfg(not(feature = "encryption"))]
                        let (header_len, header) = read_page_header_len(&mut read)?;

                        verify_page_header_len(header_len, *remaining)?;
                        *offset += header_len;
                        *remaining -= header_len;
                        header
                    };
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining,
                    )?;
                    // Advance the cursor past the page body before decoding,
                    // so a skipped index page leaves the state consistent.
                    let data_len = header.compressed_page_size as usize;
                    *offset += data_len;
                    *remaining -= data_len;

                    // Index pages are not exposed through this API.
                    if header.type_ == PageType::INDEX_PAGE {
                        continue;
                    }

                    let mut buffer = Vec::with_capacity(data_len);
                    let read = read.take(data_len as u64).read_to_end(&mut buffer)?;

                    if read != data_len {
                        return Err(eof_err!(
                            "Expected to read {} bytes of page, read only {}",
                            data_len,
                            read
                        ));
                    }

                    // Decrypt the page body when a crypto context is attached.
                    #[cfg(feature = "encryption")]
                    let crypto_context = page_crypto_context(
                        &self.crypto_context,
                        *page_ordinal,
                        *require_dictionary,
                    )?;
                    #[cfg(feature = "encryption")]
                    let buffer: Vec<u8> = if let Some(crypto_context) = crypto_context {
                        let decryptor = crypto_context.data_decryptor();
                        let aad = crypto_context.create_page_aad()?;
                        decryptor.decrypt(buffer.as_ref(), &aad)?
                    } else {
                        buffer
                    };

                    let page = decode_page(
                        header,
                        Bytes::from(buffer),
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?;
                    // Track progress used to derive per-page decryption AADs:
                    // data pages advance the ordinal, the dictionary page
                    // clears the dictionary expectation.
                    if page.is_data_page() {
                        *page_ordinal += 1;
                    } else if page.is_dictionary_page() {
                        *require_dictionary = false;
                    }
                    page
                }
                SerializedPageReaderState::Pages {
                    page_locations,
                    dictionary_page,
                    ..
                } => {
                    // Serve the dictionary page first, then data pages in
                    // file order.
                    let front = match dictionary_page
                        .take()
                        .or_else(|| page_locations.pop_front())
                    {
                        Some(front) => front,
                        None => return Ok(None),
                    };

                    let page_len = front.compressed_page_size as usize;

                    let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;

                    // The fetched buffer holds header + body: decode the
                    // thrift header, then slice off the body that follows.
                    let mut prot = TCompactSliceInputProtocol::new(buffer.as_ref());
                    let header = PageHeader::read_from_in_protocol(&mut prot)?;
                    let offset = buffer.len() - prot.as_slice().len();

                    let bytes = buffer.slice(offset..);
                    decode_page(
                        header,
                        bytes,
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?
                }
            };

            return Ok(Some(page));
        }
    }

    /// Returns metadata for the next page without consuming its body,
    /// caching the header it reads so `get_next_page` can reuse it.
    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                ..
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        // A header was already read ahead: report it if it
                        // describes a readable page, else drop it and retry.
                        if let Ok(page_meta) = (&**header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset as u64)?;
                        let (header_len, header) = read_page_header_len(&mut read)?;
                        verify_page_header_len(header_len, *remaining_bytes)?;
                        *offset += header_len;
                        *remaining_bytes -= header_len;
                        let page_meta = if let Ok(page_meta) = (&header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // Not a data/dictionary page: keep scanning.
                            continue;
                        };
                        // Cache the header for the following get_next_page.
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                total_rows,
            } => {
                if dictionary_page.is_some() {
                    Ok(Some(PageMetadata {
                        num_rows: None,
                        num_levels: None,
                        is_dict: true,
                    }))
                } else if let Some(page) = page_locations.front() {
                    // A page's row count is the distance to the next page's
                    // first row (or to the row-group total for the last page).
                    let next_rows = page_locations
                        .get(1)
                        .map(|x| x.first_row_index as usize)
                        .unwrap_or(*total_rows);

                    Ok(Some(PageMetadata {
                        num_rows: Some(next_rows - page.first_row_index as usize),
                        num_levels: None,
                        is_dict: false,
                    }))
                } else {
                    Ok(None)
                }
            }
        }
    }

    /// Skips the next page without decoding it.
    fn skip_next_page(&mut self) -> Result<()> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                ..
            } => {
                if let Some(buffered_header) = next_page_header.take() {
                    verify_page_size(
                        buffered_header.compressed_page_size,
                        buffered_header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    // The header was already consumed by a peek: skip only
                    // the page body.
                    *offset += buffered_header.compressed_page_size as usize;
                    *remaining_bytes -= buffered_header.compressed_page_size as usize;
                } else {
                    let mut read = self.reader.get_read(*offset as u64)?;
                    let (header_len, header) = read_page_header_len(&mut read)?;
                    verify_page_header_len(header_len, *remaining_bytes)?;
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    // Skip header and body in one step.
                    let data_page_size = header.compressed_page_size as usize;
                    *offset += header_len + data_page_size;
                    *remaining_bytes -= header_len + data_page_size;
                }
                Ok(())
            }
            SerializedPageReaderState::Pages { page_locations, .. } => {
                // NOTE(review): a pending dictionary page is not dropped
                // here; only the front data-page location is skipped —
                // confirm this matches callers' expectations.
                page_locations.pop_front();

                Ok(())
            }
        }
    }

    fn at_record_boundary(&mut self) -> Result<bool> {
        match &mut self.state {
            // Without an offset index we only know we sit on a boundary
            // once the chunk is exhausted.
            SerializedPageReaderState::Values { .. } => Ok(self.peek_next_page()?.is_none()),
            // Offset-index pages always begin at record boundaries.
            SerializedPageReaderState::Pages { .. } => Ok(true),
        }
    }
}
999
/// Derives the per-page crypto context from the column-level one: the
/// dictionary page uses its dedicated context, data pages use their ordinal.
#[cfg(feature = "encryption")]
fn page_crypto_context(
    crypto_context: &Option<Arc<CryptoContext>>,
    page_ordinal: usize,
    dictionary_page: bool,
) -> Result<Option<Arc<CryptoContext>>> {
    let derived = match crypto_context.as_ref() {
        None => None,
        Some(ctx) => {
            let page_ctx = if dictionary_page {
                ctx.for_dictionary_page()
            } else {
                ctx.with_page_ordinal(page_ordinal)
            };
            Some(Arc::new(page_ctx))
        }
    };
    Ok(derived)
}
1014
1015#[cfg(test)]
1016mod tests {
1017 use std::collections::HashSet;
1018
1019 use bytes::Buf;
1020
1021 use crate::file::properties::{EnabledStatistics, WriterProperties};
1022 use crate::format::BoundaryOrder;
1023
1024 use crate::basic::{self, ColumnOrder};
1025 use crate::column::reader::ColumnReader;
1026 use crate::data_type::private::ParquetValueType;
1027 use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
1028 use crate::file::page_index::index::{Index, NativeIndex};
1029 use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
1030 use crate::file::writer::SerializedFileWriter;
1031 use crate::record::RowAccessor;
1032 use crate::schema::parser::parse_message_type;
1033 use crate::util::test_common::file_util::{get_test_file, get_test_path};
1034
1035 use super::*;
1036
1037 #[test]
1038 fn test_cursor_and_file_has_the_same_behaviour() {
1039 let mut buf: Vec<u8> = Vec::new();
1040 get_test_file("alltypes_plain.parquet")
1041 .read_to_end(&mut buf)
1042 .unwrap();
1043 let cursor = Bytes::from(buf);
1044 let read_from_cursor = SerializedFileReader::new(cursor).unwrap();
1045
1046 let test_file = get_test_file("alltypes_plain.parquet");
1047 let read_from_file = SerializedFileReader::new(test_file).unwrap();
1048
1049 let file_iter = read_from_file.get_row_iter(None).unwrap();
1050 let cursor_iter = read_from_cursor.get_row_iter(None).unwrap();
1051
1052 for (a, b) in file_iter.zip(cursor_iter) {
1053 assert_eq!(a.unwrap(), b.unwrap())
1054 }
1055 }
1056
1057 #[test]
1058 fn test_file_reader_try_from() {
1059 let test_file = get_test_file("alltypes_plain.parquet");
1061 let test_path_buf = get_test_path("alltypes_plain.parquet");
1062 let test_path = test_path_buf.as_path();
1063 let test_path_str = test_path.to_str().unwrap();
1064
1065 let reader = SerializedFileReader::try_from(test_file);
1066 assert!(reader.is_ok());
1067
1068 let reader = SerializedFileReader::try_from(test_path);
1069 assert!(reader.is_ok());
1070
1071 let reader = SerializedFileReader::try_from(test_path_str);
1072 assert!(reader.is_ok());
1073
1074 let reader = SerializedFileReader::try_from(test_path_str.to_string());
1075 assert!(reader.is_ok());
1076
1077 let test_path = Path::new("invalid.parquet");
1079 let test_path_str = test_path.to_str().unwrap();
1080
1081 let reader = SerializedFileReader::try_from(test_path);
1082 assert!(reader.is_err());
1083
1084 let reader = SerializedFileReader::try_from(test_path_str);
1085 assert!(reader.is_err());
1086
1087 let reader = SerializedFileReader::try_from(test_path_str.to_string());
1088 assert!(reader.is_err());
1089 }
1090
1091 #[test]
1092 fn test_file_reader_into_iter() {
1093 let path = get_test_path("alltypes_plain.parquet");
1094 let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1095 let iter = reader.into_iter();
1096 let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();
1097
1098 assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
1099 }
1100
1101 #[test]
1102 fn test_file_reader_into_iter_project() {
1103 let path = get_test_path("alltypes_plain.parquet");
1104 let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1105 let schema = "message schema { OPTIONAL INT32 id; }";
1106 let proj = parse_message_type(schema).ok();
1107 let iter = reader.into_iter().project(proj).unwrap();
1108 let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();
1109
1110 assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
1111 }
1112
1113 #[test]
1114 fn test_reuse_file_chunk() {
1115 let test_file = get_test_file("alltypes_plain.parquet");
1119 let reader = SerializedFileReader::new(test_file).unwrap();
1120 let row_group = reader.get_row_group(0).unwrap();
1121
1122 let mut page_readers = Vec::new();
1123 for i in 0..row_group.num_columns() {
1124 page_readers.push(row_group.get_column_page_reader(i).unwrap());
1125 }
1126
1127 for mut page_reader in page_readers {
1130 assert!(page_reader.get_next_page().is_ok());
1131 }
1132 }
1133
    /// End-to-end smoke test against the golden `alltypes_plain.parquet`
    /// file: checks file/row-group metadata, then decodes the first column's
    /// pages and validates their exact contents.
    #[test]
    fn test_file_reader() {
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // File-level metadata: exactly one row group.
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // File metadata matches what Impala wrote into the golden file.
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
        );
        assert!(file_metadata.key_value_metadata().is_none());
        assert_eq!(file_metadata.num_rows(), 8);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        // Row-group metadata: 11 columns, 8 rows, known byte size.
        let row_group_metadata = metadata.row_group(0);
        assert_eq!(row_group_metadata.num_columns(), 11);
        assert_eq!(row_group_metadata.num_rows(), 8);
        assert_eq!(row_group_metadata.total_byte_size(), 671);
        // No column orders were written, so every column is UNDEFINED.
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // The row-group reader must agree with the metadata it serves.
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Column 0 holds exactly one dictionary page followed by one data
        // page; validate each page's size, count and encodings.
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Ok(Some(page)) = page_reader_0.get_next_page() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 32);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPage {
                    buf,
                    num_values,
                    encoding,
                    def_level_encoding,
                    rep_level_encoding,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 11);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert_eq!(def_level_encoding, Encoding::RLE);
                    // BIT_PACKED is deprecated but still what this golden
                    // file uses for repetition levels.
                    #[allow(deprecated)]
                    let expected_rep_level_encoding = Encoding::BIT_PACKED;
                    assert_eq!(rep_level_encoding, expected_rep_level_encoding);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }
1225
    /// Reads `datapage_v2.snappy.parquet` end-to-end and verifies file-level
    /// metadata, row-group metadata, and the exact contents of the two pages
    /// (dictionary + DataPageV2) in column 0.
    #[test]
    fn test_file_reader_datapage_v2() {
        let test_file = get_test_file("datapage_v2.snappy.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // File-level metadata checks.
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            1
        );

        assert_eq!(file_metadata.num_rows(), 5);
        assert_eq!(file_metadata.version(), 1);
        // No column orders recorded in this (older) file.
        assert_eq!(file_metadata.column_orders(), None);

        let row_group_metadata = metadata.row_group(0);

        // Without a column-orders list, every column reports UNDEFINED.
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Row-group reader must agree with the row-group metadata.
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Walk the pages of column 0 and check their exact decoded contents.
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Ok(Some(page)) = page_reader_0.get_next_page() {
            let is_expected_page = match page {
                // First page: the PLAIN-encoded dictionary.
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 7);
                    assert_eq!(num_values, 1);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert!(!is_sorted);
                    true
                }
                // Second page: a v2 data page with uncompressed def levels
                // (2 bytes) and no rep levels.
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 4);
                    assert_eq!(num_values, 5);
                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                    assert_eq!(num_nulls, 1);
                    assert_eq!(num_rows, 5);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_some());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        // Exactly dictionary + one data page.
        assert_eq!(page_count, 2);
    }
1323
1324 fn get_serialized_page_reader<R: ChunkReader>(
1325 file_reader: &SerializedFileReader<R>,
1326 row_group: usize,
1327 column: usize,
1328 ) -> Result<SerializedPageReader<R>> {
1329 let row_group = {
1330 let row_group_metadata = file_reader.metadata.row_group(row_group);
1331 let props = Arc::clone(&file_reader.props);
1332 let f = Arc::clone(&file_reader.chunk_reader);
1333 SerializedRowGroupReader::new(
1334 f,
1335 row_group_metadata,
1336 file_reader
1337 .metadata
1338 .offset_index()
1339 .map(|x| x[row_group].as_slice()),
1340 props,
1341 )?
1342 };
1343
1344 let col = row_group.metadata.column(column);
1345
1346 let page_locations = row_group
1347 .offset_index
1348 .map(|x| x[column].page_locations.clone());
1349
1350 let props = Arc::clone(&row_group.props);
1351 SerializedPageReader::new_with_properties(
1352 Arc::clone(&row_group.chunk_reader),
1353 col,
1354 row_group.metadata.num_rows() as usize,
1355 page_locations,
1356 props,
1357 )
1358 }
1359
    /// Verifies that `peek_next_page_offset` reports exactly the offset at
    /// which `get_next_page` will subsequently read, for every page of every
    /// column chunk in the file, and that no offset repeats.
    #[test]
    fn test_peek_next_page_offset_matches_actual() -> Result<()> {
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader = SerializedFileReader::new(test_file)?;

        // All offsets seen so far; each page must have a distinct offset.
        let mut offset_set = HashSet::new();
        let num_row_groups = reader.metadata.num_row_groups();
        for row_group in 0..num_row_groups {
            let num_columns = reader.metadata.row_group(row_group).num_columns();
            for column in 0..num_columns {
                let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;

                while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
                    // Cross-check the peeked offset against the reader's
                    // internal state before consuming the page.
                    match &page_reader.state {
                        SerializedPageReaderState::Pages {
                            page_locations,
                            dictionary_page,
                            ..
                        } => {
                            // Offset-index path: the next page is either the
                            // pending dictionary page or the front data page.
                            if let Some(page) = dictionary_page {
                                assert_eq!(page.offset as usize, page_offset);
                            } else if let Some(page) = page_locations.front() {
                                assert_eq!(page.offset as usize, page_offset);
                            } else {
                                unreachable!()
                            }
                        }
                        SerializedPageReaderState::Values {
                            offset,
                            next_page_header,
                            ..
                        } => {
                            // Header-scanning path: peeking must have parsed
                            // the next page header and left `offset` at it.
                            assert!(next_page_header.is_some());
                            assert_eq!(*offset, page_offset);
                        }
                    }
                    let page = page_reader.get_next_page()?;
                    assert!(page.is_some());
                    let newly_inserted = offset_set.insert(page_offset);
                    assert!(newly_inserted);
                }
            }
        }

        Ok(())
    }
1406
1407 #[test]
1408 fn test_page_iterator() {
1409 let file = get_test_file("alltypes_plain.parquet");
1410 let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1411
1412 let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap();
1413
1414 let page = page_iterator.next();
1416 assert!(page.is_some());
1417 assert!(page.unwrap().is_ok());
1418
1419 let page = page_iterator.next();
1421 assert!(page.is_none());
1422
1423 let row_group_indices = Box::new(0..1);
1424 let mut page_iterator =
1425 FilePageIterator::with_row_groups(0, row_group_indices, file_reader).unwrap();
1426
1427 let page = page_iterator.next();
1429 assert!(page.is_some());
1430 assert!(page.unwrap().is_ok());
1431
1432 let page = page_iterator.next();
1434 assert!(page.is_none());
1435 }
1436
1437 #[test]
1438 fn test_file_reader_key_value_metadata() {
1439 let file = get_test_file("binary.parquet");
1440 let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1441
1442 let metadata = file_reader
1443 .metadata
1444 .file_metadata()
1445 .key_value_metadata()
1446 .unwrap();
1447
1448 assert_eq!(metadata.len(), 3);
1449
1450 assert_eq!(metadata[0].key, "parquet.proto.descriptor");
1451
1452 assert_eq!(metadata[1].key, "writer.model.name");
1453 assert_eq!(metadata[1].value, Some("protobuf".to_owned()));
1454
1455 assert_eq!(metadata[2].key, "parquet.proto.class");
1456 assert_eq!(metadata[2].value, Some("foo.baz.Foobaz$Event".to_owned()));
1457 }
1458
1459 #[test]
1460 fn test_file_reader_optional_metadata() {
1461 let file = get_test_file("data_index_bloom_encoding_stats.parquet");
1463 let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1464
1465 let row_group_metadata = file_reader.metadata.row_group(0);
1466 let col0_metadata = row_group_metadata.column(0);
1467
1468 assert_eq!(col0_metadata.bloom_filter_offset().unwrap(), 192);
1470
1471 let page_encoding_stats = &col0_metadata.page_encoding_stats().unwrap()[0];
1473
1474 assert_eq!(page_encoding_stats.page_type, basic::PageType::DATA_PAGE);
1475 assert_eq!(page_encoding_stats.encoding, Encoding::PLAIN);
1476 assert_eq!(page_encoding_stats.count, 1);
1477
1478 assert_eq!(col0_metadata.column_index_offset().unwrap(), 156);
1480 assert_eq!(col0_metadata.column_index_length().unwrap(), 25);
1481
1482 assert_eq!(col0_metadata.offset_index_offset().unwrap(), 181);
1484 assert_eq!(col0_metadata.offset_index_length().unwrap(), 11);
1485 }
1486
1487 #[test]
1488 fn test_file_reader_with_no_filter() -> Result<()> {
1489 let test_file = get_test_file("alltypes_plain.parquet");
1490 let origin_reader = SerializedFileReader::new(test_file)?;
1491 let metadata = origin_reader.metadata();
1493 assert_eq!(metadata.num_row_groups(), 1);
1494 Ok(())
1495 }
1496
1497 #[test]
1498 fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
1499 let test_file = get_test_file("alltypes_plain.parquet");
1500 let read_options = ReadOptionsBuilder::new()
1501 .with_predicate(Box::new(|_, _| false))
1502 .build();
1503 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1504 let metadata = reader.metadata();
1505 assert_eq!(metadata.num_row_groups(), 0);
1506 Ok(())
1507 }
1508
1509 #[test]
1510 fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
1511 let test_file = get_test_file("alltypes_plain.parquet");
1512 let origin_reader = SerializedFileReader::new(test_file)?;
1513 let metadata = origin_reader.metadata();
1515 assert_eq!(metadata.num_row_groups(), 1);
1516 let mid = get_midpoint_offset(metadata.row_group(0));
1517
1518 let test_file = get_test_file("alltypes_plain.parquet");
1519 let read_options = ReadOptionsBuilder::new().with_range(0, mid + 1).build();
1520 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1521 let metadata = reader.metadata();
1522 assert_eq!(metadata.num_row_groups(), 1);
1523
1524 let test_file = get_test_file("alltypes_plain.parquet");
1525 let read_options = ReadOptionsBuilder::new().with_range(0, mid).build();
1526 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1527 let metadata = reader.metadata();
1528 assert_eq!(metadata.num_row_groups(), 0);
1529 Ok(())
1530 }
1531
1532 #[test]
1533 fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
1534 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1535 let origin_reader = SerializedFileReader::new(test_file)?;
1536 let metadata = origin_reader.metadata();
1537 let mid = get_midpoint_offset(metadata.row_group(0));
1538
1539 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1541 let read_options = ReadOptionsBuilder::new()
1542 .with_page_index()
1543 .with_predicate(Box::new(|_, _| true))
1544 .with_range(mid, mid + 1)
1545 .build();
1546 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1547 let metadata = reader.metadata();
1548 assert_eq!(metadata.num_row_groups(), 1);
1549 assert_eq!(metadata.column_index().unwrap().len(), 1);
1550 assert_eq!(metadata.offset_index().unwrap().len(), 1);
1551
1552 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1554 let read_options = ReadOptionsBuilder::new()
1555 .with_page_index()
1556 .with_predicate(Box::new(|_, _| true))
1557 .with_range(0, mid)
1558 .build();
1559 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1560 let metadata = reader.metadata();
1561 assert_eq!(metadata.num_row_groups(), 0);
1562 assert!(metadata.column_index().is_none());
1563 assert!(metadata.offset_index().is_none());
1564
1565 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1567 let read_options = ReadOptionsBuilder::new()
1568 .with_page_index()
1569 .with_predicate(Box::new(|_, _| false))
1570 .with_range(mid, mid + 1)
1571 .build();
1572 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1573 let metadata = reader.metadata();
1574 assert_eq!(metadata.num_row_groups(), 0);
1575 assert!(metadata.column_index().is_none());
1576 assert!(metadata.offset_index().is_none());
1577
1578 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1580 let read_options = ReadOptionsBuilder::new()
1581 .with_page_index()
1582 .with_predicate(Box::new(|_, _| false))
1583 .with_range(0, mid)
1584 .build();
1585 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1586 let metadata = reader.metadata();
1587 assert_eq!(metadata.num_row_groups(), 0);
1588 assert!(metadata.column_index().is_none());
1589 assert!(metadata.offset_index().is_none());
1590 Ok(())
1591 }
1592
1593 #[test]
1594 fn test_file_reader_invalid_metadata() {
1595 let data = [
1596 255, 172, 1, 0, 50, 82, 65, 73, 1, 0, 0, 0, 169, 168, 168, 162, 87, 255, 16, 0, 0, 0,
1597 80, 65, 82, 49,
1598 ];
1599 let ret = SerializedFileReader::new(Bytes::copy_from_slice(&data));
1600 assert_eq!(
1601 ret.err().unwrap().to_string(),
1602 "Parquet error: Could not parse metadata: bad data"
1603 );
1604 }
1605
1606 #[test]
1607 fn test_page_index_reader() {
1624 let test_file = get_test_file("data_index_bloom_encoding_stats.parquet");
1625 let builder = ReadOptionsBuilder::new();
1626 let options = builder.with_page_index().build();
1628 let reader_result = SerializedFileReader::new_with_options(test_file, options);
1629 let reader = reader_result.unwrap();
1630
1631 let metadata = reader.metadata();
1633 assert_eq!(metadata.num_row_groups(), 1);
1634
1635 let column_index = metadata.column_index().unwrap();
1636
1637 assert_eq!(column_index.len(), 1);
1639 let index = if let Index::BYTE_ARRAY(index) = &column_index[0][0] {
1640 index
1641 } else {
1642 unreachable!()
1643 };
1644
1645 assert_eq!(index.boundary_order, BoundaryOrder::ASCENDING);
1646 let index_in_pages = &index.indexes;
1647
1648 assert_eq!(index_in_pages.len(), 1);
1650
1651 let page0 = &index_in_pages[0];
1652 let min = page0.min.as_ref().unwrap();
1653 let max = page0.max.as_ref().unwrap();
1654 assert_eq!(b"Hello", min.as_bytes());
1655 assert_eq!(b"today", max.as_bytes());
1656
1657 let offset_indexes = metadata.offset_index().unwrap();
1658 assert_eq!(offset_indexes.len(), 1);
1660 let offset_index = &offset_indexes[0];
1661 let page_offset = &offset_index[0].page_locations()[0];
1662
1663 assert_eq!(4, page_offset.offset);
1664 assert_eq!(152, page_offset.compressed_page_size);
1665 assert_eq!(0, page_offset.first_row_index);
1666 }
1667
1668 #[test]
1669 fn test_page_index_reader_out_of_order() {
1670 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1671 let options = ReadOptionsBuilder::new().with_page_index().build();
1672 let reader = SerializedFileReader::new_with_options(test_file, options).unwrap();
1673 let metadata = reader.metadata();
1674
1675 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1676 let columns = metadata.row_group(0).columns();
1677 let reversed: Vec<_> = columns.iter().cloned().rev().collect();
1678
1679 let a = read_columns_indexes(&test_file, columns).unwrap().unwrap();
1680 let mut b = read_columns_indexes(&test_file, &reversed)
1681 .unwrap()
1682 .unwrap();
1683 b.reverse();
1684 assert_eq!(a, b);
1685
1686 let a = read_offset_indexes(&test_file, columns).unwrap().unwrap();
1687 let mut b = read_offset_indexes(&test_file, &reversed).unwrap().unwrap();
1688 b.reverse();
1689 assert_eq!(a, b);
1690 }
1691
1692 #[test]
1693 fn test_page_index_reader_all_type() {
1694 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1695 let builder = ReadOptionsBuilder::new();
1696 let options = builder.with_page_index().build();
1698 let reader_result = SerializedFileReader::new_with_options(test_file, options);
1699 let reader = reader_result.unwrap();
1700
1701 let metadata = reader.metadata();
1703 assert_eq!(metadata.num_row_groups(), 1);
1704
1705 let column_index = metadata.column_index().unwrap();
1706 let row_group_offset_indexes = &metadata.offset_index().unwrap()[0];
1707
1708 assert_eq!(column_index.len(), 1);
1710 let row_group_metadata = metadata.row_group(0);
1711
1712 assert!(!&column_index[0][0].is_sorted());
1714 let boundary_order = &column_index[0][0].get_boundary_order();
1715 assert!(boundary_order.is_some());
1716 matches!(boundary_order.unwrap(), BoundaryOrder::UNORDERED);
1717 if let Index::INT32(index) = &column_index[0][0] {
1718 check_native_page_index(
1719 index,
1720 325,
1721 get_row_group_min_max_bytes(row_group_metadata, 0),
1722 BoundaryOrder::UNORDERED,
1723 );
1724 assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325);
1725 } else {
1726 unreachable!()
1727 };
1728 assert!(&column_index[0][1].is_sorted());
1730 if let Index::BOOLEAN(index) = &column_index[0][1] {
1731 assert_eq!(index.indexes.len(), 82);
1732 assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82);
1733 } else {
1734 unreachable!()
1735 };
1736 assert!(&column_index[0][2].is_sorted());
1738 if let Index::INT32(index) = &column_index[0][2] {
1739 check_native_page_index(
1740 index,
1741 325,
1742 get_row_group_min_max_bytes(row_group_metadata, 2),
1743 BoundaryOrder::ASCENDING,
1744 );
1745 assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325);
1746 } else {
1747 unreachable!()
1748 };
1749 assert!(&column_index[0][3].is_sorted());
1751 if let Index::INT32(index) = &column_index[0][3] {
1752 check_native_page_index(
1753 index,
1754 325,
1755 get_row_group_min_max_bytes(row_group_metadata, 3),
1756 BoundaryOrder::ASCENDING,
1757 );
1758 assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325);
1759 } else {
1760 unreachable!()
1761 };
1762 assert!(&column_index[0][4].is_sorted());
1764 if let Index::INT32(index) = &column_index[0][4] {
1765 check_native_page_index(
1766 index,
1767 325,
1768 get_row_group_min_max_bytes(row_group_metadata, 4),
1769 BoundaryOrder::ASCENDING,
1770 );
1771 assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325);
1772 } else {
1773 unreachable!()
1774 };
1775 assert!(!&column_index[0][5].is_sorted());
1777 if let Index::INT64(index) = &column_index[0][5] {
1778 check_native_page_index(
1779 index,
1780 528,
1781 get_row_group_min_max_bytes(row_group_metadata, 5),
1782 BoundaryOrder::UNORDERED,
1783 );
1784 assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528);
1785 } else {
1786 unreachable!()
1787 };
1788 assert!(&column_index[0][6].is_sorted());
1790 if let Index::FLOAT(index) = &column_index[0][6] {
1791 check_native_page_index(
1792 index,
1793 325,
1794 get_row_group_min_max_bytes(row_group_metadata, 6),
1795 BoundaryOrder::ASCENDING,
1796 );
1797 assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325);
1798 } else {
1799 unreachable!()
1800 };
1801 assert!(!&column_index[0][7].is_sorted());
1803 if let Index::DOUBLE(index) = &column_index[0][7] {
1804 check_native_page_index(
1805 index,
1806 528,
1807 get_row_group_min_max_bytes(row_group_metadata, 7),
1808 BoundaryOrder::UNORDERED,
1809 );
1810 assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528);
1811 } else {
1812 unreachable!()
1813 };
1814 assert!(!&column_index[0][8].is_sorted());
1816 if let Index::BYTE_ARRAY(index) = &column_index[0][8] {
1817 check_native_page_index(
1818 index,
1819 974,
1820 get_row_group_min_max_bytes(row_group_metadata, 8),
1821 BoundaryOrder::UNORDERED,
1822 );
1823 assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974);
1824 } else {
1825 unreachable!()
1826 };
1827 assert!(&column_index[0][9].is_sorted());
1829 if let Index::BYTE_ARRAY(index) = &column_index[0][9] {
1830 check_native_page_index(
1831 index,
1832 352,
1833 get_row_group_min_max_bytes(row_group_metadata, 9),
1834 BoundaryOrder::ASCENDING,
1835 );
1836 assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352);
1837 } else {
1838 unreachable!()
1839 };
1840 assert!(!&column_index[0][10].is_sorted());
1843 if let Index::NONE = &column_index[0][10] {
1844 assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974);
1845 } else {
1846 unreachable!()
1847 };
1848 assert!(&column_index[0][11].is_sorted());
1850 if let Index::INT32(index) = &column_index[0][11] {
1851 check_native_page_index(
1852 index,
1853 325,
1854 get_row_group_min_max_bytes(row_group_metadata, 11),
1855 BoundaryOrder::ASCENDING,
1856 );
1857 assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325);
1858 } else {
1859 unreachable!()
1860 };
1861 assert!(!&column_index[0][12].is_sorted());
1863 if let Index::INT32(index) = &column_index[0][12] {
1864 check_native_page_index(
1865 index,
1866 325,
1867 get_row_group_min_max_bytes(row_group_metadata, 12),
1868 BoundaryOrder::UNORDERED,
1869 );
1870 assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325);
1871 } else {
1872 unreachable!()
1873 };
1874 }
1875
1876 fn check_native_page_index<T: ParquetValueType>(
1877 row_group_index: &NativeIndex<T>,
1878 page_size: usize,
1879 min_max: (&[u8], &[u8]),
1880 boundary_order: BoundaryOrder,
1881 ) {
1882 assert_eq!(row_group_index.indexes.len(), page_size);
1883 assert_eq!(row_group_index.boundary_order, boundary_order);
1884 row_group_index.indexes.iter().all(|x| {
1885 x.min.as_ref().unwrap() >= &T::try_from_le_slice(min_max.0).unwrap()
1886 && x.max.as_ref().unwrap() <= &T::try_from_le_slice(min_max.1).unwrap()
1887 });
1888 }
1889
1890 fn get_row_group_min_max_bytes(r: &RowGroupMetaData, col_num: usize) -> (&[u8], &[u8]) {
1891 let statistics = r.column(col_num).statistics().unwrap();
1892 (
1893 statistics.min_bytes_opt().unwrap_or_default(),
1894 statistics.max_bytes_opt().unwrap_or_default(),
1895 )
1896 }
1897
1898 #[test]
1899 fn test_skip_page_with_offset_index() {
1900 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1901 let builder = ReadOptionsBuilder::new();
1902 let options = builder.with_page_index().build();
1904 let reader_result = SerializedFileReader::new_with_options(test_file, options);
1905 let reader = reader_result.unwrap();
1906
1907 let row_group_reader = reader.get_row_group(0).unwrap();
1908
1909 let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
1911
1912 let mut vec = vec![];
1913
1914 for i in 0..325 {
1915 if i % 2 == 0 {
1916 vec.push(column_page_reader.get_next_page().unwrap().unwrap());
1917 } else {
1918 column_page_reader.skip_next_page().unwrap();
1919 }
1920 }
1921 assert!(column_page_reader.peek_next_page().unwrap().is_none());
1923 assert!(column_page_reader.get_next_page().unwrap().is_none());
1924
1925 assert_eq!(vec.len(), 163);
1926 }
1927
1928 #[test]
1929 fn test_skip_page_without_offset_index() {
1930 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1931
1932 let reader_result = SerializedFileReader::new(test_file);
1934 let reader = reader_result.unwrap();
1935
1936 let row_group_reader = reader.get_row_group(0).unwrap();
1937
1938 let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
1940
1941 let mut vec = vec![];
1942
1943 for i in 0..325 {
1944 if i % 2 == 0 {
1945 vec.push(column_page_reader.get_next_page().unwrap().unwrap());
1946 } else {
1947 column_page_reader.peek_next_page().unwrap().unwrap();
1948 column_page_reader.skip_next_page().unwrap();
1949 }
1950 }
1951 assert!(column_page_reader.peek_next_page().unwrap().is_none());
1953 assert!(column_page_reader.get_next_page().unwrap().is_none());
1954
1955 assert_eq!(vec.len(), 163);
1956 }
1957
1958 #[test]
1959 fn test_peek_page_with_dictionary_page() {
1960 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1961 let builder = ReadOptionsBuilder::new();
1962 let options = builder.with_page_index().build();
1964 let reader_result = SerializedFileReader::new_with_options(test_file, options);
1965 let reader = reader_result.unwrap();
1966 let row_group_reader = reader.get_row_group(0).unwrap();
1967
1968 let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
1970
1971 let mut vec = vec![];
1972
1973 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
1974 assert!(meta.is_dict);
1975 let page = column_page_reader.get_next_page().unwrap().unwrap();
1976 assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
1977
1978 for i in 0..352 {
1979 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
1980 if i != 351 {
1983 assert!((meta.num_rows == Some(21)) || (meta.num_rows == Some(20)));
1984 } else {
1985 assert_eq!(meta.num_rows, Some(10));
1988 }
1989 assert!(!meta.is_dict);
1990 vec.push(meta);
1991 let page = column_page_reader.get_next_page().unwrap().unwrap();
1992 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
1993 }
1994
1995 assert!(column_page_reader.peek_next_page().unwrap().is_none());
1997 assert!(column_page_reader.get_next_page().unwrap().is_none());
1998
1999 assert_eq!(vec.len(), 352);
2000 }
2001
2002 #[test]
2003 fn test_peek_page_with_dictionary_page_without_offset_index() {
2004 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2005
2006 let reader_result = SerializedFileReader::new(test_file);
2007 let reader = reader_result.unwrap();
2008 let row_group_reader = reader.get_row_group(0).unwrap();
2009
2010 let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2012
2013 let mut vec = vec![];
2014
2015 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2016 assert!(meta.is_dict);
2017 let page = column_page_reader.get_next_page().unwrap().unwrap();
2018 assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2019
2020 for i in 0..352 {
2021 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2022 if i != 351 {
2025 assert!((meta.num_levels == Some(21)) || (meta.num_levels == Some(20)));
2026 } else {
2027 assert_eq!(meta.num_levels, Some(10));
2030 }
2031 assert!(!meta.is_dict);
2032 vec.push(meta);
2033 let page = column_page_reader.get_next_page().unwrap().unwrap();
2034 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2035 }
2036
2037 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2039 assert!(column_page_reader.get_next_page().unwrap().is_none());
2040
2041 assert_eq!(vec.len(), 352);
2042 }
2043
2044 #[test]
2045 fn test_fixed_length_index() {
2046 let message_type = "
2047 message test_schema {
2048 OPTIONAL FIXED_LEN_BYTE_ARRAY (11) value (DECIMAL(25,2));
2049 }
2050 ";
2051
2052 let schema = parse_message_type(message_type).unwrap();
2053 let mut out = Vec::with_capacity(1024);
2054 let mut writer =
2055 SerializedFileWriter::new(&mut out, Arc::new(schema), Default::default()).unwrap();
2056
2057 let mut r = writer.next_row_group().unwrap();
2058 let mut c = r.next_column().unwrap().unwrap();
2059 c.typed::<FixedLenByteArrayType>()
2060 .write_batch(
2061 &[vec![0; 11].into(), vec![5; 11].into(), vec![3; 11].into()],
2062 Some(&[1, 1, 0, 1]),
2063 None,
2064 )
2065 .unwrap();
2066 c.close().unwrap();
2067 r.close().unwrap();
2068 writer.close().unwrap();
2069
2070 let b = Bytes::from(out);
2071 let options = ReadOptionsBuilder::new().with_page_index().build();
2072 let reader = SerializedFileReader::new_with_options(b, options).unwrap();
2073 let index = reader.metadata().column_index().unwrap();
2074
2075 assert_eq!(index.len(), 1);
2077 let c = &index[0];
2078 assert_eq!(c.len(), 1);
2080
2081 match &c[0] {
2082 Index::FIXED_LEN_BYTE_ARRAY(v) => {
2083 assert_eq!(v.indexes.len(), 1);
2084 let page_idx = &v.indexes[0];
2085 assert_eq!(page_idx.null_count.unwrap(), 1);
2086 assert_eq!(page_idx.min.as_ref().unwrap().as_ref(), &[0; 11]);
2087 assert_eq!(page_idx.max.as_ref().unwrap().as_ref(), &[5; 11]);
2088 }
2089 _ => unreachable!(),
2090 }
2091 }
2092
2093 #[test]
2094 fn test_multi_gz() {
2095 let file = get_test_file("concatenated_gzip_members.parquet");
2096 let reader = SerializedFileReader::new(file).unwrap();
2097 let row_group_reader = reader.get_row_group(0).unwrap();
2098 match row_group_reader.get_column_reader(0).unwrap() {
2099 ColumnReader::Int64ColumnReader(mut reader) => {
2100 let mut buffer = Vec::with_capacity(1024);
2101 let mut def_levels = Vec::with_capacity(1024);
2102 let (num_records, num_values, num_levels) = reader
2103 .read_records(1024, Some(&mut def_levels), None, &mut buffer)
2104 .unwrap();
2105
2106 assert_eq!(num_records, 513);
2107 assert_eq!(num_values, 513);
2108 assert_eq!(num_levels, 513);
2109
2110 let expected: Vec<i64> = (1..514).collect();
2111 assert_eq!(&buffer, &expected);
2112 }
2113 _ => unreachable!(),
2114 }
2115 }
2116
2117 #[test]
2118 fn test_byte_stream_split_extended() {
2119 let path = format!(
2120 "{}/byte_stream_split_extended.gzip.parquet",
2121 arrow::util::test_util::parquet_test_data(),
2122 );
2123 let file = File::open(path).unwrap();
2124 let reader = Box::new(SerializedFileReader::new(file).expect("Failed to create reader"));
2125
2126 let mut iter = reader
2128 .get_row_iter(None)
2129 .expect("Failed to create row iterator");
2130
2131 let mut start = 0;
2132 let end = reader.metadata().file_metadata().num_rows();
2133
2134 let check_row = |row: Result<Row, ParquetError>| {
2135 assert!(row.is_ok());
2136 let r = row.unwrap();
2137 assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
2138 assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
2139 assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
2140 assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
2141 assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
2142 assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
2143 assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
2144 };
2145
2146 while start < end {
2147 match iter.next() {
2148 Some(row) => check_row(row),
2149 None => break,
2150 };
2151 start += 1;
2152 }
2153 }
2154
    /// Writes a 5-row-group file, then reads it back twice with row-group
    /// predicates and checks that the filtered metadata (ordinals, column
    /// index, offset index) corresponds to the surviving row groups only.
    #[test]
    fn test_filtered_rowgroup_metadata() {
        let message_type = "
        message test_schema {
            REQUIRED INT32 a;
        }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .build(),
        );
        let mut file: File = tempfile::tempfile().unwrap();
        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
        let data = [1, 2, 3, 4, 5];

        // Write 5 row groups of 5 values each; values scale with the group
        // index, so each row group gets distinct min/max statistics.
        for idx in 0..5 {
            let data_i: Vec<i32> = data.iter().map(|x| x * (idx + 1)).collect();
            let mut row_group_writer = file_writer.next_row_group().unwrap();
            if let Some(mut writer) = row_group_writer.next_column().unwrap() {
                writer
                    .typed::<Int32Type>()
                    .write_batch(data_i.as_slice(), None, None)
                    .unwrap();
                writer.close().unwrap();
            }
            row_group_writer.close().unwrap();
            // NOTE(review): the returned slice is discarded, so this call has
            // no observable effect here — confirm whether it was intended.
            file_writer.flushed_row_groups();
        }
        let file_metadata = file_writer.close().unwrap();

        assert_eq!(file_metadata.num_rows, 25);
        assert_eq!(file_metadata.row_groups.len(), 5);

        // First pass: keep only the row group with ordinal 2.
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2))
            .build();
        let reader =
            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
                .unwrap();
        let metadata = reader.metadata();

        // One survivor, which keeps its original ordinal.
        assert_eq!(metadata.num_row_groups(), 1);
        assert_eq!(metadata.row_group(0).ordinal(), Some(2));

        // Page indexes are present and sized to the filtered row groups.
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
        assert_eq!(metadata.column_index().unwrap().len(), 1);
        assert_eq!(metadata.offset_index().unwrap().len(), 1);
        let col_idx = metadata.column_index().unwrap();
        let off_idx = metadata.offset_index().unwrap();
        let col_stats = metadata.row_group(0).column(0).statistics().unwrap();
        let pg_idx = &col_idx[0][0];
        let off_idx_i = &off_idx[0][0];

        // The single page's min/max must match the column chunk statistics.
        match pg_idx {
            Index::INT32(int_idx) => {
                let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
                let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
                assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
                assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
            }
            _ => panic!("wrong stats type"),
        }

        // The offset index must point at the surviving group's data pages.
        assert_eq!(
            off_idx_i.page_locations[0].offset,
            metadata.row_group(0).column(0).data_page_offset()
        );

        // Second pass: keep row groups with odd ordinals (1 and 3).
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1))
            .build();
        let reader =
            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
                .unwrap();
        let metadata = reader.metadata();

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).ordinal(), Some(1));
        assert_eq!(metadata.row_group(1).ordinal(), Some(3));

        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
        assert_eq!(metadata.column_index().unwrap().len(), 2);
        assert_eq!(metadata.offset_index().unwrap().len(), 2);
        let col_idx = metadata.column_index().unwrap();
        let off_idx = metadata.offset_index().unwrap();

        // Same per-row-group consistency checks as in the first pass.
        for (i, col_idx_i) in col_idx.iter().enumerate().take(metadata.num_row_groups()) {
            let col_stats = metadata.row_group(i).column(0).statistics().unwrap();
            let pg_idx = &col_idx_i[0];
            let off_idx_i = &off_idx[i][0];

            match pg_idx {
                Index::INT32(int_idx) => {
                    let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
                    let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
                    assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
                    assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
                }
                _ => panic!("wrong stats type"),
            }

            assert_eq!(
                off_idx_i.page_locations[0].offset,
                metadata.row_group(i).column(0).data_page_offset()
            );
        }
    }
2279}