1use std::collections::VecDeque;
22use std::iter;
23use std::{fs::File, io::Read, path::Path, sync::Arc};
24
25use crate::basic::{Encoding, Type};
26use crate::bloom_filter::Sbbf;
27use crate::column::page::{Page, PageMetadata, PageReader};
28use crate::compression::{create_codec, Codec};
29use crate::errors::{ParquetError, Result};
30use crate::file::page_index::offset_index::OffsetIndexMetaData;
31use crate::file::{
32 metadata::*,
33 properties::{ReaderProperties, ReaderPropertiesPtr},
34 reader::*,
35 statistics,
36};
37use crate::format::{PageHeader, PageLocation, PageType};
38use crate::record::reader::RowIter;
39use crate::record::Row;
40use crate::schema::types::Type as SchemaType;
41use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
42use bytes::Bytes;
43use thrift::protocol::TCompactInputProtocol;
44
45impl TryFrom<File> for SerializedFileReader<File> {
46 type Error = ParquetError;
47
48 fn try_from(file: File) -> Result<Self> {
49 Self::new(file)
50 }
51}
52
53impl TryFrom<&Path> for SerializedFileReader<File> {
54 type Error = ParquetError;
55
56 fn try_from(path: &Path) -> Result<Self> {
57 let file = File::open(path)?;
58 Self::try_from(file)
59 }
60}
61
62impl TryFrom<String> for SerializedFileReader<File> {
63 type Error = ParquetError;
64
65 fn try_from(path: String) -> Result<Self> {
66 Self::try_from(Path::new(&path))
67 }
68}
69
70impl TryFrom<&str> for SerializedFileReader<File> {
71 type Error = ParquetError;
72
73 fn try_from(path: &str) -> Result<Self> {
74 Self::try_from(Path::new(&path))
75 }
76}
77
/// Converts the reader into an iterator over all rows of the file; the
/// returned iterator takes ownership of the boxed reader.
impl IntoIterator for SerializedFileReader<File> {
    type Item = Result<Row>;
    type IntoIter = RowIter<'static>;

    fn into_iter(self) -> Self::IntoIter {
        RowIter::from_file_into(Box::new(self))
    }
}
88
/// A serialized implementation of a Parquet file reader, backed by any
/// [`ChunkReader`] byte source.
pub struct SerializedFileReader<R: ChunkReader> {
    // Shared handle to the underlying byte source.
    chunk_reader: Arc<R>,
    // Footer metadata parsed when the reader was constructed.
    metadata: Arc<ParquetMetaData>,
    // Properties controlling decoding (codec options, bloom filter reads, …).
    props: ReaderPropertiesPtr,
}

/// Predicate for selecting row groups: receives the row group metadata and its
/// index within the file, returning `true` to keep the group.
pub type ReadGroupPredicate = Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>;

