use std::collections::VecDeque;
use std::fmt::Formatter;
use std::io::SeekFrom;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use bytes::{Buf, Bytes};
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
use futures::stream::Stream;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

use arrow_array::RecordBatch;
use arrow_schema::{DataType, Fields, Schema, SchemaRef};

use crate::arrow::array_reader::{build_array_reader, RowGroups};
use crate::arrow::arrow_reader::{
    apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderMetadata,
    ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowSelection,
};
use crate::arrow::ProjectionMask;

use crate::bloom_filter::{
    chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
};
use crate::column::page::{PageIterator, PageReader};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};

mod metadata;
pub use metadata::*;

#[cfg(feature = "object_store")]
mod store;

use crate::arrow::schema::ParquetField;
#[cfg(feature = "object_store")]
pub use store::*;

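/// An asynchronous source of Parquet bytes and metadata, such as a local file
/// or an object store location.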
pub trait AsyncFileReader: Send {
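    /// Retrieve the bytes covered by `range`.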
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;

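    /// Retrieve multiple byte ranges. The default implementation fetches each
    /// range sequentially via [`Self::get_bytes`]; implementations may override
    /// this to coalesce or parallelize the requests.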
    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
        async move {
            let mut result = Vec::with_capacity(ranges.len());

            for range in ranges.into_iter() {
                let data = self.get_bytes(range).await?;
                result.push(data);
            }

            Ok(result)
        }
        .boxed()
    }

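    /// Provide the [`ParquetMetaData`] for this file, typically by reading and
    /// parsing the footer, taking the supplied [`ArrowReaderOptions`] into account.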
    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
}

impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
        self.as_mut().get_bytes(range)
    }

    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
        self.as_mut().get_byte_ranges(ranges)
    }

    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
        self.as_mut().get_metadata(options)
    }
}

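// Fetches the trailing `suffix` bytes by seeking relative to the end of the input.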
impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
        async move {
            self.seek(SeekFrom::End(-(suffix as i64))).await?;
            let mut buf = Vec::with_capacity(suffix);
            self.take(suffix as _).read_to_end(&mut buf).await?;
            Ok(buf.into())
        }
        .boxed()
    }
}

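// Blanket implementation reading byte ranges and footer metadata through
// `AsyncRead` + `AsyncSeek`, e.g. for `tokio::fs::File`.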
impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
        async move {
            self.seek(SeekFrom::Start(range.start)).await?;

            let to_read = range.end - range.start;
            let mut buffer = Vec::with_capacity(to_read.try_into()?);
            let read = self.take(to_read).read_to_end(&mut buffer).await?;
            if read as u64 != to_read {
                return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
            }

            Ok(buffer.into())
        }
        .boxed()
    }

    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
        async move {
            let metadata_reader = ParquetMetaDataReader::new()
                .with_page_indexes(options.is_some_and(|o| o.page_index));

            #[cfg(feature = "encryption")]
            let metadata_reader = metadata_reader.with_decryption_properties(
                options.and_then(|o| o.file_decryption_properties.as_ref()),
            );

            let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
            Ok(Arc::new(parquet_metadata))
        }
        .boxed()
    }
}

impl ArrowReaderMetadata {
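    /// Load [`ArrowReaderMetadata`] from an [`AsyncFileReader`] using the
    /// provided [`ArrowReaderOptions`].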
    pub async fn load_async<T: AsyncFileReader>(
        input: &mut T,
        options: ArrowReaderOptions,
    ) -> Result<Self> {
        let metadata = input.get_metadata(Some(&options)).await?;
        Self::try_new(metadata, options)
    }
}

#[doc(hidden)]
pub struct AsyncReader<T>(T);

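/// A builder for reading Parquet files asynchronously, producing a
/// [`ParquetRecordBatchStream`].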
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;

impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
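    /// Create a new builder from the given [`AsyncFileReader`] using default options.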
    pub async fn new(input: T) -> Result<Self> {
        Self::new_with_options(input, Default::default()).await
    }

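    /// Create a new builder with the given [`ArrowReaderOptions`].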
    pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
        Ok(Self::new_with_metadata(input, metadata))
    }

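    /// Create a new builder from previously loaded [`ArrowReaderMetadata`].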
    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(AsyncReader(input), metadata)
    }

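    /// Read the bloom filter for the given row group and column, returning
    /// `Ok(None)` if no bloom filter is present for that column chunk.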
    pub async fn get_row_group_column_bloom_filter(
        &mut self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> Result<Option<Sbbf>> {
        let metadata = self.metadata.row_group(row_group_idx);
        let column_metadata = metadata.column(column_idx);

        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => self.input.0.get_bytes(offset..offset + length as u64),
            None => self
                .input
                .0
                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64),
        }
        .await?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        match header.algorithm {
            BloomFilterAlgorithm::BLOCK(_) => {
                // the only algorithm variant currently defined by the Parquet format
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED(_) => {
                // the only compression variant currently defined by the Parquet format
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH(_) => {
                // the only hash variant currently defined by the Parquet format
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            Some(_) => buffer.slice(
                (TryInto::<usize>::try_into(bitset_offset).unwrap()
                    - TryInto::<usize>::try_into(offset).unwrap())..,
            ),
            None => {
                let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                self.input
                    .0
                    .get_bytes(bitset_offset..bitset_offset + bitset_length)
                    .await?
            }
        };
        Ok(Some(Sbbf::new(&bitset)))
    }

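    /// Build a [`ParquetRecordBatchStream`], validating any requested row groups
    /// and clamping the batch size to the number of rows in the file.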
    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
        let num_row_groups = self.metadata.row_groups().len();

        let row_groups = match self.row_groups {
            Some(row_groups) => {
                if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
                    return Err(general_err!(
                        "row group {} out of bounds 0..{}",
                        col,
                        num_row_groups
                    ));
                }
                row_groups.into()
            }
            None => (0..self.metadata.row_groups().len()).collect(),
        };

        let batch_size = self
            .batch_size
            .min(self.metadata.file_metadata().num_rows() as usize);
        let reader_factory = ReaderFactory {
            input: self.input.0,
            filter: self.filter,
            metadata: self.metadata.clone(),
            fields: self.fields,
            limit: self.limit,
            offset: self.offset,
        };

        let projected_fields = match reader_factory.fields.as_deref().map(|pf| &pf.arrow_type) {
            Some(DataType::Struct(fields)) => {
                fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
            }
            None => Fields::empty(),
            _ => unreachable!("Must be Struct for root type"),
        };
        let schema = Arc::new(Schema::new(projected_fields));

        Ok(ParquetRecordBatchStream {
            metadata: self.metadata,
            batch_size,
            row_groups,
            projection: self.projection,
            selection: self.selection,
            schema,
            reader_factory: Some(reader_factory),
            state: StreamState::Init,
        })
    }
}

type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;

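/// Shared state used to construct a [`ParquetRecordBatchReader`] for each row
/// group, carried across the stream's async state machine.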
struct ReaderFactory<T> {
    metadata: Arc<ParquetMetaData>,

    fields: Option<Arc<ParquetField>>,

    input: T,

    filter: Option<RowFilter>,

    limit: Option<usize>,

    offset: Option<usize>,
}

impl<T> ReaderFactory<T>
where
    T: AsyncFileReader + Send,
{
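    /// Read the row group at `row_group_idx`, applying any row filter, selection,
    /// offset and limit, and returning `None` if every row is filtered out.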
    async fn read_row_group(
        mut self,
        row_group_idx: usize,
        mut selection: Option<RowSelection>,
        projection: ProjectionMask,
        batch_size: usize,
    ) -> ReadResult<T> {
        let meta = self.metadata.row_group(row_group_idx);
        let offset_index = self
            .metadata
            .offset_index()
            .filter(|index| !index.is_empty())
            .map(|x| x[row_group_idx].as_slice());

        let mut row_group = InMemoryRowGroup {
            row_count: meta.num_rows() as usize,
            column_chunks: vec![None; meta.columns().len()],
            offset_index,
            row_group_idx,
            metadata: self.metadata.as_ref(),
        };

        if let Some(filter) = self.filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
                if !selects_any(selection.as_ref()) {
                    return Ok((self, None));
                }

                let predicate_projection = predicate.projection();
                row_group
                    .fetch(&mut self.input, predicate_projection, selection.as_ref())
                    .await?;

                let array_reader =
                    build_array_reader(self.fields.as_deref(), predicate_projection, &row_group)?;

                selection = Some(evaluate_predicate(
                    batch_size,
                    array_reader,
                    selection,
                    predicate.as_mut(),
                )?);
            }
        }

        let rows_before = selection
            .as_ref()
            .map(|s| s.row_count())
            .unwrap_or(row_group.row_count);

        if rows_before == 0 {
            return Ok((self, None));
        }

        selection = apply_range(selection, row_group.row_count, self.offset, self.limit);

        let rows_after = selection
            .as_ref()
            .map(|s| s.row_count())
            .unwrap_or(row_group.row_count);

        if let Some(offset) = &mut self.offset {
            *offset = offset.saturating_sub(rows_before - rows_after)
        }

        if rows_after == 0 {
            return Ok((self, None));
        }

        if let Some(limit) = &mut self.limit {
            *limit -= rows_after;
        }

        row_group
            .fetch(&mut self.input, &projection, selection.as_ref())
            .await?;

        let reader = ParquetRecordBatchReader::new(
            batch_size,
            build_array_reader(self.fields.as_deref(), &projection, &row_group)?,
            selection,
        );

        Ok((self, Some(reader)))
    }
}

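/// The state machine driving [`ParquetRecordBatchStream`]: waiting to start the
/// next row group (`Init`), decoding an in-memory row group (`Decoding`),
/// fetching data (`Reading`), or terminated after an error (`Error`).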
enum StreamState<T> {
    Init,
    Decoding(ParquetRecordBatchReader),
    Reading(BoxFuture<'static, ReadResult<T>>),
    Error,
}

impl<T> std::fmt::Debug for StreamState<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            StreamState::Init => write!(f, "StreamState::Init"),
            StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
            StreamState::Reading(_) => write!(f, "StreamState::Reading"),
            StreamState::Error => write!(f, "StreamState::Error"),
        }
    }
}

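/// An asynchronous [`Stream`] of [`RecordBatch`] read from a Parquet data source.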
pub struct ParquetRecordBatchStream<T> {
    metadata: Arc<ParquetMetaData>,

    schema: SchemaRef,

    row_groups: VecDeque<usize>,

    projection: ProjectionMask,

    batch_size: usize,

    selection: Option<RowSelection>,

    reader_factory: Option<ReaderFactory<T>>,

    state: StreamState<T>,
}

impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ParquetRecordBatchStream")
            .field("metadata", &self.metadata)
            .field("schema", &self.schema)
            .field("batch_size", &self.batch_size)
            .field("projection", &self.projection)
            .field("state", &self.state)
            .finish()
    }
}

impl<T> ParquetRecordBatchStream<T> {
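    /// Return the projected [`SchemaRef`] of the batches produced by this stream.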
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}

impl<T> ParquetRecordBatchStream<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
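    /// Fetch the next row group and return a [`ParquetRecordBatchReader`] for it,
    /// or `None` once all row groups are exhausted. This cannot be mixed with
    /// polling the [`Stream`] implementation.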
    pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
        loop {
            match &mut self.state {
                StreamState::Decoding(_) | StreamState::Reading(_) => {
                    return Err(ParquetError::General(
                        "Cannot combine the use of next_row_group with the Stream API".to_string(),
                    ))
                }
                StreamState::Init => {
                    let row_group_idx = match self.row_groups.pop_front() {
                        Some(idx) => idx,
                        None => return Ok(None),
                    };

                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;

                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));

                    let reader_factory = self.reader_factory.take().expect("lost reader factory");

                    let (reader_factory, maybe_reader) = reader_factory
                        .read_row_group(
                            row_group_idx,
                            selection,
                            self.projection.clone(),
                            self.batch_size,
                        )
                        .await
                        .inspect_err(|_| {
                            self.state = StreamState::Error;
                        })?;
                    self.reader_factory = Some(reader_factory);

                    if let Some(reader) = maybe_reader {
                        return Ok(Some(reader));
                    } else {
                        // the row group produced no rows after filtering, try the next one
                        continue;
                    }
                }
                // once in the error state the stream is terminated
                StreamState::Error => return Ok(None),
            }
        }
    }
}

impl<T> Stream for ParquetRecordBatchStream<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            match &mut self.state {
                StreamState::Decoding(batch_reader) => match batch_reader.next() {
                    Some(Ok(batch)) => {
                        return Poll::Ready(Some(Ok(batch)));
                    }
                    Some(Err(e)) => {
                        self.state = StreamState::Error;
                        return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
                    }
                    None => self.state = StreamState::Init,
                },
                StreamState::Init => {
                    let row_group_idx = match self.row_groups.pop_front() {
                        Some(idx) => idx,
                        None => return Poll::Ready(None),
                    };

                    let reader = self.reader_factory.take().expect("lost reader factory");

                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;

                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));

                    let fut = reader
                        .read_row_group(
                            row_group_idx,
                            selection,
                            self.projection.clone(),
                            self.batch_size,
                        )
                        .boxed();

                    self.state = StreamState::Reading(fut)
                }
                StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
                    Ok((reader_factory, maybe_reader)) => {
                        self.reader_factory = Some(reader_factory);
                        match maybe_reader {
                            Some(reader) => self.state = StreamState::Decoding(reader),
                            None => self.state = StreamState::Init,
                        }
                    }
                    Err(e) => {
                        self.state = StreamState::Error;
                        return Poll::Ready(Some(Err(e)));
                    }
                },
                // once in the error state the stream is terminated
                StreamState::Error => return Poll::Ready(None),
            }
        }
    }
}

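/// A collection of column chunk data for one row group, fetched into memory
/// ahead of decoding.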
struct InMemoryRowGroup<'a> {
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
    row_count: usize,
    row_group_idx: usize,
    metadata: &'a ParquetMetaData,
}

impl InMemoryRowGroup<'_> {
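    /// Fetch the column chunks needed for `projection`, using the offset index
    /// (when available) together with `selection` to fetch only the required pages.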
    async fn fetch<T: AsyncFileReader + Send>(
        &mut self,
        input: &mut T,
        projection: &ProjectionMask,
        selection: Option<&RowSelection>,
    ) -> Result<()> {
        let metadata = self.metadata.row_group(self.row_group_idx);
        if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
            let mut page_start_offsets: Vec<Vec<u64>> = vec![];

            let fetch_ranges = self
                .column_chunks
                .iter()
                .zip(metadata.columns())
                .enumerate()
                .filter(|&(idx, (chunk, _chunk_meta))| {
                    chunk.is_none() && projection.leaf_included(idx)
                })
                .flat_map(|(idx, (_chunk, chunk_meta))| {
                    let mut ranges: Vec<Range<u64>> = vec![];
                    let (start, _len) = chunk_meta.byte_range();
                    match offset_index[idx].page_locations.first() {
                        Some(first) if first.offset as u64 != start => {
                            ranges.push(start..first.offset as u64);
                        }
                        _ => (),
                    }

                    ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
                    page_start_offsets.push(ranges.iter().map(|range| range.start).collect());

                    ranges
                })
                .collect();

            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
            let mut page_start_offsets = page_start_offsets.into_iter();

            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
                if chunk.is_some() || !projection.leaf_included(idx) {
                    continue;
                }

                if let Some(offsets) = page_start_offsets.next() {
                    let mut chunks = Vec::with_capacity(offsets.len());
                    for _ in 0..offsets.len() {
                        chunks.push(chunk_data.next().unwrap());
                    }

                    *chunk = Some(Arc::new(ColumnChunkData::Sparse {
                        length: metadata.column(idx).byte_range().1 as usize,
                        data: offsets
                            .into_iter()
                            .map(|x| x as usize)
                            .zip(chunks.into_iter())
                            .collect(),
                    }))
                }
            }
        } else {
            let fetch_ranges = self
                .column_chunks
                .iter()
                .enumerate()
                .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
                .map(|(idx, _chunk)| {
                    let column = metadata.column(idx);
                    let (start, length) = column.byte_range();
                    start..(start + length)
                })
                .collect();

            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();

            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
                if chunk.is_some() || !projection.leaf_included(idx) {
                    continue;
                }

                if let Some(data) = chunk_data.next() {
                    *chunk = Some(Arc::new(ColumnChunkData::Dense {
                        offset: metadata.column(idx).byte_range().0 as usize,
                        data,
                    }));
                }
            }
        }

        Ok(())
    }
}

impl RowGroups for InMemoryRowGroup<'_> {
    fn num_rows(&self) -> usize {
        self.row_count
    }

    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        match &self.column_chunks[i] {
            None => Err(ParquetError::General(format!(
                "Invalid column index {i}, column was not fetched"
            ))),
            Some(data) => {
                let page_locations = self
                    .offset_index
                    .filter(|index| !index.is_empty())
                    .map(|index| index[i].page_locations.clone());
                let column_chunk_metadata = self.metadata.row_group(self.row_group_idx).column(i);
                let page_reader = SerializedPageReader::new(
                    data.clone(),
                    column_chunk_metadata,
                    self.row_count,
                    page_locations,
                )?;
                let page_reader = page_reader.add_crypto_context(
                    self.row_group_idx,
                    i,
                    self.metadata,
                    column_chunk_metadata,
                )?;

                let page_reader: Box<dyn PageReader> = Box::new(page_reader);

                Ok(Box::new(ColumnChunkIterator {
                    reader: Some(Ok(page_reader)),
                }))
            }
        }
    }
}

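/// Column chunk bytes, held either as sparsely fetched pages (`Sparse`) or as
/// the entire chunk (`Dense`).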
#[derive(Clone)]
enum ColumnChunkData {
    Sparse {
        length: usize,
        data: Vec<(usize, Bytes)>,
    },
    Dense { offset: usize, data: Bytes },
}

impl ColumnChunkData {
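    /// Return the data beginning at the file offset `start`.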
    fn get(&self, start: u64) -> Result<Bytes> {
        match &self {
            ColumnChunkData::Sparse { data, .. } => data
                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
                .map(|idx| data[idx].1.clone())
                .map_err(|_| {
                    ParquetError::General(format!(
                        "Invalid offset in sparse column chunk data: {start}"
                    ))
                }),
            ColumnChunkData::Dense { offset, data } => {
                let start = start as usize - *offset;
                Ok(data.slice(start..))
            }
        }
    }
}

impl Length for ColumnChunkData {
    fn len(&self) -> u64 {
        match &self {
            ColumnChunkData::Sparse { length, .. } => *length as u64,
            ColumnChunkData::Dense { data, .. } => data.len() as u64,
        }
    }
}

impl ChunkReader for ColumnChunkData {
    type T = bytes::buf::Reader<Bytes>;

    fn get_read(&self, start: u64) -> Result<Self::T> {
        Ok(self.get(start)?.reader())
    }

    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
        Ok(self.get(start)?.slice(..length))
    }
}

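/// A [`PageIterator`] yielding a single [`PageReader`] for one column chunk.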
struct ColumnChunkIterator {
    reader: Option<Result<Box<dyn PageReader>>>,
}

impl Iterator for ColumnChunkIterator {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        self.reader.take()
    }
}

impl PageIterator for ColumnChunkIterator {}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::arrow_reader::{
        ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
    };
    use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
    use crate::arrow::ArrowWriter;
    use crate::file::metadata::ParquetMetaDataReader;
    use crate::file::properties::WriterProperties;
    use arrow::compute::kernels::cmp::eq;
    use arrow::error::Result as ArrowResult;
    use arrow_array::builder::{ListBuilder, StringBuilder};
    use arrow_array::cast::AsArray;
    use arrow_array::types::Int32Type;
    use arrow_array::{
        Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray,
        StructArray, UInt64Array,
    };
    use arrow_schema::{DataType, Field, Schema};
    use futures::{StreamExt, TryStreamExt};
    use rand::{rng, Rng};
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};
    use tempfile::tempfile;

    #[derive(Clone)]
    struct TestReader {
        data: Bytes,
        metadata: Option<Arc<ParquetMetaData>>,
        requests: Arc<Mutex<Vec<Range<usize>>>>,
    }

    impl AsyncFileReader for TestReader {
        fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            let range = range.clone();
            self.requests
                .lock()
                .unwrap()
                .push(range.start as usize..range.end as usize);
            futures::future::ready(Ok(self
                .data
                .slice(range.start as usize..range.end as usize)))
            .boxed()
        }

        fn get_metadata<'a>(
            &'a mut self,
            options: Option<&'a ArrowReaderOptions>,
        ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
            let metadata_reader = ParquetMetaDataReader::new()
                .with_page_indexes(options.is_some_and(|o| o.page_index));
            self.metadata = Some(Arc::new(
                metadata_reader.parse_and_finish(&self.data).unwrap(),
            ));
            futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
        }
    }

    #[tokio::test]
    async fn test_async_reader() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let async_reader = TestReader {
            data: data.clone(),
            metadata: Default::default(),
            requests: Default::default(),
        };

        let requests = async_reader.requests.clone();
        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();

        let metadata = builder.metadata().clone();
        assert_eq!(metadata.num_row_groups(), 1);

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
        let stream = builder
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .build()
            .unwrap();

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            .with_batch_size(104)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);

        let requests = requests.lock().unwrap();
        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();

        assert_eq!(
            &requests[..],
            &[
                offset_1 as usize..(offset_1 + length_1) as usize,
                offset_2 as usize..(offset_2 + length_2) as usize
            ]
        );
    }

    #[tokio::test]
    async fn test_async_reader_with_next_row_group() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let async_reader = TestReader {
            data: data.clone(),
            metadata: Default::default(),
            requests: Default::default(),
        };

        let requests = async_reader.requests.clone();
        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();

        let metadata = builder.metadata().clone();
        assert_eq!(metadata.num_row_groups(), 1);

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
        let mut stream = builder
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .build()
            .unwrap();

        let mut readers = vec![];
        while let Some(reader) = stream.next_row_group().await.unwrap() {
            readers.push(reader);
        }

        let async_batches: Vec<_> = readers
            .into_iter()
            .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
            .collect();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            .with_batch_size(104)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);

        let requests = requests.lock().unwrap();
        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();

        assert_eq!(
            &requests[..],
            &[
                offset_1 as usize..(offset_1 + length_1) as usize,
                offset_2 as usize..(offset_2 + length_2) as usize
            ]
        );
    }

    #[tokio::test]
    async fn test_async_reader_with_index() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let async_reader = TestReader {
            data: data.clone(),
            metadata: Default::default(),
            requests: Default::default(),
        };

        let options = ArrowReaderOptions::new().with_page_index(true);
        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
            .await
            .unwrap();

        let metadata_with_index = builder.metadata();
        assert_eq!(metadata_with_index.num_row_groups(), 1);

        let offset_index = metadata_with_index.offset_index().unwrap();
        let column_index = metadata_with_index.column_index().unwrap();

        assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
        assert_eq!(column_index.len(), metadata_with_index.num_row_groups());

        let num_columns = metadata_with_index
            .file_metadata()
            .schema_descr()
            .num_columns();

        offset_index
            .iter()
            .for_each(|x| assert_eq!(x.len(), num_columns));
        column_index
            .iter()
            .for_each(|x| assert_eq!(x.len(), num_columns));

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
        let stream = builder
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .build()
            .unwrap();

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            .with_batch_size(1024)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);
    }

    #[tokio::test]
    async fn test_async_reader_with_limit() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: Default::default(),
            requests: Default::default(),
        };

        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();

        assert_eq!(builder.metadata().num_row_groups(), 1);

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
        let stream = builder
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .with_limit(1)
            .build()
            .unwrap();

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            .with_batch_size(1024)
            .with_limit(1)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);
    }

    #[tokio::test]
    async fn test_async_reader_skip_pages() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let async_reader = TestReader {
            data: data.clone(),
            metadata: Default::default(),
            requests: Default::default(),
        };

        let options = ArrowReaderOptions::new().with_page_index(true);
        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
            .await
            .unwrap();

        assert_eq!(builder.metadata().num_row_groups(), 1);

        let selection = RowSelection::from(vec![
            RowSelector::skip(21),
            RowSelector::select(21),
            RowSelector::skip(41),
            RowSelector::select(41),
            RowSelector::skip(25),
            RowSelector::select(25),
            RowSelector::skip(7116),
            RowSelector::select(10),
        ]);

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);

        let stream = builder
            .with_projection(mask.clone())
            .with_row_selection(selection.clone())
            .build()
            .expect("building stream");

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            .with_batch_size(1024)
            .with_row_selection(selection)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);
    }

    #[tokio::test]
    async fn test_fuzz_async_reader_selection() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let mut rand = rng();

        for _ in 0..100 {
            let mut expected_rows = 0;
            let mut total_rows = 0;
            let mut skip = false;
            let mut selectors = vec![];

            while total_rows < 7300 {
                let row_count: usize = rand.random_range(1..100);

                let row_count = row_count.min(7300 - total_rows);

                selectors.push(RowSelector { row_count, skip });

                total_rows += row_count;
                if !skip {
                    expected_rows += row_count;
                }

                skip = !skip;
            }

            let selection = RowSelection::from(selectors);

            let async_reader = TestReader {
                data: data.clone(),
                metadata: Default::default(),
                requests: Default::default(),
            };

            let options = ArrowReaderOptions::new().with_page_index(true);
            let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
                .await
                .unwrap();

            assert_eq!(builder.metadata().num_row_groups(), 1);

            let col_idx: usize = rand.random_range(0..13);
            let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);

            let stream = builder
                .with_projection(mask.clone())
                .with_row_selection(selection.clone())
                .build()
                .expect("building stream");

            let async_batches: Vec<_> = stream.try_collect().await.unwrap();

            let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();

            assert_eq!(actual_rows, expected_rows);
        }
    }

    #[tokio::test]
    async fn test_async_reader_zero_row_selector() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let mut rand = rng();

        let mut expected_rows = 0;
        let mut total_rows = 0;
        let mut skip = false;
        let mut selectors = vec![];

        selectors.push(RowSelector {
            row_count: 0,
            skip: false,
        });

        while total_rows < 7300 {
            let row_count: usize = rand.random_range(1..100);

            let row_count = row_count.min(7300 - total_rows);

            selectors.push(RowSelector { row_count, skip });

            total_rows += row_count;
            if !skip {
                expected_rows += row_count;
            }

            skip = !skip;
        }

        let selection = RowSelection::from(selectors);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: Default::default(),
            requests: Default::default(),
        };

        let options = ArrowReaderOptions::new().with_page_index(true);
        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
            .await
            .unwrap();

        assert_eq!(builder.metadata().num_row_groups(), 1);

        let col_idx: usize = rand.random_range(0..13);
        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);

        let stream = builder
            .with_projection(mask.clone())
            .with_row_selection(selection.clone())
            .build()
            .expect("building stream");

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();

        assert_eq!(actual_rows, expected_rows);
    }

    #[tokio::test]
    async fn test_row_filter() {
        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
        let c = Int32Array::from_iter(0..6);
        let data = RecordBatch::try_from_iter([
            ("a", Arc::new(a) as ArrayRef),
            ("b", Arc::new(b) as ArrayRef),
            ("c", Arc::new(c) as ArrayRef),
        ])
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
        writer.write(&data).unwrap();
        writer.close().unwrap();

        let data: Bytes = buf.into();
        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let parquet_schema = metadata.file_metadata().schema_descr_ptr();

        let test = TestReader {
            data,
            metadata: Default::default(),
            requests: Default::default(),
        };
        let requests = test.requests.clone();

        let a_scalar = StringArray::from_iter_values(["b"]);
        let a_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![0]),
            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
        );

        let b_scalar = StringArray::from_iter_values(["4"]);
        let b_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![1]),
            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
        );

        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);

        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
        let stream = ParquetRecordBatchStreamBuilder::new(test)
            .await
            .unwrap()
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .with_row_filter(filter)
            .build()
            .unwrap();

        let batches: Vec<_> = stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 1);

        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 2);

        let col = batch.column(0);
        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
        assert_eq!(val, "b");

        let col = batch.column(1);
        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
        assert_eq!(val, 3);

        assert_eq!(requests.lock().unwrap().len(), 3);
    }

    #[tokio::test]
    async fn test_limit_multiple_row_groups() {
        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
        let c = Int32Array::from_iter(0..6);
        let data = RecordBatch::try_from_iter([
            ("a", Arc::new(a) as ArrayRef),
            ("b", Arc::new(b) as ArrayRef),
            ("c", Arc::new(c) as ArrayRef),
        ])
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let props = WriterProperties::builder()
            .set_max_row_group_size(3)
            .build();
        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
        writer.write(&data).unwrap();
        writer.close().unwrap();

        let data: Bytes = buf.into();
        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();

        assert_eq!(metadata.num_row_groups(), 2);

        let test = TestReader {
            data,
            metadata: Default::default(),
            requests: Default::default(),
        };

        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
            .await
            .unwrap()
            .with_batch_size(1024)
            .with_limit(4)
            .build()
            .unwrap();

        let batches: Vec<_> = stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 2);

        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(batch.num_columns(), 3);
        let col2 = batch.column(2).as_primitive::<Int32Type>();
        assert_eq!(col2.values(), &[0, 1, 2]);

        let batch = &batches[1];
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 3);
        let col2 = batch.column(2).as_primitive::<Int32Type>();
        assert_eq!(col2.values(), &[3]);

        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
            .await
            .unwrap()
            .with_offset(2)
            .with_limit(3)
            .build()
            .unwrap();

        let batches: Vec<_> = stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 2);

        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 3);
        let col2 = batch.column(2).as_primitive::<Int32Type>();
        assert_eq!(col2.values(), &[2]);

        let batch = &batches[1];
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 3);
        let col2 = batch.column(2).as_primitive::<Int32Type>();
        assert_eq!(col2.values(), &[3, 4]);

        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
            .await
            .unwrap()
            .with_offset(4)
            .with_limit(20)
            .build()
            .unwrap();

        let batches: Vec<_> = stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 1);

        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 3);
        let col2 = batch.column(2).as_primitive::<Int32Type>();
        assert_eq!(col2.values(), &[4, 5]);
    }

    #[tokio::test]
    async fn test_row_filter_with_index() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let parquet_schema = metadata.file_metadata().schema_descr_ptr();

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: Default::default(),
            requests: Default::default(),
        };

        let a_filter =
            ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema, vec![1]), |batch| {
                Ok(batch.column(0).as_boolean().clone())
            });

        let b_scalar = Int8Array::from(vec![2]);
        let b_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![2]),
            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
        );

        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);

        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);

        let options = ArrowReaderOptions::new().with_page_index(true);
        let stream = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
            .await
            .unwrap()
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .with_row_filter(filter)
            .build()
            .unwrap();

        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();

        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();

        assert_eq!(total_rows, 730);
    }

    #[tokio::test]
    async fn test_in_memory_row_group_sparse() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .parse_and_finish(&data)
            .unwrap();

        let offset_index = metadata.offset_index().expect("reading offset index")[0].clone();

        let mut metadata_builder = metadata.into_builder();
        let mut row_groups = metadata_builder.take_row_groups();
        row_groups.truncate(1);
        let row_group_meta = row_groups.pop().unwrap();

        let metadata = metadata_builder
            .add_row_group(row_group_meta)
            .set_column_index(None)
            .set_offset_index(Some(vec![offset_index.clone()]))
            .build();

        let metadata = Arc::new(metadata);

        let num_rows = metadata.row_group(0).num_rows();

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: Default::default(),
            requests: Default::default(),
        };

        let requests = async_reader.requests.clone();
        let (_, fields) = parquet_to_arrow_schema_and_fields(
            metadata.file_metadata().schema_descr(),
            ProjectionMask::all(),
            None,
        )
        .unwrap();

        let _schema_desc = metadata.file_metadata().schema_descr();

        let projection = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), vec![0]);

        let reader_factory = ReaderFactory {
            metadata,
            fields: fields.map(Arc::new),
            input: async_reader,
            filter: None,
            limit: None,
            offset: None,
        };

        let mut skip = true;
        let mut pages = offset_index[0].page_locations.iter().peekable();

        let mut selectors = vec![];
        let mut expected_page_requests: Vec<Range<usize>> = vec![];
        while let Some(page) = pages.next() {
            let num_rows = if let Some(next_page) = pages.peek() {
                next_page.first_row_index - page.first_row_index
            } else {
                num_rows - page.first_row_index
            };

            if skip {
                selectors.push(RowSelector::skip(num_rows as usize));
            } else {
                selectors.push(RowSelector::select(num_rows as usize));
                let start = page.offset as usize;
                let end = start + page.compressed_page_size as usize;
                expected_page_requests.push(start..end);
            }
            skip = !skip;
        }

        let selection = RowSelection::from(selectors);

        let (_factory, _reader) = reader_factory
            .read_row_group(0, Some(selection), projection.clone(), 48)
            .await
            .expect("reading row group");

        let requests = requests.lock().unwrap();

        assert_eq!(&requests[..], &expected_page_requests)
    }

    #[tokio::test]
    async fn test_batch_size_overallocate() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let async_reader = TestReader {
            data: data.clone(),
            metadata: Default::default(),
            requests: Default::default(),
        };

        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();

        let file_rows = builder.metadata().file_metadata().num_rows() as usize;

        let stream = builder
            .with_projection(ProjectionMask::all())
            .with_batch_size(1024)
            .build()
            .unwrap();
        assert_ne!(1024, file_rows);
        assert_eq!(stream.batch_size, file_rows);
    }

    #[tokio::test]
    async fn test_get_row_group_column_bloom_filter_without_length() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());
        test_get_row_group_column_bloom_filter(data, false).await;
    }

    #[tokio::test]
    async fn test_parquet_record_batch_stream_schema() {
        fn get_all_field_names(schema: &Schema) -> Vec<&String> {
            schema.flattened_fields().iter().map(|f| f.name()).collect()
        }

        let mut metadata = HashMap::with_capacity(1);
        metadata.insert("key".to_string(), "value".to_string());

        let nested_struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("d", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("e", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
            ),
        ]);
        let struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("a", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("b", DataType::UInt64, true)),
                Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c",
                    nested_struct_array.data_type().clone(),
                    true,
                )),
                Arc::new(nested_struct_array) as ArrayRef,
            ),
        ]);

        let schema =
            Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
        let record_batch = RecordBatch::from(struct_array)
            .with_schema(schema.clone())
            .unwrap();

        let mut file = tempfile().unwrap();
        let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
        writer.write(&record_batch).unwrap();
        writer.close().unwrap();

        let all_fields = ["a", "b", "c", "d", "e"];
        let projections = [
            (vec![], vec![]),
            (vec![0], vec!["a"]),
            (vec![0, 1], vec!["a", "b"]),
            (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
            (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
        ];

        for (indices, expected_projected_names) in projections {
            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
                assert_eq!(get_all_field_names(&builder), all_fields);
                assert_eq!(builder.metadata, metadata);
                assert_eq!(get_all_field_names(&reader), expected_projected_names);
                assert_eq!(reader.metadata, HashMap::default());
                assert_eq!(get_all_field_names(&batch), expected_projected_names);
                assert_eq!(batch.metadata, HashMap::default());
            };

            let builder =
                ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
            let sync_builder_schema = builder.schema().clone();
            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
            let mut reader = builder.with_projection(mask).build().unwrap();
            let sync_reader_schema = reader.schema();
            let batch = reader.next().unwrap().unwrap();
            let sync_batch_schema = batch.schema();
            assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);

            let file = tokio::fs::File::from(file.try_clone().unwrap());
            let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
            let async_builder_schema = builder.schema().clone();
            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
            let mut reader = builder.with_projection(mask).build().unwrap();
            let async_reader_schema = reader.schema().clone();
            let batch = reader.next().await.unwrap().unwrap();
            let async_batch_schema = batch.schema();
            assert_schemas(
                async_builder_schema,
                async_reader_schema,
                async_batch_schema,
            );
        }
    }

    #[tokio::test]
    async fn test_get_row_group_column_bloom_filter_with_length() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());
        let async_reader = TestReader {
            data: data.clone(),
            metadata: Default::default(),
            requests: Default::default(),
        };
        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();
        let schema = builder.schema().clone();
        let stream = builder.build().unwrap();
        let batches = stream.try_collect::<Vec<_>>().await.unwrap();

        let mut parquet_data = Vec::new();
        let props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build();
        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
        for batch in batches {
            writer.write(&batch).unwrap();
        }
        writer.close().unwrap();

        test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
    }

    async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
        let async_reader = TestReader {
            data: data.clone(),
            metadata: Default::default(),
            requests: Default::default(),
        };

        let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();

        let metadata = builder.metadata();
        assert_eq!(metadata.num_row_groups(), 1);
        let row_group = metadata.row_group(0);
        let column = row_group.column(0);
        assert_eq!(column.bloom_filter_length().is_some(), with_length);

        let sbbf = builder
            .get_row_group_column_bloom_filter(0, 0)
            .await
            .unwrap()
            .unwrap();
        assert!(sbbf.check(&"Hello"));
        assert!(!sbbf.check(&"Hello_Not_Exists"));
    }

    #[tokio::test]
    async fn test_nested_skip() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("col_1", DataType::UInt64, false),
            Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
        ]));

        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(256)
            .set_write_batch_size(256)
            .set_max_row_group_size(1024);

        let mut file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();

        let mut builder = ListBuilder::new(StringBuilder::new());
        for id in 0..1024 {
            match id % 3 {
                0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
                1 => builder.append_value([Some(format!("id_{id}"))]),
                _ => builder.append_null(),
            }
        }
        let refs = vec![
            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
            Arc::new(builder.finish()) as ArrayRef,
        ];

        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let selections = [
            RowSelection::from(vec![
                RowSelector::skip(313),
                RowSelector::select(1),
                RowSelector::skip(709),
                RowSelector::select(1),
            ]),
            RowSelection::from(vec![
                RowSelector::skip(255),
                RowSelector::select(1),
                RowSelector::skip(767),
                RowSelector::select(1),
            ]),
            RowSelection::from(vec![
                RowSelector::select(255),
                RowSelector::skip(1),
                RowSelector::select(767),
                RowSelector::skip(1),
            ]),
            RowSelection::from(vec![
                RowSelector::skip(254),
                RowSelector::select(1),
                RowSelector::select(1),
                RowSelector::skip(767),
                RowSelector::select(1),
            ]),
        ];

        for selection in selections {
            let expected = selection.row_count();
            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
                tokio::fs::File::from_std(file.try_clone().unwrap()),
                ArrowReaderOptions::new().with_page_index(true),
            )
            .await
            .unwrap();

            reader = reader.with_row_selection(selection);

            let mut stream = reader.build().unwrap();

            let mut total_rows = 0;
            while let Some(rb) = stream.next().await {
                let rb = rb.unwrap();
                total_rows += rb.num_rows();
            }
            assert_eq!(total_rows, expected);
        }
    }

    #[tokio::test]
    async fn test_row_filter_nested() {
        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
        let b = StructArray::from(vec![
            (
                Arc::new(Field::new("aa", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["a", "b", "b", "b", "c", "c"])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("bb", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5", "6"])) as ArrayRef,
            ),
        ]);
        let c = Int32Array::from_iter(0..6);
        let data = RecordBatch::try_from_iter([
            ("a", Arc::new(a) as ArrayRef),
            ("b", Arc::new(b) as ArrayRef),
            ("c", Arc::new(c) as ArrayRef),
        ])
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
        writer.write(&data).unwrap();
        writer.close().unwrap();

        let data: Bytes = buf.into();
        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let parquet_schema = metadata.file_metadata().schema_descr_ptr();

        let test = TestReader {
            data,
            metadata: Default::default(),
            requests: Default::default(),
        };
        let requests = test.requests.clone();

        let a_scalar = StringArray::from_iter_values(["b"]);
        let a_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![0]),
            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
        );

        let b_scalar = StringArray::from_iter_values(["4"]);
        let b_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![2]),
            move |batch| {
                let struct_array = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<StructArray>()
                    .unwrap();
                eq(struct_array.column(0), &Scalar::new(&b_scalar))
            },
        );

        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);

        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 3]);
        let stream = ParquetRecordBatchStreamBuilder::new(test)
            .await
            .unwrap()
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .with_row_filter(filter)
            .build()
            .unwrap();

        let batches: Vec<_> = stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 1);

        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 2);

        let col = batch.column(0);
        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
        assert_eq!(val, "b");

        let col = batch.column(1);
        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
        assert_eq!(val, 3);

        assert_eq!(requests.lock().unwrap().len(), 3);
    }

    #[tokio::test]
    async fn empty_offset_index_doesnt_panic_in_read_row_group() {
        use tokio::fs::File;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let mut file = File::open(&path).await.unwrap();
        let file_size = file.metadata().await.unwrap().len();
        let mut metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut file, file_size)
            .await
            .unwrap();

        metadata.set_offset_index(Some(vec![]));
        let options = ArrowReaderOptions::new().with_page_index(true);
        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
        let reader =
            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
                .build()
                .unwrap();

        let result = reader.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(result.len(), 1);
    }

    #[tokio::test]
    async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
        use tokio::fs::File;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let mut file = File::open(&path).await.unwrap();
        let file_size = file.metadata().await.unwrap().len();
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut file, file_size)
            .await
            .unwrap();

        let options = ArrowReaderOptions::new().with_page_index(true);
        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
        let reader =
            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
                .build()
                .unwrap();

        let result = reader.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(result.len(), 8);
    }

    #[tokio::test]
    async fn empty_offset_index_doesnt_panic_in_column_chunks() {
        use tempfile::TempDir;
        use tokio::fs::File;
        fn write_metadata_to_local_file(
            metadata: ParquetMetaData,
            file: impl AsRef<std::path::Path>,
        ) {
            use crate::file::metadata::ParquetMetaDataWriter;
            use std::fs::File;
            let file = File::create(file).unwrap();
            ParquetMetaDataWriter::new(file, &metadata)
                .finish()
                .unwrap()
        }

        fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
            use std::fs::File;
            let file = File::open(file).unwrap();
            ParquetMetaDataReader::new()
                .with_page_indexes(true)
                .parse_and_finish(&file)
                .unwrap()
        }

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let mut file = File::open(&path).await.unwrap();
        let file_size = file.metadata().await.unwrap().len();
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut file, file_size)
            .await
            .unwrap();

        let tempdir = TempDir::new().unwrap();
        let metadata_path = tempdir.path().join("thrift_metadata.dat");
        write_metadata_to_local_file(metadata, &metadata_path);
        let metadata = read_metadata_from_local_file(&metadata_path);

        let options = ArrowReaderOptions::new().with_page_index(true);
        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
        let reader =
            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
                .build()
                .unwrap();

        let result = reader.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(result.len(), 1);
    }
}