use std::collections::VecDeque;
use std::fmt::Formatter;
use std::io::SeekFrom;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use bytes::{Buf, Bytes};
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
use futures::stream::Stream;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

use arrow_array::RecordBatch;
use arrow_schema::{DataType, Fields, Schema, SchemaRef};

use crate::arrow::array_reader::{build_array_reader, RowGroups};
use crate::arrow::arrow_reader::{
    apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderMetadata,
    ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowSelection,
};
use crate::arrow::ProjectionMask;

use crate::bloom_filter::{
    chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
};
use crate::column::page::{PageIterator, PageReader};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
use crate::file::FOOTER_SIZE;
use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};

mod metadata;
pub use metadata::*;

#[cfg(feature = "object_store")]
mod store;

use crate::arrow::schema::ParquetField;
#[cfg(feature = "object_store")]
pub use store::*;

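/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files.
///
/// Implementations are expected to return exactly the requested byte ranges;
/// `get_byte_ranges` has a sequential default implementation that readers backed by
/// object stores will typically want to override with a vectored or coalesced read.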
pub trait AsyncFileReader: Send {
    /// Retrieve the bytes in `range`
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;

    /// Retrieve multiple byte ranges. The default implementation calls `get_bytes` sequentially
    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
        async move {
            let mut result = Vec::with_capacity(ranges.len());

            for range in ranges.into_iter() {
                let data = self.get_bytes(range).await?;
                result.push(data);
            }

            Ok(result)
        }
        .boxed()
    }

    /// Provide asynchronous access to the [`ParquetMetaData`] of the parquet file
    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;
}

impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
        self.as_mut().get_bytes(range)
    }

    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
        self.as_mut().get_byte_ranges(ranges)
    }

    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
        self.as_mut().get_metadata()
    }
}

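// Blanket implementation for any seekable asynchronous reader, e.g. `tokio::fs::File`.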
impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
        async move {
            self.seek(SeekFrom::Start(range.start as u64)).await?;

            let to_read = range.end - range.start;
            let mut buffer = Vec::with_capacity(to_read);
            let read = self.take(to_read as u64).read_to_end(&mut buffer).await?;
            if read != to_read {
                return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
            }

            Ok(buffer.into())
        }
        .boxed()
    }

    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
        const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
        async move {
            self.seek(SeekFrom::End(-FOOTER_SIZE_I64)).await?;

            let mut buf = [0_u8; FOOTER_SIZE];
            self.read_exact(&mut buf).await?;

            let metadata_len = ParquetMetaDataReader::decode_footer(&buf)?;
            self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
                .await?;

            let mut buf = Vec::with_capacity(metadata_len);
            self.take(metadata_len as _).read_to_end(&mut buf).await?;

            Ok(Arc::new(ParquetMetaDataReader::decode_metadata(&buf)?))
        }
        .boxed()
    }
}

impl ArrowReaderMetadata {
    /// Load the metadata required for reading asynchronously, additionally fetching the
    /// page index from `input` if requested by `options` and not already present
    pub async fn load_async<T: AsyncFileReader>(
        input: &mut T,
        options: ArrowReaderOptions,
    ) -> Result<Self> {
        let mut metadata = input.get_metadata().await?;

        if options.page_index
            && metadata.column_index().is_none()
            && metadata.offset_index().is_none()
        {
            let m = Arc::try_unwrap(metadata).unwrap_or_else(|e| e.as_ref().clone());
            let mut reader = ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true);
            reader.load_page_index(input).await?;
            metadata = Arc::new(reader.finish()?)
        }
        Self::try_new(metadata, options)
    }
}

#[doc(hidden)]
pub struct AsyncReader<T>(T);

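/// A builder for reading parquet files from an [`AsyncFileReader`], producing a
/// [`ParquetRecordBatchStream`] of [`RecordBatch`]es.
///
/// A minimal usage sketch (the file path here is only illustrative):
///
/// ```no_run
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// use futures::TryStreamExt;
/// use parquet::arrow::ParquetRecordBatchStreamBuilder;
///
/// let file = tokio::fs::File::open("data.parquet").await?;
/// let stream = ParquetRecordBatchStreamBuilder::new(file)
///     .await?
///     .with_batch_size(1024)
///     .build()?;
/// let batches: Vec<_> = stream.try_collect().await?;
/// # Ok(())
/// # }
/// ```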
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;

impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
    /// Create a new [`ParquetRecordBatchStreamBuilder`] for the provided parquet file
    pub async fn new(input: T) -> Result<Self> {
        Self::new_with_options(input, Default::default()).await
    }

    /// Create a new [`ParquetRecordBatchStreamBuilder`] for the provided parquet file
    /// and [`ArrowReaderOptions`]
    pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
        Ok(Self::new_with_metadata(input, metadata))
    }

    /// Create a new [`ParquetRecordBatchStreamBuilder`] from the provided
    /// [`ArrowReaderMetadata`], allowing metadata to be loaded once and reused
    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(AsyncReader(input), metadata)
    }

    /// Read the bloom filter for the given column in the given row group,
    /// returning `Ok(None)` if the column has no bloom filter
    pub async fn get_row_group_column_bloom_filter(
        &mut self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> Result<Option<Sbbf>> {
        let metadata = self.metadata.row_group(row_group_idx);
        let column_metadata = metadata.column(column_idx);

        let offset: usize = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => self.input.0.get_bytes(offset..offset + length as usize),
            None => self
                .input
                .0
                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE),
        }
        .await?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset as u64, buffer.clone())?;

        match header.algorithm {
            BloomFilterAlgorithm::BLOCK(_) => {
                // this match exists to future-proof the singleton algorithm enum
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED(_) => {
                // this match exists to future-proof the singleton compression enum
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH(_) => {
                // this match exists to future-proof the singleton hash enum
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            Some(_) => buffer.slice((bitset_offset as usize - offset)..),
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                self.input
                    .0
                    .get_bytes(bitset_offset as usize..bitset_offset as usize + bitset_length)
                    .await?
            }
        };
        Ok(Some(Sbbf::new(&bitset)))
    }

    /// Build a new [`ParquetRecordBatchStream`]
    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
        let num_row_groups = self.metadata.row_groups().len();

        let row_groups = match self.row_groups {
            Some(row_groups) => {
                if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
                    return Err(general_err!(
                        "row group {} out of bounds 0..{}",
                        col,
                        num_row_groups
                    ));
                }
                row_groups.into()
            }
            None => (0..self.metadata.row_groups().len()).collect(),
        };

        // Ensure the batch size is no larger than the number of rows in the file
        let batch_size = self
            .batch_size
            .min(self.metadata.file_metadata().num_rows() as usize);
        let reader = ReaderFactory {
            input: self.input.0,
            filter: self.filter,
            metadata: self.metadata.clone(),
            fields: self.fields,
            limit: self.limit,
            offset: self.offset,
        };

        // Project the output schema to the leaves selected by the projection mask
        let projected_fields = match reader.fields.as_deref().map(|pf| &pf.arrow_type) {
            Some(DataType::Struct(fields)) => {
                fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
            }
            None => Fields::empty(),
            _ => unreachable!("Must be Struct for root type"),
        };
        let schema = Arc::new(Schema::new(projected_fields));

        Ok(ParquetRecordBatchStream {
            metadata: self.metadata,
            batch_size,
            row_groups,
            projection: self.projection,
            selection: self.selection,
            schema,
            reader: Some(reader),
            state: StreamState::Init,
        })
    }
}

/// The result of reading a row group: the factory handed back for reuse, plus an optional
/// record batch reader (`None` if the selection or filter eliminated every row)
type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;

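/// Encapsulates the state needed to read a row group: the input, the parquet metadata,
/// the computed arrow fields, and any row filter, limit and offset still to be applied.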
struct ReaderFactory<T> {
    metadata: Arc<ParquetMetaData>,

    fields: Option<Arc<ParquetField>>,

    input: T,

    filter: Option<RowFilter>,

    limit: Option<usize>,

    offset: Option<usize>,
}

impl<T> ReaderFactory<T>
where
    T: AsyncFileReader + Send,
{
    /// Reads the row group at `row_group_idx` with the provided `selection`, `projection`
    /// and `batch_size`, first evaluating any predicates in the row filter against their
    /// own projections
    ///
    /// Note: this takes `self` by value so the resulting future can have a `'static` lifetime
    async fn read_row_group(
        mut self,
        row_group_idx: usize,
        mut selection: Option<RowSelection>,
        projection: ProjectionMask,
        batch_size: usize,
    ) -> ReadResult<T> {
        let meta = self.metadata.row_group(row_group_idx);
        let offset_index = self
            .metadata
            .offset_index()
            // guard against an empty offset index (e.g. `Some(vec![])` when no index was written)
            .filter(|index| !index.is_empty())
            .map(|x| x[row_group_idx].as_slice());

        let mut row_group = InMemoryRowGroup {
            metadata: meta,
            row_count: meta.num_rows() as usize,
            column_chunks: vec![None; meta.columns().len()],
            offset_index,
        };

        if let Some(filter) = self.filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
                if !selects_any(selection.as_ref()) {
                    return Ok((self, None));
                }

                let predicate_projection = predicate.projection();
                row_group
                    .fetch(&mut self.input, predicate_projection, selection.as_ref())
                    .await?;

                let array_reader =
                    build_array_reader(self.fields.as_deref(), predicate_projection, &row_group)?;

                selection = Some(evaluate_predicate(
                    batch_size,
                    array_reader,
                    selection,
                    predicate.as_mut(),
                )?);
            }
        }

        // Number of rows in the selection before applying limit and offset
        let rows_before = selection
            .as_ref()
            .map(|s| s.row_count())
            .unwrap_or(row_group.row_count);

        if rows_before == 0 {
            return Ok((self, None));
        }

        selection = apply_range(selection, row_group.row_count, self.offset, self.limit);

        // Number of rows in the selection after applying limit and offset
        let rows_after = selection
            .as_ref()
            .map(|s| s.row_count())
            .unwrap_or(row_group.row_count);

        // Update the remaining offset, if any
        if let Some(offset) = &mut self.offset {
            // The reduction is either due to the offset or the limit; as the limit is only
            // applied once the offset is exhausted, a saturating subtraction suffices here
            *offset = offset.saturating_sub(rows_before - rows_after)
        }

        if rows_after == 0 {
            return Ok((self, None));
        }

        if let Some(limit) = &mut self.limit {
            *limit -= rows_after;
        }

        row_group
            .fetch(&mut self.input, &projection, selection.as_ref())
            .await?;

        let reader = ParquetRecordBatchReader::new(
            batch_size,
            build_array_reader(self.fields.as_deref(), &projection, &row_group)?,
            selection,
        );

        Ok((self, Some(reader)))
    }
}

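/// The state machine driving [`ParquetRecordBatchStream`]: waiting to start the next row
/// group (`Init`), decoding batches from a fetched row group (`Decoding`), awaiting the
/// I/O for the next row group (`Reading`), or terminated after an error (`Error`).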
enum StreamState<T> {
    Init,
    Decoding(ParquetRecordBatchReader),
    Reading(BoxFuture<'static, ReadResult<T>>),
    Error,
}

impl<T> std::fmt::Debug for StreamState<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            StreamState::Init => write!(f, "StreamState::Init"),
            StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
            StreamState::Reading(_) => write!(f, "StreamState::Reading"),
            StreamState::Error => write!(f, "StreamState::Error"),
        }
    }
}

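/// An asynchronous [`Stream`] of [`RecordBatch`] for a parquet file, constructed with
/// [`ParquetRecordBatchStreamBuilder`].
///
/// Row groups are read one at a time: the byte ranges required by the projection, row
/// selection and row filter are fetched first, and batches are then decoded from the
/// in-memory row group.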
pub struct ParquetRecordBatchStream<T> {
    metadata: Arc<ParquetMetaData>,

    schema: SchemaRef,

    row_groups: VecDeque<usize>,

    projection: ProjectionMask,

    batch_size: usize,

    selection: Option<RowSelection>,

    /// This is an `Option` so it can be moved into a future
    reader: Option<ReaderFactory<T>>,

    state: StreamState<T>,
}

impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ParquetRecordBatchStream")
            .field("metadata", &self.metadata)
            .field("schema", &self.schema)
            .field("batch_size", &self.batch_size)
            .field("projection", &self.projection)
            .field("state", &self.state)
            .finish()
    }
}

impl<T> ParquetRecordBatchStream<T> {
    /// Returns the projected [`SchemaRef`] for reading the parquet file
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}

impl<T> ParquetRecordBatchStream<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
    /// Fetches the next row group, returning a [`ParquetRecordBatchReader`] for it,
    /// or `None` once all row groups have been read
    ///
    /// This cannot be combined with polling the [`Stream`] API on the same instance
    pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
        loop {
            match &mut self.state {
                StreamState::Decoding(_) | StreamState::Reading(_) => {
                    return Err(ParquetError::General(
                        "Cannot combine the use of next_row_group with the Stream API".to_string(),
                    ))
                }
                StreamState::Init => {
                    let row_group_idx = match self.row_groups.pop_front() {
                        Some(idx) => idx,
                        None => return Ok(None),
                    };

                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;

                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));

                    let reader_factory = self.reader.take().expect("lost reader");

                    let (reader_factory, maybe_reader) = reader_factory
                        .read_row_group(
                            row_group_idx,
                            selection,
                            self.projection.clone(),
                            self.batch_size,
                        )
                        .await
                        .map_err(|err| {
                            self.state = StreamState::Error;
                            err
                        })?;
                    self.reader = Some(reader_factory);

                    if let Some(reader) = maybe_reader {
                        return Ok(Some(reader));
                    } else {
                        // All rows skipped, read the next row group
                        continue;
                    }
                }
                StreamState::Error => return Ok(None), // The stream ends once an error occurs
            }
        }
    }
}

impl<T> Stream for ParquetRecordBatchStream<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            match &mut self.state {
                StreamState::Decoding(batch_reader) => match batch_reader.next() {
                    Some(Ok(batch)) => {
                        return Poll::Ready(Some(Ok(batch)));
                    }
                    Some(Err(e)) => {
                        self.state = StreamState::Error;
                        return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
                    }
                    None => self.state = StreamState::Init,
                },
                StreamState::Init => {
                    let row_group_idx = match self.row_groups.pop_front() {
                        Some(idx) => idx,
                        None => return Poll::Ready(None),
                    };

                    let reader = self.reader.take().expect("lost reader");

                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;

                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));

                    let fut = reader
                        .read_row_group(
                            row_group_idx,
                            selection,
                            self.projection.clone(),
                            self.batch_size,
                        )
                        .boxed();

                    self.state = StreamState::Reading(fut)
                }
                StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
                    Ok((reader_factory, maybe_reader)) => {
                        self.reader = Some(reader_factory);
                        match maybe_reader {
                            // Decode records from the fetched row group
                            Some(reader) => self.state = StreamState::Decoding(reader),
                            // All rows skipped, read the next row group
                            None => self.state = StreamState::Init,
                        }
                    }
                    Err(e) => {
                        self.state = StreamState::Error;
                        return Poll::Ready(Some(Err(e)));
                    }
                },
                StreamState::Error => return Poll::Ready(None), // The stream ends once an error occurs
            }
        }
    }
}

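/// An in-memory collection of column chunks for a single row group, populated on demand
/// by [`InMemoryRowGroup::fetch`] and exposed to the array readers via [`RowGroups`].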
struct InMemoryRowGroup<'a> {
    metadata: &'a RowGroupMetaData,
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
    row_count: usize,
}

impl InMemoryRowGroup<'_> {
    /// Fetches the necessary column data into memory
    async fn fetch<T: AsyncFileReader + Send>(
        &mut self,
        input: &mut T,
        projection: &ProjectionMask,
        selection: Option<&RowSelection>,
    ) -> Result<()> {
        if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
            // If we have a `RowSelection` and an `OffsetIndex` then only fetch the pages
            // required for the `RowSelection`
            let mut page_start_offsets: Vec<Vec<usize>> = vec![];

            let fetch_ranges = self
                .column_chunks
                .iter()
                .zip(self.metadata.columns())
                .enumerate()
                .filter(|&(idx, (chunk, _chunk_meta))| {
                    chunk.is_none() && projection.leaf_included(idx)
                })
                .flat_map(|(idx, (_chunk, chunk_meta))| {
                    // If the first page does not start at the beginning of the column,
                    // then we need to also fetch a dictionary page
                    let mut ranges = vec![];
                    let (start, _len) = chunk_meta.byte_range();
                    match offset_index[idx].page_locations.first() {
                        Some(first) if first.offset as u64 != start => {
                            ranges.push(start as usize..first.offset as usize);
                        }
                        _ => (),
                    }

                    ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
                    page_start_offsets.push(ranges.iter().map(|range| range.start).collect());

                    ranges
                })
                .collect();

            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
            let mut page_start_offsets = page_start_offsets.into_iter();

            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
                if chunk.is_some() || !projection.leaf_included(idx) {
                    continue;
                }

                if let Some(offsets) = page_start_offsets.next() {
                    let mut chunks = Vec::with_capacity(offsets.len());
                    for _ in 0..offsets.len() {
                        chunks.push(chunk_data.next().unwrap());
                    }

                    *chunk = Some(Arc::new(ColumnChunkData::Sparse {
                        length: self.metadata.column(idx).byte_range().1 as usize,
                        data: offsets.into_iter().zip(chunks.into_iter()).collect(),
                    }))
                }
            }
        } else {
            let fetch_ranges = self
                .column_chunks
                .iter()
                .enumerate()
                .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
                .map(|(idx, _chunk)| {
                    let column = self.metadata.column(idx);
                    let (start, length) = column.byte_range();
                    start as usize..(start + length) as usize
                })
                .collect();

            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();

            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
                if chunk.is_some() || !projection.leaf_included(idx) {
                    continue;
                }

                if let Some(data) = chunk_data.next() {
                    *chunk = Some(Arc::new(ColumnChunkData::Dense {
                        offset: self.metadata.column(idx).byte_range().0 as usize,
                        data,
                    }));
                }
            }
        }

        Ok(())
    }
}

impl RowGroups for InMemoryRowGroup<'_> {
    fn num_rows(&self) -> usize {
        self.row_count
    }

    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        match &self.column_chunks[i] {
            None => Err(ParquetError::General(format!(
                "Invalid column index {i}, column was not fetched"
            ))),
            Some(data) => {
                let page_locations = self
                    .offset_index
                    // guard against an empty offset index (e.g. `Some(vec![])` when no index was written)
                    .filter(|index| !index.is_empty())
                    .map(|index| index[i].page_locations.clone());
                let page_reader: Box<dyn PageReader> = Box::new(SerializedPageReader::new(
                    data.clone(),
                    self.metadata.column(i),
                    self.row_count,
                    page_locations,
                )?);

                Ok(Box::new(ColumnChunkIterator {
                    reader: Some(Ok(page_reader)),
                }))
            }
        }
    }
}

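/// The raw bytes backing a column chunk: either a sparse set of fetched pages keyed by
/// their file offset, or the densely fetched chunk starting at `offset`.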
#[derive(Clone)]
enum ColumnChunkData {
    Sparse {
        length: usize,
        data: Vec<(usize, Bytes)>,
    },
    Dense { offset: usize, data: Bytes },
}

impl ColumnChunkData {
    fn get(&self, start: u64) -> Result<Bytes> {
        match &self {
            ColumnChunkData::Sparse { data, .. } => data
                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
                .map(|idx| data[idx].1.clone())
                .map_err(|_| {
                    ParquetError::General(format!(
                        "Invalid offset in sparse column chunk data: {start}"
                    ))
                }),
            ColumnChunkData::Dense { offset, data } => {
                let start = start as usize - *offset;
                Ok(data.slice(start..))
            }
        }
    }
}

impl Length for ColumnChunkData {
    fn len(&self) -> u64 {
        match &self {
            ColumnChunkData::Sparse { length, .. } => *length as u64,
            ColumnChunkData::Dense { data, .. } => data.len() as u64,
        }
    }
}

impl ChunkReader for ColumnChunkData {
    type T = bytes::buf::Reader<Bytes>;

    fn get_read(&self, start: u64) -> Result<Self::T> {
        Ok(self.get(start)?.reader())
    }

    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
        Ok(self.get(start)?.slice(..length))
    }
}

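/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]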
struct ColumnChunkIterator {
    reader: Option<Result<Box<dyn PageReader>>>,
}

impl Iterator for ColumnChunkIterator {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        self.reader.take()
    }
}

impl PageIterator for ColumnChunkIterator {}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::arrow_reader::{
        ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
    };
    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
    use crate::arrow::ArrowWriter;
    use crate::file::metadata::ParquetMetaDataReader;
    use crate::file::properties::WriterProperties;
    use arrow::compute::kernels::cmp::eq;
    use arrow::error::Result as ArrowResult;
    use arrow_array::builder::{ListBuilder, StringBuilder};
    use arrow_array::cast::AsArray;
    use arrow_array::types::Int32Type;
    use arrow_array::{
        Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray,
        StructArray, UInt64Array,
    };
    use arrow_schema::{DataType, Field, Schema};
    use futures::{StreamExt, TryStreamExt};
    use rand::{thread_rng, Rng};
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};
    use tempfile::tempfile;

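    /// An in-memory [`AsyncFileReader`] over a [`Bytes`] buffer that records every
    /// requested byte range, so tests can assert exactly which I/O was issued.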
    #[derive(Clone)]
    struct TestReader {
        data: Bytes,
        metadata: Arc<ParquetMetaData>,
        requests: Arc<Mutex<Vec<Range<usize>>>>,
    }

    impl AsyncFileReader for TestReader {
        fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
            self.requests.lock().unwrap().push(range.clone());
            futures::future::ready(Ok(self.data.slice(range))).boxed()
        }

        fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
            futures::future::ready(Ok(self.metadata.clone())).boxed()
        }
    }

    #[tokio::test]
    async fn test_async_reader() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        let requests = async_reader.requests.clone();
        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
        let stream = builder
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .build()
            .unwrap();

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            .with_batch_size(104)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);

        let requests = requests.lock().unwrap();
        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();

        assert_eq!(
            &requests[..],
            &[
                offset_1 as usize..(offset_1 + length_1) as usize,
                offset_2 as usize..(offset_2 + length_2) as usize
            ]
        );
    }

    #[tokio::test]
    async fn test_async_reader_with_next_row_group() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        let requests = async_reader.requests.clone();
        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
        let mut stream = builder
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .build()
            .unwrap();

        let mut readers = vec![];
        while let Some(reader) = stream.next_row_group().await.unwrap() {
            readers.push(reader);
        }

        let async_batches: Vec<_> = readers
            .into_iter()
            .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
            .collect();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            .with_batch_size(104)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);

        let requests = requests.lock().unwrap();
        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();

        assert_eq!(
            &requests[..],
            &[
                offset_1 as usize..(offset_1 + length_1) as usize,
                offset_2 as usize..(offset_2 + length_2) as usize
            ]
        );
    }

    #[tokio::test]
    async fn test_async_reader_with_index() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        let options = ArrowReaderOptions::new().with_page_index(true);
        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
            .await
            .unwrap();

        let metadata_with_index = builder.metadata();

        let offset_index = metadata_with_index.offset_index().unwrap();
        let column_index = metadata_with_index.column_index().unwrap();

        assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
        assert_eq!(column_index.len(), metadata_with_index.num_row_groups());

        let num_columns = metadata_with_index
            .file_metadata()
            .schema_descr()
            .num_columns();

        offset_index
            .iter()
            .for_each(|x| assert_eq!(x.len(), num_columns));
        column_index
            .iter()
            .for_each(|x| assert_eq!(x.len(), num_columns));

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
        let stream = builder
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .build()
            .unwrap();

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            .with_batch_size(1024)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);
    }

    #[tokio::test]
    async fn test_async_reader_with_limit() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
        let stream = builder
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .with_limit(1)
            .build()
            .unwrap();

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            .with_batch_size(1024)
            .with_limit(1)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);
    }

    #[tokio::test]
    async fn test_async_reader_skip_pages() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        let options = ArrowReaderOptions::new().with_page_index(true);
        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
            .await
            .unwrap();

        let selection = RowSelection::from(vec![
            RowSelector::skip(21),
            RowSelector::select(21),
            RowSelector::skip(41),
            RowSelector::select(41),
            RowSelector::skip(25),
            RowSelector::select(25),
            RowSelector::skip(7116),
            RowSelector::select(10),
        ]);

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);

        let stream = builder
            .with_projection(mask.clone())
            .with_row_selection(selection.clone())
            .build()
            .expect("building stream");

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            .with_batch_size(1024)
            .with_row_selection(selection)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);
    }

    #[tokio::test]
    async fn test_fuzz_async_reader_selection() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);

        let mut rand = thread_rng();

        for _ in 0..100 {
            let mut expected_rows = 0;
            let mut total_rows = 0;
            let mut skip = false;
            let mut selectors = vec![];

            while total_rows < 7300 {
                let row_count: usize = rand.gen_range(1..100);

                let row_count = row_count.min(7300 - total_rows);

                selectors.push(RowSelector { row_count, skip });

                total_rows += row_count;
                if !skip {
                    expected_rows += row_count;
                }

                skip = !skip;
            }

            let selection = RowSelection::from(selectors);

            let async_reader = TestReader {
                data: data.clone(),
                metadata: metadata.clone(),
                requests: Default::default(),
            };

            let options = ArrowReaderOptions::new().with_page_index(true);
            let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
                .await
                .unwrap();

            let col_idx: usize = rand.gen_range(0..13);
            let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);

            let stream = builder
                .with_projection(mask.clone())
                .with_row_selection(selection.clone())
                .build()
                .expect("building stream");

            let async_batches: Vec<_> = stream.try_collect().await.unwrap();

            let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();

            assert_eq!(actual_rows, expected_rows);
        }
    }

    #[tokio::test]
    async fn test_async_reader_zero_row_selector() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);

        let mut rand = thread_rng();

        let mut expected_rows = 0;
        let mut total_rows = 0;
        let mut skip = false;
        let mut selectors = vec![];

        selectors.push(RowSelector {
            row_count: 0,
            skip: false,
        });

        while total_rows < 7300 {
            let row_count: usize = rand.gen_range(1..100);

            let row_count = row_count.min(7300 - total_rows);

            selectors.push(RowSelector { row_count, skip });

            total_rows += row_count;
            if !skip {
                expected_rows += row_count;
            }

            skip = !skip;
        }

        let selection = RowSelection::from(selectors);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        let options = ArrowReaderOptions::new().with_page_index(true);
        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
            .await
            .unwrap();

        let col_idx: usize = rand.gen_range(0..13);
        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);

        let stream = builder
            .with_projection(mask.clone())
            .with_row_selection(selection.clone())
            .build()
            .expect("building stream");

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();

        assert_eq!(actual_rows, expected_rows);
    }

    #[tokio::test]
    async fn test_row_filter() {
        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
        let c = Int32Array::from_iter(0..6);
        let data = RecordBatch::try_from_iter([
            ("a", Arc::new(a) as ArrayRef),
            ("b", Arc::new(b) as ArrayRef),
            ("c", Arc::new(c) as ArrayRef),
        ])
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
        writer.write(&data).unwrap();
        writer.close().unwrap();

        let data: Bytes = buf.into();
        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let parquet_schema = metadata.file_metadata().schema_descr_ptr();

        let test = TestReader {
            data,
            metadata: Arc::new(metadata),
            requests: Default::default(),
        };
        let requests = test.requests.clone();

        let a_scalar = StringArray::from_iter_values(["b"]);
        let a_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![0]),
            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
        );

        let b_scalar = StringArray::from_iter_values(["4"]);
        let b_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![1]),
            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
        );

        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);

        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
        let stream = ParquetRecordBatchStreamBuilder::new(test)
            .await
            .unwrap()
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .with_row_filter(filter)
            .build()
            .unwrap();

        let batches: Vec<_> = stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 1);

        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 2);

        let col = batch.column(0);
        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
        assert_eq!(val, "b");

        let col = batch.column(1);
        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
        assert_eq!(val, 3);

        // Only 3 requests: one per predicate projection, plus one for the remaining output column
        assert_eq!(requests.lock().unwrap().len(), 3);
    }

    #[tokio::test]
    async fn test_limit_multiple_row_groups() {
        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
        let c = Int32Array::from_iter(0..6);
        let data = RecordBatch::try_from_iter([
            ("a", Arc::new(a) as ArrayRef),
            ("b", Arc::new(b) as ArrayRef),
            ("c", Arc::new(c) as ArrayRef),
        ])
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let props = WriterProperties::builder()
            .set_max_row_group_size(3)
            .build();
        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
        writer.write(&data).unwrap();
        writer.close().unwrap();

        let data: Bytes = buf.into();
        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();

        assert_eq!(metadata.num_row_groups(), 2);

        let test = TestReader {
            data,
            metadata: Arc::new(metadata),
            requests: Default::default(),
        };

        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
            .await
            .unwrap()
            .with_batch_size(1024)
            .with_limit(4)
            .build()
            .unwrap();

        let batches: Vec<_> = stream.try_collect().await.unwrap();
        // Expect one batch for each row group
        assert_eq!(batches.len(), 2);

        let batch = &batches[0];
        // First batch should contain all rows of the first row group
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(batch.num_columns(), 3);
        let col2 = batch.column(2).as_primitive::<Int32Type>();
        assert_eq!(col2.values(), &[0, 1, 2]);

        let batch = &batches[1];
        // Second batch should hit the limit and only contain one row
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 3);
        let col2 = batch.column(2).as_primitive::<Int32Type>();
        assert_eq!(col2.values(), &[3]);

        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
            .await
            .unwrap()
            .with_offset(2)
            .with_limit(3)
            .build()
            .unwrap();

        let batches: Vec<_> = stream.try_collect().await.unwrap();
        // Expect one batch for each row group
        assert_eq!(batches.len(), 2);

        let batch = &batches[0];
        // First batch should skip the offset and contain the remaining row
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 3);
        let col2 = batch.column(2).as_primitive::<Int32Type>();
        assert_eq!(col2.values(), &[2]);

        let batch = &batches[1];
        // Second batch should take the remaining two rows of the limit
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 3);
        let col2 = batch.column(2).as_primitive::<Int32Type>();
        assert_eq!(col2.values(), &[3, 4]);

        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
            .await
            .unwrap()
            .with_offset(4)
            .with_limit(20)
            .build()
            .unwrap();

        let batches: Vec<_> = stream.try_collect().await.unwrap();
        // Expect a single batch, as the offset skips the entire first row group
        assert_eq!(batches.len(), 1);

        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 3);
        let col2 = batch.column(2).as_primitive::<Int32Type>();
        assert_eq!(col2.values(), &[4, 5]);
    }

    #[tokio::test]
    async fn test_row_filter_with_index() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        let a_filter =
            ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema, vec![1]), |batch| {
                Ok(batch.column(0).as_boolean().clone())
            });

        let b_scalar = Int8Array::from(vec![2]);
        let b_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![2]),
            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
        );

        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);

        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);

        let options = ArrowReaderOptions::new().with_page_index(true);
        let stream = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
            .await
            .unwrap()
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .with_row_filter(filter)
            .build()
            .unwrap();

        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();

        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();

        assert_eq!(total_rows, 730);
    }

    #[tokio::test]
    async fn test_in_memory_row_group_sparse() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .parse_and_finish(&data)
            .unwrap();

        let offset_index = metadata.offset_index().expect("reading offset index")[0].clone();

        let mut metadata_builder = metadata.into_builder();
        let mut row_groups = metadata_builder.take_row_groups();
        row_groups.truncate(1);
        let row_group_meta = row_groups.pop().unwrap();

        let metadata = metadata_builder
            .add_row_group(row_group_meta)
            .set_column_index(None)
            .set_offset_index(Some(vec![offset_index.clone()]))
            .build();

        let metadata = Arc::new(metadata);

        let num_rows = metadata.row_group(0).num_rows();

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        let requests = async_reader.requests.clone();
        let (_, fields) = parquet_to_arrow_schema_and_fields(
            metadata.file_metadata().schema_descr(),
            ProjectionMask::all(),
            None,
        )
        .unwrap();

        let _schema_desc = metadata.file_metadata().schema_descr();

        let projection = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), vec![0]);

        let reader_factory = ReaderFactory {
            metadata,
            fields: fields.map(Arc::new),
            input: async_reader,
            filter: None,
            limit: None,
            offset: None,
        };

        let mut skip = true;
        let mut pages = offset_index[0].page_locations.iter().peekable();

        // Build a `RowSelection` that skips every other page, and record the byte
        // ranges we expect to be fetched for the selected pages
        let mut selectors = vec![];
        let mut expected_page_requests: Vec<Range<usize>> = vec![];
        while let Some(page) = pages.next() {
            let num_rows = if let Some(next_page) = pages.peek() {
                next_page.first_row_index - page.first_row_index
            } else {
                num_rows - page.first_row_index
            };

            if skip {
                selectors.push(RowSelector::skip(num_rows as usize));
            } else {
                selectors.push(RowSelector::select(num_rows as usize));
                let start = page.offset as usize;
                let end = start + page.compressed_page_size as usize;
                expected_page_requests.push(start..end);
            }
            skip = !skip;
        }

        let selection = RowSelection::from(selectors);

        let (_factory, _reader) = reader_factory
            .read_row_group(0, Some(selection), projection.clone(), 48)
            .await
            .expect("reading row group");

        let requests = requests.lock().unwrap();

        assert_eq!(&requests[..], &expected_page_requests)
    }

    #[tokio::test]
    async fn test_batch_size_overallocate() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let file_rows = metadata.file_metadata().num_rows() as usize;
        let metadata = Arc::new(metadata);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();

        let stream = builder
            .with_projection(ProjectionMask::all())
            .with_batch_size(1024)
            .build()
            .unwrap();
        // The batch size should be clamped to the number of rows in the file
        assert_ne!(1024, file_rows);
        assert_eq!(stream.batch_size, file_rows);
    }

    #[tokio::test]
    async fn test_get_row_group_column_bloom_filter_without_length() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());
        test_get_row_group_column_bloom_filter(data, false).await;
    }

    #[tokio::test]
    async fn test_parquet_record_batch_stream_schema() {
        fn get_all_field_names(schema: &Schema) -> Vec<&String> {
            schema.flattened_fields().iter().map(|f| f.name()).collect()
        }

        let mut metadata = HashMap::with_capacity(1);
        metadata.insert("key".to_string(), "value".to_string());

        let nested_struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("d", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("e", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
            ),
        ]);
        let struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("a", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("b", DataType::UInt64, true)),
                Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c",
                    nested_struct_array.data_type().clone(),
                    true,
                )),
                Arc::new(nested_struct_array) as ArrayRef,
            ),
        ]);

        let schema =
            Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
        let record_batch = RecordBatch::from(struct_array)
            .with_schema(schema.clone())
            .unwrap();

        // Write a parquet file with custom metadata attached to the schema
        let mut file = tempfile().unwrap();
        let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
        writer.write(&record_batch).unwrap();
        writer.close().unwrap();

        let all_fields = ["a", "b", "c", "d", "e"];
        // (leaf indices in the projection mask, expected field names in the projected schema)
        let projections = [
            (vec![], vec![]),
            (vec![0], vec!["a"]),
            (vec![0, 1], vec!["a", "b"]),
            (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
            (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
        ];

        for (indices, expected_projected_names) in projections {
            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
                // The builder schema keeps all fields and the custom metadata
                assert_eq!(get_all_field_names(&builder), all_fields);
                assert_eq!(builder.metadata, metadata);
                // The reader and batch schemas only contain the projected fields, without metadata
                assert_eq!(get_all_field_names(&reader), expected_projected_names);
                assert_eq!(reader.metadata, HashMap::default());
                assert_eq!(get_all_field_names(&batch), expected_projected_names);
                assert_eq!(batch.metadata, HashMap::default());
            };

            let builder =
                ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
            let sync_builder_schema = builder.schema().clone();
            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
            let mut reader = builder.with_projection(mask).build().unwrap();
            let sync_reader_schema = reader.schema();
            let batch = reader.next().unwrap().unwrap();
            let sync_batch_schema = batch.schema();
            assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);

            // The asynchronous reader should behave the same way
            let file = tokio::fs::File::from(file.try_clone().unwrap());
            let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
            let async_builder_schema = builder.schema().clone();
            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
            let mut reader = builder.with_projection(mask).build().unwrap();
            let async_reader_schema = reader.schema().clone();
            let batch = reader.next().await.unwrap().unwrap();
            let async_batch_schema = batch.schema();
            assert_schemas(
                async_builder_schema,
                async_reader_schema,
                async_batch_schema,
            );
        }
    }

    #[tokio::test]
    async fn test_get_row_group_column_bloom_filter_with_length() {
        // Rewrite the test file so that the metadata contains a bloom_filter_length
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());
        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);
        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };
        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();
        let schema = builder.schema().clone();
        let stream = builder.build().unwrap();
        let batches = stream.try_collect::<Vec<_>>().await.unwrap();

        let mut parquet_data = Vec::new();
        let props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build();
        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
        for batch in batches {
            writer.write(&batch).unwrap();
        }
        writer.close().unwrap();

        // Test against the newly written parquet file
        test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
    }

    async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);
        let row_group = metadata.row_group(0);
        let column = row_group.column(0);
        assert_eq!(column.bloom_filter_length().is_some(), with_length);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();

        let sbbf = builder
            .get_row_group_column_bloom_filter(0, 0)
            .await
            .unwrap()
            .unwrap();
        assert!(sbbf.check(&"Hello"));
        assert!(!sbbf.check(&"Hello_Not_Exists"));
    }

    #[tokio::test]
    async fn test_nested_skip() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("col_1", DataType::UInt64, false),
            Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
        ]));

        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(256)
            .set_write_batch_size(256)
            .set_max_row_group_size(1024);

        let mut file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();

        let mut builder = ListBuilder::new(StringBuilder::new());
        for id in 0..1024 {
            match id % 3 {
                0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
                1 => builder.append_value([Some(format!("id_{id}"))]),
                _ => builder.append_null(),
            }
        }
        let refs = vec![
            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
            Arc::new(builder.finish()) as ArrayRef,
        ];

        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let selections = [
            RowSelection::from(vec![
                RowSelector::skip(313),
                RowSelector::select(1),
                RowSelector::skip(709),
                RowSelector::select(1),
            ]),
            RowSelection::from(vec![
                RowSelector::skip(255),
                RowSelector::select(1),
                RowSelector::skip(767),
                RowSelector::select(1),
            ]),
            RowSelection::from(vec![
                RowSelector::select(255),
                RowSelector::skip(1),
                RowSelector::select(767),
                RowSelector::skip(1),
            ]),
            RowSelection::from(vec![
                RowSelector::skip(254),
                RowSelector::select(1),
                RowSelector::select(1),
                RowSelector::skip(767),
                RowSelector::select(1),
            ]),
        ];

        for selection in selections {
            let expected = selection.row_count();
            // Read the data back with the async reader, applying the selection
            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
                tokio::fs::File::from_std(file.try_clone().unwrap()),
                ArrowReaderOptions::new().with_page_index(true),
            )
            .await
            .unwrap();

            reader = reader.with_row_selection(selection);

            let mut stream = reader.build().unwrap();

            let mut total_rows = 0;
            while let Some(rb) = stream.next().await {
                let rb = rb.unwrap();
                total_rows += rb.num_rows();
            }
            assert_eq!(total_rows, expected);
        }
    }

    #[tokio::test]
    async fn test_row_filter_nested() {
        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
        let b = StructArray::from(vec![
            (
                Arc::new(Field::new("aa", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["a", "b", "b", "b", "c", "c"])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("bb", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5", "6"])) as ArrayRef,
            ),
        ]);
        let c = Int32Array::from_iter(0..6);
        let data = RecordBatch::try_from_iter([
            ("a", Arc::new(a) as ArrayRef),
            ("b", Arc::new(b) as ArrayRef),
            ("c", Arc::new(c) as ArrayRef),
        ])
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
        writer.write(&data).unwrap();
        writer.close().unwrap();

        let data: Bytes = buf.into();
        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let parquet_schema = metadata.file_metadata().schema_descr_ptr();

        let test = TestReader {
            data,
            metadata: Arc::new(metadata),
            requests: Default::default(),
        };
        let requests = test.requests.clone();

        let a_scalar = StringArray::from_iter_values(["b"]);
        let a_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![0]),
            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
        );

        let b_scalar = StringArray::from_iter_values(["4"]);
        let b_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![2]),
            move |batch| {
                // Filter on the projected child of the struct column
                let struct_array = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<StructArray>()
                    .unwrap();
                eq(struct_array.column(0), &Scalar::new(&b_scalar))
            },
        );

        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);

        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 3]);
        let stream = ParquetRecordBatchStreamBuilder::new(test)
            .await
            .unwrap()
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .with_row_filter(filter)
            .build()
            .unwrap();

        let batches: Vec<_> = stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 1);

        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 2);

        let col = batch.column(0);
        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
        assert_eq!(val, "b");

        let col = batch.column(1);
        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
        assert_eq!(val, 3);

        // Only 3 requests: one per predicate projection, plus one for the remaining output column
        assert_eq!(requests.lock().unwrap().len(), 3);
    }

    #[tokio::test]
    async fn empty_offset_index_doesnt_panic_in_read_row_group() {
        use tokio::fs::File;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let mut file = File::open(&path).await.unwrap();
        let file_size = file.metadata().await.unwrap().len();
        let mut metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut file, file_size as usize)
            .await
            .unwrap();

        metadata.set_offset_index(Some(vec![]));
        let options = ArrowReaderOptions::new().with_page_index(true);
        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
        let reader =
            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
                .build()
                .unwrap();

        let result = reader.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(result.len(), 1);
    }

    #[tokio::test]
    async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
        use tokio::fs::File;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let mut file = File::open(&path).await.unwrap();
        let file_size = file.metadata().await.unwrap().len();
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut file, file_size as usize)
            .await
            .unwrap();

        let options = ArrowReaderOptions::new().with_page_index(true);
        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
        let reader =
            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
                .build()
                .unwrap();

        let result = reader.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(result.len(), 8);
    }

    #[tokio::test]
    async fn empty_offset_index_doesnt_panic_in_column_chunks() {
        use tempfile::TempDir;
        use tokio::fs::File;
        fn write_metadata_to_local_file(
            metadata: ParquetMetaData,
            file: impl AsRef<std::path::Path>,
        ) {
            use crate::file::metadata::ParquetMetaDataWriter;
            use std::fs::File;
            let file = File::create(file).unwrap();
            ParquetMetaDataWriter::new(file, &metadata)
                .finish()
                .unwrap()
        }

        fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
            use std::fs::File;
            let file = File::open(file).unwrap();
            ParquetMetaDataReader::new()
                .with_page_indexes(true)
                .parse_and_finish(&file)
                .unwrap()
        }

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let mut file = File::open(&path).await.unwrap();
        let file_size = file.metadata().await.unwrap().len();
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut file, file_size as usize)
            .await
            .unwrap();

        let tempdir = TempDir::new().unwrap();
        let metadata_path = tempdir.path().join("thrift_metadata.dat");
        write_metadata_to_local_file(metadata, &metadata_path);
        let metadata = read_metadata_from_local_file(&metadata_path);

        let options = ArrowReaderOptions::new().with_page_index(true);
        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
        let reader =
            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
                .build()
                .unwrap();

        let result = reader.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(result.len(), 1);
    }
}