/// Builder for [`ReadOptions`].
#[derive(Default)]
pub struct ReadOptionsBuilder {
    // Predicates ANDed together to select row groups.
    predicates: Vec<ReadGroupPredicate>,
    // Whether to also load the page index structures.
    enable_page_index: bool,
    // Optional custom reader properties; defaults are used when `None`.
    props: Option<ReaderProperties>,
}
113
114impl ReadOptionsBuilder {
115 pub fn new() -> Self {
117 Self::default()
118 }
119
120 pub fn with_predicate(mut self, predicate: ReadGroupPredicate) -> Self {
123 self.predicates.push(predicate);
124 self
125 }
126
127 pub fn with_range(mut self, start: i64, end: i64) -> Self {
130 assert!(start < end);
131 let predicate = move |rg: &RowGroupMetaData, _: usize| {
132 let mid = get_midpoint_offset(rg);
133 mid >= start && mid < end
134 };
135 self.predicates.push(Box::new(predicate));
136 self
137 }
138
139 pub fn with_page_index(mut self) -> Self {
144 self.enable_page_index = true;
145 self
146 }
147
148 pub fn with_reader_properties(mut self, properties: ReaderProperties) -> Self {
150 self.props = Some(properties);
151 self
152 }
153
154 pub fn build(self) -> ReadOptions {
156 let props = self
157 .props
158 .unwrap_or_else(|| ReaderProperties::builder().build());
159 ReadOptions {
160 predicates: self.predicates,
161 enable_page_index: self.enable_page_index,
162 props,
163 }
164 }
165}
166
/// Options consumed by [`SerializedFileReader::new_with_options`]; construct
/// via [`ReadOptionsBuilder`].
pub struct ReadOptions {
    // Row-group filters; a group is kept only if all predicates accept it.
    predicates: Vec<ReadGroupPredicate>,
    // Load the page index structures after parsing the footer.
    enable_page_index: bool,
    // Properties used when decoding pages.
    props: ReaderProperties,
}
176
177impl<R: 'static + ChunkReader> SerializedFileReader<R> {
178 pub fn new(chunk_reader: R) -> Result<Self> {
181 let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
182 let props = Arc::new(ReaderProperties::builder().build());
183 Ok(Self {
184 chunk_reader: Arc::new(chunk_reader),
185 metadata: Arc::new(metadata),
186 props,
187 })
188 }
189
190 pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
193 let mut metadata_builder = ParquetMetaDataReader::new()
194 .parse_and_finish(&chunk_reader)?
195 .into_builder();
196 let mut predicates = options.predicates;
197
198 for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
200 let mut keep = true;
201 for predicate in &mut predicates {
202 if !predicate(&rg_meta, i) {
203 keep = false;
204 break;
205 }
206 }
207 if keep {
208 metadata_builder = metadata_builder.add_row_group(rg_meta);
209 }
210 }
211
212 let mut metadata = metadata_builder.build();
213
214 if options.enable_page_index {
216 let mut reader =
217 ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true);
218 reader.read_page_indexes(&chunk_reader)?;
219 metadata = reader.finish()?;
220 }
221
222 Ok(Self {
223 chunk_reader: Arc::new(chunk_reader),
224 metadata: Arc::new(metadata),
225 props: Arc::new(options.props),
226 })
227 }
228}
229
230fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
232 let col = meta.column(0);
233 let mut offset = col.data_page_offset();
234 if let Some(dic_offset) = col.dictionary_page_offset() {
235 if offset > dic_offset {
236 offset = dic_offset
237 }
238 };
239 offset + meta.compressed_size() / 2
240}
241
impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
    /// Returns the footer metadata parsed at construction time.
    fn metadata(&self) -> &ParquetMetaData {
        &self.metadata
    }

    /// Returns the number of row groups (after any predicate filtering).
    fn num_row_groups(&self) -> usize {
        self.metadata.num_row_groups()
    }

    /// Returns a reader for row group `i`, sharing this reader's chunk source
    /// and properties, and passing along the offset index if one was loaded.
    fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> {
        let row_group_metadata = self.metadata.row_group(i);
        let props = Arc::clone(&self.props);
        let f = Arc::clone(&self.chunk_reader);
        Ok(Box::new(SerializedRowGroupReader::new(
            f,
            row_group_metadata,
            self.metadata.offset_index().map(|x| x[i].as_slice()),
            props,
        )?))
    }

    /// Returns an iterator over the rows of the whole file, optionally
    /// projected to a sub-schema.
    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
        RowIter::from_file(projection, self)
    }
}
268
/// A serialized implementation of a Parquet row-group reader.
pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
    // Shared handle to the underlying byte source.
    chunk_reader: Arc<R>,
    // Metadata of the row group being read.
    metadata: &'a RowGroupMetaData,
    // Per-column offset index for this row group, if loaded.
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    // Properties controlling page decoding.
    props: ReaderPropertiesPtr,
    // One slot per column; populated eagerly when bloom-filter reading is on.
    bloom_filters: Vec<Option<Sbbf>>,
}
277
278impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
279 pub fn new(
281 chunk_reader: Arc<R>,
282 metadata: &'a RowGroupMetaData,
283 offset_index: Option<&'a [OffsetIndexMetaData]>,
284 props: ReaderPropertiesPtr,
285 ) -> Result<Self> {
286 let bloom_filters = if props.read_bloom_filter() {
287 metadata
288 .columns()
289 .iter()
290 .map(|col| Sbbf::read_from_column_chunk(col, chunk_reader.clone()))
291 .collect::<Result<Vec<_>>>()?
292 } else {
293 iter::repeat(None).take(metadata.columns().len()).collect()
294 };
295 Ok(Self {
296 chunk_reader,
297 metadata,
298 offset_index,
299 props,
300 bloom_filters,
301 })
302 }
303}
304
impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R> {
    /// Returns this row group's metadata.
    fn metadata(&self) -> &RowGroupMetaData {
        self.metadata
    }

    /// Returns the number of columns in this row group.
    fn num_columns(&self) -> usize {
        self.metadata.num_columns()
    }

    /// Returns a page reader for column `i`, passing along this column's page
    /// locations from the offset index when available.
    fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
        let col = self.metadata.column(i);

        // Page locations are cloned so the returned reader owns them.
        let page_locations = self.offset_index.map(|x| x[i].page_locations.clone());

        let props = Arc::clone(&self.props);
        Ok(Box::new(SerializedPageReader::new_with_properties(
            Arc::clone(&self.chunk_reader),
            col,
            self.metadata.num_rows() as usize,
            page_locations,
            props,
        )?))
    }

    /// Returns the bloom filter for column `i`, if one was read at
    /// construction time.
    fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
        self.bloom_filters[i].as_ref()
    }

    /// Returns an iterator over the rows of this row group, optionally
    /// projected to a sub-schema.
    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
        RowIter::from_row_group(projection, self)
    }
}
339
340pub(crate) fn read_page_header<T: Read>(input: &mut T) -> Result<PageHeader> {
342 let mut prot = TCompactInputProtocol::new(input);
343 let page_header = PageHeader::read_from_in_protocol(&mut prot)?;
344 Ok(page_header)
345}
346
347fn read_page_header_len<T: Read>(input: &mut T) -> Result<(usize, PageHeader)> {
349 struct TrackedRead<R> {
351 inner: R,
352 bytes_read: usize,
353 }
354
355 impl<R: Read> Read for TrackedRead<R> {
356 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
357 let v = self.inner.read(buf)?;
358 self.bytes_read += v;
359 Ok(v)
360 }
361 }
362
363 let mut tracked = TrackedRead {
364 inner: input,
365 bytes_read: 0,
366 };
367 let header = read_page_header(&mut tracked)?;
368 Ok((tracked.bytes_read, header))
369}
370
371pub(crate) fn decode_page(
373 page_header: PageHeader,
374 buffer: Bytes,
375 physical_type: Type,
376 decompressor: Option<&mut Box<dyn Codec>>,
377) -> Result<Page> {
378 #[cfg(feature = "crc")]
380 if let Some(expected_crc) = page_header.crc {
381 let crc = crc32fast::hash(&buffer);
382 if crc != expected_crc as u32 {
383 return Err(general_err!("Page CRC checksum mismatch"));
384 }
385 }
386
387 let mut offset: usize = 0;
394 let mut can_decompress = true;
395
396 if let Some(ref header_v2) = page_header.data_page_header_v2 {
397 offset = (header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length)
398 as usize;
399 can_decompress = header_v2.is_compressed.unwrap_or(true);
401 }
402
403 let buffer = match decompressor {
406 Some(decompressor) if can_decompress => {
407 let uncompressed_size = page_header.uncompressed_page_size as usize;
408 let mut decompressed = Vec::with_capacity(uncompressed_size);
409 let compressed = &buffer.as_ref()[offset..];
410 decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
411 decompressor.decompress(
412 compressed,
413 &mut decompressed,
414 Some(uncompressed_size - offset),
415 )?;
416
417 if decompressed.len() != uncompressed_size {
418 return Err(general_err!(
419 "Actual decompressed size doesn't match the expected one ({} vs {})",
420 decompressed.len(),
421 uncompressed_size
422 ));
423 }
424
425 Bytes::from(decompressed)
426 }
427 _ => buffer,
428 };
429
430 let result = match page_header.type_ {
431 PageType::DICTIONARY_PAGE => {
432 let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| {
433 ParquetError::General("Missing dictionary page header".to_string())
434 })?;
435 let is_sorted = dict_header.is_sorted.unwrap_or(false);
436 Page::DictionaryPage {
437 buf: buffer,
438 num_values: dict_header.num_values.try_into()?,
439 encoding: Encoding::try_from(dict_header.encoding)?,
440 is_sorted,
441 }
442 }
443 PageType::DATA_PAGE => {
444 let header = page_header
445 .data_page_header
446 .ok_or_else(|| ParquetError::General("Missing V1 data page header".to_string()))?;
447 Page::DataPage {
448 buf: buffer,
449 num_values: header.num_values.try_into()?,
450 encoding: Encoding::try_from(header.encoding)?,
451 def_level_encoding: Encoding::try_from(header.definition_level_encoding)?,
452 rep_level_encoding: Encoding::try_from(header.repetition_level_encoding)?,
453 statistics: statistics::from_thrift(physical_type, header.statistics)?,
454 }
455 }
456 PageType::DATA_PAGE_V2 => {
457 let header = page_header
458 .data_page_header_v2
459 .ok_or_else(|| ParquetError::General("Missing V2 data page header".to_string()))?;
460 let is_compressed = header.is_compressed.unwrap_or(true);
461 Page::DataPageV2 {
462 buf: buffer,
463 num_values: header.num_values.try_into()?,
464 encoding: Encoding::try_from(header.encoding)?,
465 num_nulls: header.num_nulls.try_into()?,
466 num_rows: header.num_rows.try_into()?,
467 def_levels_byte_len: header.definition_levels_byte_length.try_into()?,
468 rep_levels_byte_len: header.repetition_levels_byte_length.try_into()?,
469 is_compressed,
470 statistics: statistics::from_thrift(physical_type, header.statistics)?,
471 }
472 }
473 _ => {
474 unimplemented!("Page type {:?} is not supported", page_header.type_)
476 }
477 };
478
479 Ok(result)
480}
481
/// Internal scanning state of a [`SerializedPageReader`].
enum SerializedPageReaderState {
    /// Sequential scan without an offset index: page headers are discovered by
    /// decoding them one after another.
    Values {
        // Current absolute byte offset into the file.
        offset: usize,

        // Bytes of this column chunk not yet consumed.
        remaining_bytes: usize,

        // Header buffered by a previous peek, consumed by the next read.
        next_page_header: Option<Box<PageHeader>>,
    },
    /// Offset-index-driven scan: page boundaries are known up front.
    Pages {
        // Locations of the data pages not yet read.
        page_locations: VecDeque<PageLocation>,
        // Synthesized location of the dictionary page, read before any data page.
        dictionary_page: Option<PageLocation>,
        // Total number of rows in this column chunk.
        total_rows: usize,
    },
}

/// A serialized implementation of a Parquet page reader for one column chunk.
pub struct SerializedPageReader<R: ChunkReader> {
    // Shared handle to the underlying byte source.
    reader: Arc<R>,

    // Codec to decompress page payloads, if the chunk is compressed.
    decompressor: Option<Box<dyn Codec>>,

    // Physical type of the column, needed to decode statistics.
    physical_type: Type,

    // Current scanning state (sequential or index-driven).
    state: SerializedPageReaderState,
}
516
impl<R: ChunkReader> SerializedPageReader<R> {
    /// Creates a page reader for the column chunk described by `meta`, using
    /// default reader properties.
    pub fn new(
        reader: Arc<R>,
        meta: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
    ) -> Result<Self> {
        let props = Arc::new(ReaderProperties::builder().build());
        SerializedPageReader::new_with_properties(reader, meta, total_rows, page_locations, props)
    }

    /// Creates a page reader with explicit reader properties.
    ///
    /// When `page_locations` is provided (offset index available) the reader
    /// operates in `Pages` mode with known page boundaries; otherwise it scans
    /// the chunk sequentially in `Values` mode.
    pub fn new_with_properties(
        reader: Arc<R>,
        meta: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
        props: ReaderPropertiesPtr,
    ) -> Result<Self> {
        let decompressor = create_codec(meta.compression(), props.codec_options())?;
        let (start, len) = meta.byte_range();

        let state = match page_locations {
            Some(locations) => {
                // If the first indexed page does not start at the beginning of
                // the chunk's byte range, the gap must hold a dictionary page
                // (not tracked by the offset index) — synthesize its location.
                let dictionary_page = match locations.first() {
                    Some(dict_offset) if dict_offset.offset as u64 != start => Some(PageLocation {
                        offset: start as i64,
                        compressed_page_size: (dict_offset.offset as u64 - start) as i32,
                        first_row_index: 0,
                    }),
                    _ => None,
                };

                SerializedPageReaderState::Pages {
                    page_locations: locations.into(),
                    dictionary_page,
                    total_rows,
                }
            }
            None => SerializedPageReaderState::Values {
                offset: start as usize,
                remaining_bytes: len as usize,
                next_page_header: None,
            },
        };

        Ok(Self {
            reader,
            decompressor,
            state,
            physical_type: meta.column_type(),
        })
    }

    /// Returns the byte offset of the next page, or `None` when exhausted.
    ///
    /// In `Values` mode this reads and buffers the next page header as a side
    /// effect (cached in `next_page_header` for reuse by `get_next_page`).
    /// NOTE(review): unlike `peek_next_page`, this test-only helper does not
    /// call `verify_page_header_len` before subtracting — presumably fine for
    /// the well-formed test files it is used on; confirm before promoting.
    #[cfg(test)]
    fn peek_next_page_offset(&mut self) -> Result<Option<usize>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
                            Ok(Some(*offset))
                        } else {
                            // Buffered header is of an unsupported page type:
                            // drop it and keep scanning.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset as u64)?;
                        let (header_len, header) = read_page_header_len(&mut read)?;
                        *offset += header_len;
                        *remaining_bytes -= header_len;
                        let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
                            Ok(Some(*offset))
                        } else {
                            // Unsupported page type: skip this header entirely.
                            continue;
                        };
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                ..
            } => {
                // Dictionary page (if still pending) always comes first.
                if let Some(page) = dictionary_page {
                    Ok(Some(page.offset as usize))
                } else if let Some(page) = page_locations.front() {
                    Ok(Some(page.offset as usize))
                } else {
                    Ok(None)
                }
            }
        }
    }
}
629
630impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
631 type Item = Result<Page>;
632
633 fn next(&mut self) -> Option<Self::Item> {
634 self.get_next_page().transpose()
635 }
636}
637
638fn verify_page_header_len(header_len: usize, remaining_bytes: usize) -> Result<()> {
639 if header_len > remaining_bytes {
640 return Err(eof_err!("Invalid page header"));
641 }
642 Ok(())
643}
644
645fn verify_page_size(
646 compressed_size: i32,
647 uncompressed_size: i32,
648 remaining_bytes: usize,
649) -> Result<()> {
650 if compressed_size < 0 || compressed_size as usize > remaining_bytes || uncompressed_size < 0 {
654 return Err(eof_err!("Invalid page header"));
655 }
656 Ok(())
657}
658
impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
    /// Reads, decompresses and decodes the next page, or returns `Ok(None)`
    /// when the column chunk is exhausted. Index pages are skipped.
    fn get_next_page(&mut self) -> Result<Option<Page>> {
        loop {
            let page = match &mut self.state {
                SerializedPageReaderState::Values {
                    offset,
                    remaining_bytes: remaining,
                    next_page_header,
                } => {
                    if *remaining == 0 {
                        return Ok(None);
                    }

                    let mut read = self.reader.get_read(*offset as u64)?;
                    // Reuse a header buffered by an earlier peek; its length
                    // was already deducted from `offset`/`remaining` then.
                    let header = if let Some(header) = next_page_header.take() {
                        *header
                    } else {
                        let (header_len, header) = read_page_header_len(&mut read)?;
                        verify_page_header_len(header_len, *remaining)?;
                        *offset += header_len;
                        *remaining -= header_len;
                        header
                    };
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining,
                    )?;
                    let data_len = header.compressed_page_size as usize;
                    // Account for the payload before deciding to skip, so the
                    // cursor always advances past index pages too.
                    *offset += data_len;
                    *remaining -= data_len;

                    if header.type_ == PageType::INDEX_PAGE {
                        continue;
                    }

                    let mut buffer = Vec::with_capacity(data_len);
                    let read = read.take(data_len as u64).read_to_end(&mut buffer)?;

                    if read != data_len {
                        return Err(eof_err!(
                            "Expected to read {} bytes of page, read only {}",
                            data_len,
                            read
                        ));
                    }

                    decode_page(
                        header,
                        Bytes::from(buffer),
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?
                }
                SerializedPageReaderState::Pages {
                    page_locations,
                    dictionary_page,
                    ..
                } => {
                    // The synthesized dictionary page (if any) precedes all
                    // indexed data pages.
                    let front = match dictionary_page
                        .take()
                        .or_else(|| page_locations.pop_front())
                    {
                        Some(front) => front,
                        None => return Ok(None),
                    };

                    let page_len = front.compressed_page_size as usize;

                    let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;

                    // Decode the page header in place; the remaining slice
                    // length tells us how many bytes the header consumed.
                    let mut prot = TCompactSliceInputProtocol::new(buffer.as_ref());
                    let header = PageHeader::read_from_in_protocol(&mut prot)?;
                    let offset = buffer.len() - prot.as_slice().len();

                    let bytes = buffer.slice(offset..);
                    decode_page(
                        header,
                        bytes,
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?
                }
            };

            return Ok(Some(page));
        }
    }

    /// Returns metadata of the next page without consuming it, buffering the
    /// decoded header in `Values` mode for the subsequent `get_next_page`.
    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(page_meta) = (&**header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // Buffered header is of an unsupported page type:
                            // drop it and keep scanning.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset as u64)?;
                        let (header_len, header) = read_page_header_len(&mut read)?;
                        verify_page_header_len(header_len, *remaining_bytes)?;
                        *offset += header_len;
                        *remaining_bytes -= header_len;
                        let page_meta = if let Ok(page_meta) = (&header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // Unsupported page type: skip this header entirely.
                            continue;
                        };
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                total_rows,
            } => {
                if dictionary_page.is_some() {
                    Ok(Some(PageMetadata {
                        num_rows: None,
                        num_levels: None,
                        is_dict: true,
                    }))
                } else if let Some(page) = page_locations.front() {
                    // Row count of a page is the distance to the next page's
                    // first row (or to the end of the chunk for the last page).
                    let next_rows = page_locations
                        .get(1)
                        .map(|x| x.first_row_index as usize)
                        .unwrap_or(*total_rows);

                    Ok(Some(PageMetadata {
                        num_rows: Some(next_rows - page.first_row_index as usize),
                        num_levels: None,
                        is_dict: false,
                    }))
                } else {
                    Ok(None)
                }
            }
        }
    }

    /// Advances past the next page without decoding its payload.
    fn skip_next_page(&mut self) -> Result<()> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
            } => {
                if let Some(buffered_header) = next_page_header.take() {
                    verify_page_size(
                        buffered_header.compressed_page_size,
                        buffered_header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    // The buffered header's length was deducted when it was
                    // peeked; only the payload remains to skip.
                    *offset += buffered_header.compressed_page_size as usize;
                    *remaining_bytes -= buffered_header.compressed_page_size as usize;
                } else {
                    let mut read = self.reader.get_read(*offset as u64)?;
                    let (header_len, header) = read_page_header_len(&mut read)?;
                    verify_page_header_len(header_len, *remaining_bytes)?;
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    let data_page_size = header.compressed_page_size as usize;
                    *offset += header_len + data_page_size;
                    *remaining_bytes -= header_len + data_page_size;
                }
                Ok(())
            }
            SerializedPageReaderState::Pages { page_locations, .. } => {
                page_locations.pop_front();

                Ok(())
            }
        }
    }

    /// Reports whether the reader currently sits on a record boundary. With an
    /// offset index every page starts on a record boundary; without one, only
    /// the end of the chunk is known to be a boundary.
    fn at_record_boundary(&mut self) -> Result<bool> {
        match &mut self.state {
            SerializedPageReaderState::Values { .. } => Ok(self.peek_next_page()?.is_none()),
            SerializedPageReaderState::Pages { .. } => Ok(true),
        }
    }
}
859
860#[cfg(test)]
861mod tests {
862 use std::collections::HashSet;
863
864 use bytes::Buf;
865
866 use crate::file::properties::{EnabledStatistics, WriterProperties};
867 use crate::format::BoundaryOrder;
868
869 use crate::basic::{self, ColumnOrder};
870 use crate::column::reader::ColumnReader;
871 use crate::data_type::private::ParquetValueType;
872 use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
873 use crate::file::page_index::index::{Index, NativeIndex};
874 use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
875 use crate::file::writer::SerializedFileWriter;
876 use crate::record::RowAccessor;
877 use crate::schema::parser::parse_message_type;
878 use crate::util::test_common::file_util::{get_test_file, get_test_path};
879
880 use super::*;
881
    // Reading via an in-memory `Bytes` cursor and via a `File` must yield
    // identical rows.
    #[test]
    fn test_cursor_and_file_has_the_same_behaviour() {
        let mut buf: Vec<u8> = Vec::new();
        get_test_file("alltypes_plain.parquet")
            .read_to_end(&mut buf)
            .unwrap();
        let cursor = Bytes::from(buf);
        let read_from_cursor = SerializedFileReader::new(cursor).unwrap();

        let test_file = get_test_file("alltypes_plain.parquet");
        let read_from_file = SerializedFileReader::new(test_file).unwrap();

        let file_iter = read_from_file.get_row_iter(None).unwrap();
        let cursor_iter = read_from_cursor.get_row_iter(None).unwrap();

        for (a, b) in file_iter.zip(cursor_iter) {
            assert_eq!(a.unwrap(), b.unwrap())
        }
    }

    // Every `TryFrom` conversion succeeds on a valid file and fails on a
    // missing one.
    #[test]
    fn test_file_reader_try_from() {
        let test_file = get_test_file("alltypes_plain.parquet");
        let test_path_buf = get_test_path("alltypes_plain.parquet");
        let test_path = test_path_buf.as_path();
        let test_path_str = test_path.to_str().unwrap();

        let reader = SerializedFileReader::try_from(test_file);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path_str);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path_str.to_string());
        assert!(reader.is_ok());

        let test_path = Path::new("invalid.parquet");
        let test_path_str = test_path.to_str().unwrap();

        let reader = SerializedFileReader::try_from(test_path);
        assert!(reader.is_err());

        let reader = SerializedFileReader::try_from(test_path_str);
        assert!(reader.is_err());

        let reader = SerializedFileReader::try_from(test_path_str.to_string());
        assert!(reader.is_err());
    }

    // `IntoIterator` yields all rows; the expected order reflects the file's
    // row-group layout.
    #[test]
    fn test_file_reader_into_iter() {
        let path = get_test_path("alltypes_plain.parquet");
        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
        let iter = reader.into_iter();
        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();

        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
    }

    // Same as above but with a single-column projection applied.
    #[test]
    fn test_file_reader_into_iter_project() {
        let path = get_test_path("alltypes_plain.parquet");
        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
        let schema = "message schema { OPTIONAL INT32 id; }";
        let proj = parse_message_type(schema).ok();
        let iter = reader.into_iter().project(proj).unwrap();
        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();

        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
    }

    // Multiple page readers over the same chunk source can coexist and each
    // read pages successfully.
    #[test]
    fn test_reuse_file_chunk() {
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader = SerializedFileReader::new(test_file).unwrap();
        let row_group = reader.get_row_group(0).unwrap();

        let mut page_readers = Vec::new();
        for i in 0..row_group.num_columns() {
            page_readers.push(row_group.get_column_page_reader(i).unwrap());
        }

        for mut page_reader in page_readers {
            assert!(page_reader.get_next_page().is_ok());
        }
    }
978
    // End-to-end check of file metadata, row-group metadata and page decoding
    // against the known contents of `alltypes_plain.parquet`.
    #[test]
    fn test_file_reader() {
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata.
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata.
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
        );
        assert!(file_metadata.key_value_metadata().is_none());
        assert_eq!(file_metadata.num_rows(), 8);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        // Test contents in row group metadata.
        let row_group_metadata = metadata.row_group(0);
        assert_eq!(row_group_metadata.num_columns(), 11);
        assert_eq!(row_group_metadata.num_rows(), 8);
        assert_eq!(row_group_metadata.total_byte_size(), 671);
        // Check each column order: this file predates column orders.
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Test row group reader.
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Test page readers: column 0 has one dictionary page and one data page.
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Ok(Some(page)) = page_reader_0.get_next_page() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 32);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPage {
                    buf,
                    num_values,
                    encoding,
                    def_level_encoding,
                    rep_level_encoding,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 11);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert_eq!(def_level_encoding, Encoding::RLE);
                    #[allow(deprecated)]
                    let expected_rep_level_encoding = Encoding::BIT_PACKED;
                    assert_eq!(rep_level_encoding, expected_rep_level_encoding);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }
1070
    // Same end-to-end check against a file using data page v2 with snappy
    // compression.
    #[test]
    fn test_file_reader_datapage_v2() {
        let test_file = get_test_file("datapage_v2.snappy.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata.
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata.
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            1
        );

        assert_eq!(file_metadata.num_rows(), 5);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        let row_group_metadata = metadata.row_group(0);

        // Check each column order: this file predates column orders.
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Test row group reader.
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Test page readers: one dictionary page followed by one v2 data page.
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Ok(Some(page)) = page_reader_0.get_next_page() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 7);
                    assert_eq!(num_values, 1);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 4);
                    assert_eq!(num_values, 5);
                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                    assert_eq!(num_nulls, 1);
                    assert_eq!(num_rows, 5);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_some());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }
1168
    // Builds a `SerializedPageReader` for the given row group/column of a file
    // reader, resolving offset-index page locations when they were loaded.
    fn get_serialized_page_reader<R: ChunkReader>(
        file_reader: &SerializedFileReader<R>,
        row_group: usize,
        column: usize,
    ) -> Result<SerializedPageReader<R>> {
        let row_group = {
            let row_group_metadata = file_reader.metadata.row_group(row_group);
            let props = Arc::clone(&file_reader.props);
            let f = Arc::clone(&file_reader.chunk_reader);
            SerializedRowGroupReader::new(
                f,
                row_group_metadata,
                file_reader
                    .metadata
                    .offset_index()
                    .map(|x| x[row_group].as_slice()),
                props,
            )?
        };

        let col = row_group.metadata.column(column);

        let page_locations = row_group
            .offset_index
            .map(|x| x[column].page_locations.clone());

        let props = Arc::clone(&row_group.props);
        SerializedPageReader::new_with_properties(
            Arc::clone(&row_group.chunk_reader),
            col,
            row_group.metadata.num_rows() as usize,
            page_locations,
            props,
        )
    }
1204
    // `peek_next_page_offset` must agree with the reader's internal state for
    // every page of every column, and each page offset must be unique.
    #[test]
    fn test_peek_next_page_offset_matches_actual() -> Result<()> {
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader = SerializedFileReader::new(test_file)?;

        let mut offset_set = HashSet::new();
        let num_row_groups = reader.metadata.num_row_groups();
        for row_group in 0..num_row_groups {
            let num_columns = reader.metadata.row_group(row_group).num_columns();
            for column in 0..num_columns {
                let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;

                while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
                    match &page_reader.state {
                        SerializedPageReaderState::Pages {
                            page_locations,
                            dictionary_page,
                            ..
                        } => {
                            if let Some(page) = dictionary_page {
                                assert_eq!(page.offset as usize, page_offset);
                            } else if let Some(page) = page_locations.front() {
                                assert_eq!(page.offset as usize, page_offset);
                            } else {
                                unreachable!()
                            }
                        }
                        SerializedPageReaderState::Values {
                            offset,
                            next_page_header,
                            ..
                        } => {
                            // Peeking in Values mode buffers the header.
                            assert!(next_page_header.is_some());
                            assert_eq!(*offset, page_offset);
                        }
                    }
                    let page = page_reader.get_next_page()?;
                    assert!(page.is_some());
                    let newly_inserted = offset_set.insert(page_offset);
                    assert!(newly_inserted);
                }
            }
        }

        Ok(())
    }
1251
1252 #[test]
1253 fn test_page_iterator() {
1254 let file = get_test_file("alltypes_plain.parquet");
1255 let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1256
1257 let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap();
1258
1259 let page = page_iterator.next();
1261 assert!(page.is_some());
1262 assert!(page.unwrap().is_ok());
1263
1264 let page = page_iterator.next();
1266 assert!(page.is_none());
1267
1268 let row_group_indices = Box::new(0..1);
1269 let mut page_iterator =
1270 FilePageIterator::with_row_groups(0, row_group_indices, file_reader).unwrap();
1271
1272 let page = page_iterator.next();
1274 assert!(page.is_some());
1275 assert!(page.unwrap().is_ok());
1276
1277 let page = page_iterator.next();
1279 assert!(page.is_none());
1280 }
1281
1282 #[test]
1283 fn test_file_reader_key_value_metadata() {
1284 let file = get_test_file("binary.parquet");
1285 let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1286
1287 let metadata = file_reader
1288 .metadata
1289 .file_metadata()
1290 .key_value_metadata()
1291 .unwrap();
1292
1293 assert_eq!(metadata.len(), 3);
1294
1295 assert_eq!(metadata[0].key, "parquet.proto.descriptor");
1296
1297 assert_eq!(metadata[1].key, "writer.model.name");
1298 assert_eq!(metadata[1].value, Some("protobuf".to_owned()));
1299
1300 assert_eq!(metadata[2].key, "parquet.proto.class");
1301 assert_eq!(metadata[2].value, Some("foo.baz.Foobaz$Event".to_owned()));
1302 }
1303
1304 #[test]
1305 fn test_file_reader_optional_metadata() {
1306 let file = get_test_file("data_index_bloom_encoding_stats.parquet");
1308 let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1309
1310 let row_group_metadata = file_reader.metadata.row_group(0);
1311 let col0_metadata = row_group_metadata.column(0);
1312
1313 assert_eq!(col0_metadata.bloom_filter_offset().unwrap(), 192);
1315
1316 let page_encoding_stats = &col0_metadata.page_encoding_stats().unwrap()[0];
1318
1319 assert_eq!(page_encoding_stats.page_type, basic::PageType::DATA_PAGE);
1320 assert_eq!(page_encoding_stats.encoding, Encoding::PLAIN);
1321 assert_eq!(page_encoding_stats.count, 1);
1322
1323 assert_eq!(col0_metadata.column_index_offset().unwrap(), 156);
1325 assert_eq!(col0_metadata.column_index_length().unwrap(), 25);
1326
1327 assert_eq!(col0_metadata.offset_index_offset().unwrap(), 181);
1329 assert_eq!(col0_metadata.offset_index_length().unwrap(), 11);
1330 }
1331
1332 #[test]
1333 fn test_file_reader_with_no_filter() -> Result<()> {
1334 let test_file = get_test_file("alltypes_plain.parquet");
1335 let origin_reader = SerializedFileReader::new(test_file)?;
1336 let metadata = origin_reader.metadata();
1338 assert_eq!(metadata.num_row_groups(), 1);
1339 Ok(())
1340 }
1341
1342 #[test]
1343 fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
1344 let test_file = get_test_file("alltypes_plain.parquet");
1345 let read_options = ReadOptionsBuilder::new()
1346 .with_predicate(Box::new(|_, _| false))
1347 .build();
1348 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1349 let metadata = reader.metadata();
1350 assert_eq!(metadata.num_row_groups(), 0);
1351 Ok(())
1352 }
1353
1354 #[test]
1355 fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
1356 let test_file = get_test_file("alltypes_plain.parquet");
1357 let origin_reader = SerializedFileReader::new(test_file)?;
1358 let metadata = origin_reader.metadata();
1360 assert_eq!(metadata.num_row_groups(), 1);
1361 let mid = get_midpoint_offset(metadata.row_group(0));
1362
1363 let test_file = get_test_file("alltypes_plain.parquet");
1364 let read_options = ReadOptionsBuilder::new().with_range(0, mid + 1).build();
1365 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1366 let metadata = reader.metadata();
1367 assert_eq!(metadata.num_row_groups(), 1);
1368
1369 let test_file = get_test_file("alltypes_plain.parquet");
1370 let read_options = ReadOptionsBuilder::new().with_range(0, mid).build();
1371 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1372 let metadata = reader.metadata();
1373 assert_eq!(metadata.num_row_groups(), 0);
1374 Ok(())
1375 }
1376
1377 #[test]
1378 fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
1379 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1380 let origin_reader = SerializedFileReader::new(test_file)?;
1381 let metadata = origin_reader.metadata();
1382 let mid = get_midpoint_offset(metadata.row_group(0));
1383
1384 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1386 let read_options = ReadOptionsBuilder::new()
1387 .with_page_index()
1388 .with_predicate(Box::new(|_, _| true))
1389 .with_range(mid, mid + 1)
1390 .build();
1391 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1392 let metadata = reader.metadata();
1393 assert_eq!(metadata.num_row_groups(), 1);
1394 assert_eq!(metadata.column_index().unwrap().len(), 1);
1395 assert_eq!(metadata.offset_index().unwrap().len(), 1);
1396
1397 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1399 let read_options = ReadOptionsBuilder::new()
1400 .with_page_index()
1401 .with_predicate(Box::new(|_, _| true))
1402 .with_range(0, mid)
1403 .build();
1404 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1405 let metadata = reader.metadata();
1406 assert_eq!(metadata.num_row_groups(), 0);
1407 assert!(metadata.column_index().is_none());
1408 assert!(metadata.offset_index().is_none());
1409
1410 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1412 let read_options = ReadOptionsBuilder::new()
1413 .with_page_index()
1414 .with_predicate(Box::new(|_, _| false))
1415 .with_range(mid, mid + 1)
1416 .build();
1417 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1418 let metadata = reader.metadata();
1419 assert_eq!(metadata.num_row_groups(), 0);
1420 assert!(metadata.column_index().is_none());
1421 assert!(metadata.offset_index().is_none());
1422
1423 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1425 let read_options = ReadOptionsBuilder::new()
1426 .with_page_index()
1427 .with_predicate(Box::new(|_, _| false))
1428 .with_range(0, mid)
1429 .build();
1430 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1431 let metadata = reader.metadata();
1432 assert_eq!(metadata.num_row_groups(), 0);
1433 assert!(metadata.column_index().is_none());
1434 assert!(metadata.offset_index().is_none());
1435 Ok(())
1436 }
1437
1438 #[test]
1439 fn test_file_reader_invalid_metadata() {
1440 let data = [
1441 255, 172, 1, 0, 50, 82, 65, 73, 1, 0, 0, 0, 169, 168, 168, 162, 87, 255, 16, 0, 0, 0,
1442 80, 65, 82, 49,
1443 ];
1444 let ret = SerializedFileReader::new(Bytes::copy_from_slice(&data));
1445 assert_eq!(
1446 ret.err().unwrap().to_string(),
1447 "Parquet error: Could not parse metadata: bad data"
1448 );
1449 }
1450
1451 #[test]
1452 fn test_page_index_reader() {
1469 let test_file = get_test_file("data_index_bloom_encoding_stats.parquet");
1470 let builder = ReadOptionsBuilder::new();
1471 let options = builder.with_page_index().build();
1473 let reader_result = SerializedFileReader::new_with_options(test_file, options);
1474 let reader = reader_result.unwrap();
1475
1476 let metadata = reader.metadata();
1478 assert_eq!(metadata.num_row_groups(), 1);
1479
1480 let column_index = metadata.column_index().unwrap();
1481
1482 assert_eq!(column_index.len(), 1);
1484 let index = if let Index::BYTE_ARRAY(index) = &column_index[0][0] {
1485 index
1486 } else {
1487 unreachable!()
1488 };
1489
1490 assert_eq!(index.boundary_order, BoundaryOrder::ASCENDING);
1491 let index_in_pages = &index.indexes;
1492
1493 assert_eq!(index_in_pages.len(), 1);
1495
1496 let page0 = &index_in_pages[0];
1497 let min = page0.min.as_ref().unwrap();
1498 let max = page0.max.as_ref().unwrap();
1499 assert_eq!(b"Hello", min.as_bytes());
1500 assert_eq!(b"today", max.as_bytes());
1501
1502 let offset_indexes = metadata.offset_index().unwrap();
1503 assert_eq!(offset_indexes.len(), 1);
1505 let offset_index = &offset_indexes[0];
1506 let page_offset = &offset_index[0].page_locations()[0];
1507
1508 assert_eq!(4, page_offset.offset);
1509 assert_eq!(152, page_offset.compressed_page_size);
1510 assert_eq!(0, page_offset.first_row_index);
1511 }
1512
1513 #[test]
1514 fn test_page_index_reader_out_of_order() {
1515 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1516 let options = ReadOptionsBuilder::new().with_page_index().build();
1517 let reader = SerializedFileReader::new_with_options(test_file, options).unwrap();
1518 let metadata = reader.metadata();
1519
1520 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1521 let columns = metadata.row_group(0).columns();
1522 let reversed: Vec<_> = columns.iter().cloned().rev().collect();
1523
1524 let a = read_columns_indexes(&test_file, columns).unwrap().unwrap();
1525 let mut b = read_columns_indexes(&test_file, &reversed)
1526 .unwrap()
1527 .unwrap();
1528 b.reverse();
1529 assert_eq!(a, b);
1530
1531 let a = read_offset_indexes(&test_file, columns).unwrap().unwrap();
1532 let mut b = read_offset_indexes(&test_file, &reversed).unwrap().unwrap();
1533 b.reverse();
1534 assert_eq!(a, b);
1535 }
1536
1537 #[test]
1538 fn test_page_index_reader_all_type() {
1539 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1540 let builder = ReadOptionsBuilder::new();
1541 let options = builder.with_page_index().build();
1543 let reader_result = SerializedFileReader::new_with_options(test_file, options);
1544 let reader = reader_result.unwrap();
1545
1546 let metadata = reader.metadata();
1548 assert_eq!(metadata.num_row_groups(), 1);
1549
1550 let column_index = metadata.column_index().unwrap();
1551 let row_group_offset_indexes = &metadata.offset_index().unwrap()[0];
1552
1553 assert_eq!(column_index.len(), 1);
1555 let row_group_metadata = metadata.row_group(0);
1556
1557 assert!(!&column_index[0][0].is_sorted());
1559 let boundary_order = &column_index[0][0].get_boundary_order();
1560 assert!(boundary_order.is_some());
1561 matches!(boundary_order.unwrap(), BoundaryOrder::UNORDERED);
1562 if let Index::INT32(index) = &column_index[0][0] {
1563 check_native_page_index(
1564 index,
1565 325,
1566 get_row_group_min_max_bytes(row_group_metadata, 0),
1567 BoundaryOrder::UNORDERED,
1568 );
1569 assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325);
1570 } else {
1571 unreachable!()
1572 };
1573 assert!(&column_index[0][1].is_sorted());
1575 if let Index::BOOLEAN(index) = &column_index[0][1] {
1576 assert_eq!(index.indexes.len(), 82);
1577 assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82);
1578 } else {
1579 unreachable!()
1580 };
1581 assert!(&column_index[0][2].is_sorted());
1583 if let Index::INT32(index) = &column_index[0][2] {
1584 check_native_page_index(
1585 index,
1586 325,
1587 get_row_group_min_max_bytes(row_group_metadata, 2),
1588 BoundaryOrder::ASCENDING,
1589 );
1590 assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325);
1591 } else {
1592 unreachable!()
1593 };
1594 assert!(&column_index[0][3].is_sorted());
1596 if let Index::INT32(index) = &column_index[0][3] {
1597 check_native_page_index(
1598 index,
1599 325,
1600 get_row_group_min_max_bytes(row_group_metadata, 3),
1601 BoundaryOrder::ASCENDING,
1602 );
1603 assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325);
1604 } else {
1605 unreachable!()
1606 };
1607 assert!(&column_index[0][4].is_sorted());
1609 if let Index::INT32(index) = &column_index[0][4] {
1610 check_native_page_index(
1611 index,
1612 325,
1613 get_row_group_min_max_bytes(row_group_metadata, 4),
1614 BoundaryOrder::ASCENDING,
1615 );
1616 assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325);
1617 } else {
1618 unreachable!()
1619 };
1620 assert!(!&column_index[0][5].is_sorted());
1622 if let Index::INT64(index) = &column_index[0][5] {
1623 check_native_page_index(
1624 index,
1625 528,
1626 get_row_group_min_max_bytes(row_group_metadata, 5),
1627 BoundaryOrder::UNORDERED,
1628 );
1629 assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528);
1630 } else {
1631 unreachable!()
1632 };
1633 assert!(&column_index[0][6].is_sorted());
1635 if let Index::FLOAT(index) = &column_index[0][6] {
1636 check_native_page_index(
1637 index,
1638 325,
1639 get_row_group_min_max_bytes(row_group_metadata, 6),
1640 BoundaryOrder::ASCENDING,
1641 );
1642 assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325);
1643 } else {
1644 unreachable!()
1645 };
1646 assert!(!&column_index[0][7].is_sorted());
1648 if let Index::DOUBLE(index) = &column_index[0][7] {
1649 check_native_page_index(
1650 index,
1651 528,
1652 get_row_group_min_max_bytes(row_group_metadata, 7),
1653 BoundaryOrder::UNORDERED,
1654 );
1655 assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528);
1656 } else {
1657 unreachable!()
1658 };
1659 assert!(!&column_index[0][8].is_sorted());
1661 if let Index::BYTE_ARRAY(index) = &column_index[0][8] {
1662 check_native_page_index(
1663 index,
1664 974,
1665 get_row_group_min_max_bytes(row_group_metadata, 8),
1666 BoundaryOrder::UNORDERED,
1667 );
1668 assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974);
1669 } else {
1670 unreachable!()
1671 };
1672 assert!(&column_index[0][9].is_sorted());
1674 if let Index::BYTE_ARRAY(index) = &column_index[0][9] {
1675 check_native_page_index(
1676 index,
1677 352,
1678 get_row_group_min_max_bytes(row_group_metadata, 9),
1679 BoundaryOrder::ASCENDING,
1680 );
1681 assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352);
1682 } else {
1683 unreachable!()
1684 };
1685 assert!(!&column_index[0][10].is_sorted());
1688 if let Index::NONE = &column_index[0][10] {
1689 assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974);
1690 } else {
1691 unreachable!()
1692 };
1693 assert!(&column_index[0][11].is_sorted());
1695 if let Index::INT32(index) = &column_index[0][11] {
1696 check_native_page_index(
1697 index,
1698 325,
1699 get_row_group_min_max_bytes(row_group_metadata, 11),
1700 BoundaryOrder::ASCENDING,
1701 );
1702 assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325);
1703 } else {
1704 unreachable!()
1705 };
1706 assert!(!&column_index[0][12].is_sorted());
1708 if let Index::INT32(index) = &column_index[0][12] {
1709 check_native_page_index(
1710 index,
1711 325,
1712 get_row_group_min_max_bytes(row_group_metadata, 12),
1713 BoundaryOrder::UNORDERED,
1714 );
1715 assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325);
1716 } else {
1717 unreachable!()
1718 };
1719 }
1720
1721 fn check_native_page_index<T: ParquetValueType>(
1722 row_group_index: &NativeIndex<T>,
1723 page_size: usize,
1724 min_max: (&[u8], &[u8]),
1725 boundary_order: BoundaryOrder,
1726 ) {
1727 assert_eq!(row_group_index.indexes.len(), page_size);
1728 assert_eq!(row_group_index.boundary_order, boundary_order);
1729 row_group_index.indexes.iter().all(|x| {
1730 x.min.as_ref().unwrap() >= &T::try_from_le_slice(min_max.0).unwrap()
1731 && x.max.as_ref().unwrap() <= &T::try_from_le_slice(min_max.1).unwrap()
1732 });
1733 }
1734
1735 fn get_row_group_min_max_bytes(r: &RowGroupMetaData, col_num: usize) -> (&[u8], &[u8]) {
1736 let statistics = r.column(col_num).statistics().unwrap();
1737 (
1738 statistics.min_bytes_opt().unwrap_or_default(),
1739 statistics.max_bytes_opt().unwrap_or_default(),
1740 )
1741 }
1742
1743 #[test]
1744 fn test_skip_page_with_offset_index() {
1745 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1746 let builder = ReadOptionsBuilder::new();
1747 let options = builder.with_page_index().build();
1749 let reader_result = SerializedFileReader::new_with_options(test_file, options);
1750 let reader = reader_result.unwrap();
1751
1752 let row_group_reader = reader.get_row_group(0).unwrap();
1753
1754 let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
1756
1757 let mut vec = vec![];
1758
1759 for i in 0..325 {
1760 if i % 2 == 0 {
1761 vec.push(column_page_reader.get_next_page().unwrap().unwrap());
1762 } else {
1763 column_page_reader.skip_next_page().unwrap();
1764 }
1765 }
1766 assert!(column_page_reader.peek_next_page().unwrap().is_none());
1768 assert!(column_page_reader.get_next_page().unwrap().is_none());
1769
1770 assert_eq!(vec.len(), 163);
1771 }
1772
1773 #[test]
1774 fn test_skip_page_without_offset_index() {
1775 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1776
1777 let reader_result = SerializedFileReader::new(test_file);
1779 let reader = reader_result.unwrap();
1780
1781 let row_group_reader = reader.get_row_group(0).unwrap();
1782
1783 let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
1785
1786 let mut vec = vec![];
1787
1788 for i in 0..325 {
1789 if i % 2 == 0 {
1790 vec.push(column_page_reader.get_next_page().unwrap().unwrap());
1791 } else {
1792 column_page_reader.peek_next_page().unwrap().unwrap();
1793 column_page_reader.skip_next_page().unwrap();
1794 }
1795 }
1796 assert!(column_page_reader.peek_next_page().unwrap().is_none());
1798 assert!(column_page_reader.get_next_page().unwrap().is_none());
1799
1800 assert_eq!(vec.len(), 163);
1801 }
1802
1803 #[test]
1804 fn test_peek_page_with_dictionary_page() {
1805 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1806 let builder = ReadOptionsBuilder::new();
1807 let options = builder.with_page_index().build();
1809 let reader_result = SerializedFileReader::new_with_options(test_file, options);
1810 let reader = reader_result.unwrap();
1811 let row_group_reader = reader.get_row_group(0).unwrap();
1812
1813 let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
1815
1816 let mut vec = vec![];
1817
1818 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
1819 assert!(meta.is_dict);
1820 let page = column_page_reader.get_next_page().unwrap().unwrap();
1821 assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
1822
1823 for i in 0..352 {
1824 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
1825 if i != 351 {
1828 assert!((meta.num_rows == Some(21)) || (meta.num_rows == Some(20)));
1829 } else {
1830 assert_eq!(meta.num_rows, Some(10));
1833 }
1834 assert!(!meta.is_dict);
1835 vec.push(meta);
1836 let page = column_page_reader.get_next_page().unwrap().unwrap();
1837 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
1838 }
1839
1840 assert!(column_page_reader.peek_next_page().unwrap().is_none());
1842 assert!(column_page_reader.get_next_page().unwrap().is_none());
1843
1844 assert_eq!(vec.len(), 352);
1845 }
1846
1847 #[test]
1848 fn test_peek_page_with_dictionary_page_without_offset_index() {
1849 let test_file = get_test_file("alltypes_tiny_pages.parquet");
1850
1851 let reader_result = SerializedFileReader::new(test_file);
1852 let reader = reader_result.unwrap();
1853 let row_group_reader = reader.get_row_group(0).unwrap();
1854
1855 let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
1857
1858 let mut vec = vec![];
1859
1860 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
1861 assert!(meta.is_dict);
1862 let page = column_page_reader.get_next_page().unwrap().unwrap();
1863 assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
1864
1865 for i in 0..352 {
1866 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
1867 if i != 351 {
1870 assert!((meta.num_levels == Some(21)) || (meta.num_levels == Some(20)));
1871 } else {
1872 assert_eq!(meta.num_levels, Some(10));
1875 }
1876 assert!(!meta.is_dict);
1877 vec.push(meta);
1878 let page = column_page_reader.get_next_page().unwrap().unwrap();
1879 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
1880 }
1881
1882 assert!(column_page_reader.peek_next_page().unwrap().is_none());
1884 assert!(column_page_reader.get_next_page().unwrap().is_none());
1885
1886 assert_eq!(vec.len(), 352);
1887 }
1888
1889 #[test]
1890 fn test_fixed_length_index() {
1891 let message_type = "
1892 message test_schema {
1893 OPTIONAL FIXED_LEN_BYTE_ARRAY (11) value (DECIMAL(25,2));
1894 }
1895 ";
1896
1897 let schema = parse_message_type(message_type).unwrap();
1898 let mut out = Vec::with_capacity(1024);
1899 let mut writer =
1900 SerializedFileWriter::new(&mut out, Arc::new(schema), Default::default()).unwrap();
1901
1902 let mut r = writer.next_row_group().unwrap();
1903 let mut c = r.next_column().unwrap().unwrap();
1904 c.typed::<FixedLenByteArrayType>()
1905 .write_batch(
1906 &[vec![0; 11].into(), vec![5; 11].into(), vec![3; 11].into()],
1907 Some(&[1, 1, 0, 1]),
1908 None,
1909 )
1910 .unwrap();
1911 c.close().unwrap();
1912 r.close().unwrap();
1913 writer.close().unwrap();
1914
1915 let b = Bytes::from(out);
1916 let options = ReadOptionsBuilder::new().with_page_index().build();
1917 let reader = SerializedFileReader::new_with_options(b, options).unwrap();
1918 let index = reader.metadata().column_index().unwrap();
1919
1920 assert_eq!(index.len(), 1);
1922 let c = &index[0];
1923 assert_eq!(c.len(), 1);
1925
1926 match &c[0] {
1927 Index::FIXED_LEN_BYTE_ARRAY(v) => {
1928 assert_eq!(v.indexes.len(), 1);
1929 let page_idx = &v.indexes[0];
1930 assert_eq!(page_idx.null_count.unwrap(), 1);
1931 assert_eq!(page_idx.min.as_ref().unwrap().as_ref(), &[0; 11]);
1932 assert_eq!(page_idx.max.as_ref().unwrap().as_ref(), &[5; 11]);
1933 }
1934 _ => unreachable!(),
1935 }
1936 }
1937
1938 #[test]
1939 fn test_multi_gz() {
1940 let file = get_test_file("concatenated_gzip_members.parquet");
1941 let reader = SerializedFileReader::new(file).unwrap();
1942 let row_group_reader = reader.get_row_group(0).unwrap();
1943 match row_group_reader.get_column_reader(0).unwrap() {
1944 ColumnReader::Int64ColumnReader(mut reader) => {
1945 let mut buffer = Vec::with_capacity(1024);
1946 let mut def_levels = Vec::with_capacity(1024);
1947 let (num_records, num_values, num_levels) = reader
1948 .read_records(1024, Some(&mut def_levels), None, &mut buffer)
1949 .unwrap();
1950
1951 assert_eq!(num_records, 513);
1952 assert_eq!(num_values, 513);
1953 assert_eq!(num_levels, 513);
1954
1955 let expected: Vec<i64> = (1..514).collect();
1956 assert_eq!(&buffer, &expected);
1957 }
1958 _ => unreachable!(),
1959 }
1960 }
1961
1962 #[test]
1963 fn test_byte_stream_split_extended() {
1964 let path = format!(
1965 "{}/byte_stream_split_extended.gzip.parquet",
1966 arrow::util::test_util::parquet_test_data(),
1967 );
1968 let file = File::open(path).unwrap();
1969 let reader = Box::new(SerializedFileReader::new(file).expect("Failed to create reader"));
1970
1971 let mut iter = reader
1973 .get_row_iter(None)
1974 .expect("Failed to create row iterator");
1975
1976 let mut start = 0;
1977 let end = reader.metadata().file_metadata().num_rows();
1978
1979 let check_row = |row: Result<Row, ParquetError>| {
1980 assert!(row.is_ok());
1981 let r = row.unwrap();
1982 assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
1983 assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
1984 assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
1985 assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
1986 assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
1987 assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
1988 assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
1989 };
1990
1991 while start < end {
1992 match iter.next() {
1993 Some(row) => check_row(row),
1994 None => break,
1995 };
1996 start += 1;
1997 }
1998 }
1999
    /// Writes five single-column row groups with page statistics enabled, then
    /// re-reads the file with row-group predicates and checks that the
    /// surviving row groups, column indexes and offset indexes stay aligned.
    #[test]
    fn test_filtered_rowgroup_metadata() {
        let message_type = "
        message test_schema {
            REQUIRED INT32 a;
        }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        // Page-level statistics are needed so a column index gets written.
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .build(),
        );
        let mut file: File = tempfile::tempfile().unwrap();
        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
        let data = [1, 2, 3, 4, 5];

        // Row group i holds data * (i + 1), so every group has distinct stats.
        for idx in 0..5 {
            let data_i: Vec<i32> = data.iter().map(|x| x * (idx + 1)).collect();
            let mut row_group_writer = file_writer.next_row_group().unwrap();
            if let Some(mut writer) = row_group_writer.next_column().unwrap() {
                writer
                    .typed::<Int32Type>()
                    .write_batch(data_i.as_slice(), None, None)
                    .unwrap();
                writer.close().unwrap();
            }
            row_group_writer.close().unwrap();
            // NOTE(review): the returned slice is ignored here, so this call
            // appears to have no effect — presumably leftover; confirm.
            file_writer.flushed_row_groups();
        }
        let file_metadata = file_writer.close().unwrap();

        // 5 groups x 5 rows.
        assert_eq!(file_metadata.num_rows, 25);
        assert_eq!(file_metadata.row_groups.len(), 5);

        // Case 1: keep only the row group with ordinal == 2.
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2))
            .build();
        let reader =
            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
                .unwrap();
        let metadata = reader.metadata();

        assert_eq!(metadata.num_row_groups(), 1);
        assert_eq!(metadata.row_group(0).ordinal(), Some(2));

        // Page indexes must be filtered consistently with the row groups.
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
        assert_eq!(metadata.column_index().unwrap().len(), 1);
        assert_eq!(metadata.offset_index().unwrap().len(), 1);
        let col_idx = metadata.column_index().unwrap();
        let off_idx = metadata.offset_index().unwrap();
        let col_stats = metadata.row_group(0).column(0).statistics().unwrap();
        let pg_idx = &col_idx[0][0];
        let off_idx_i = &off_idx[0][0];

        // Column-index min/max must match the surviving chunk's statistics.
        match pg_idx {
            Index::INT32(int_idx) => {
                let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
                let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
                assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
                assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
            }
            _ => panic!("wrong stats type"),
        }

        // The offset index must point at the surviving chunk's data page.
        assert_eq!(
            off_idx_i.page_locations[0].offset,
            metadata.row_group(0).column(0).data_page_offset()
        );

        // Case 2: keep the odd-ordinal row groups (1 and 3).
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1))
            .build();
        let reader =
            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
                .unwrap();
        let metadata = reader.metadata();

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).ordinal(), Some(1));
        assert_eq!(metadata.row_group(1).ordinal(), Some(3));

        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
        assert_eq!(metadata.column_index().unwrap().len(), 2);
        assert_eq!(metadata.offset_index().unwrap().len(), 2);
        let col_idx = metadata.column_index().unwrap();
        let off_idx = metadata.offset_index().unwrap();

        // Same per-group consistency checks for each surviving row group.
        for (i, col_idx_i) in col_idx.iter().enumerate().take(metadata.num_row_groups()) {
            let col_stats = metadata.row_group(i).column(0).statistics().unwrap();
            let pg_idx = &col_idx_i[0];
            let off_idx_i = &off_idx[i][0];

            match pg_idx {
                Index::INT32(int_idx) => {
                    let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
                    let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
                    assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
                    assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
                }
                _ => panic!("wrong stats type"),
            }

            assert_eq!(
                off_idx_i.page_locations[0].offset,
                metadata.row_group(i).column(0).data_page_offset()
            );
        }
    }
2124}