use std::collections::VecDeque;
use std::fmt::Formatter;
use std::io::SeekFrom;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use bytes::{Buf, Bytes};
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
use futures::stream::Stream;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

use arrow_array::RecordBatch;
use arrow_schema::{DataType, Fields, Schema, SchemaRef};

use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroups};
use crate::arrow::arrow_reader::{
    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
    RowFilter, RowSelection,
};
use crate::arrow::ProjectionMask;

use crate::bloom_filter::{
    chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
};
use crate::column::page::{PageIterator, PageReader};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};

mod metadata;
pub use metadata::*;

#[cfg(feature = "object_store")]
mod store;

use crate::arrow::arrow_reader::ReadPlanBuilder;
use crate::arrow::schema::ParquetField;
#[cfg(feature = "object_store")]
pub use store::*;

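/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files.
///
/// Implementations are provided in this module for any `AsyncRead + AsyncSeek + Unpin + Send`
/// type (e.g. `tokio::fs::File`) and for `Box<dyn AsyncFileReader + '_>`.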
pub trait AsyncFileReader: Send {
    /// Retrieve the bytes in `range`.
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;

    /// Retrieve multiple byte ranges. The default implementation fetches them
    /// sequentially using [`Self::get_bytes`].
    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
        async move {
            let mut result = Vec::with_capacity(ranges.len());

            for range in ranges.into_iter() {
                let data = self.get_bytes(range).await?;
                result.push(data);
            }

            Ok(result)
        }
        .boxed()
    }

    /// Return the [`ParquetMetaData`] for this file, using the provided
    /// [`ArrowReaderOptions`] (if any) to control how it is loaded.
    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
}

impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
        self.as_mut().get_bytes(range)
    }

    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
        self.as_mut().get_byte_ranges(ranges)
    }

    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
        self.as_mut().get_metadata(options)
    }
}

impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
        async move {
            self.seek(SeekFrom::End(-(suffix as i64))).await?;
            let mut buf = Vec::with_capacity(suffix);
            self.take(suffix as _).read_to_end(&mut buf).await?;
            Ok(buf.into())
        }
        .boxed()
    }
}

impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
        async move {
            self.seek(SeekFrom::Start(range.start)).await?;

            let to_read = range.end - range.start;
            let mut buffer = Vec::with_capacity(to_read.try_into()?);
            let read = self.take(to_read).read_to_end(&mut buffer).await?;
            if read as u64 != to_read {
                return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
            }

            Ok(buffer.into())
        }
        .boxed()
    }

    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
        async move {
            let metadata_reader = ParquetMetaDataReader::new()
                .with_page_indexes(options.is_some_and(|o| o.page_index));

            #[cfg(feature = "encryption")]
            let metadata_reader = metadata_reader.with_decryption_properties(
                options.and_then(|o| o.file_decryption_properties.as_ref()),
            );

            let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
            Ok(Arc::new(parquet_metadata))
        }
        .boxed()
    }
}

impl ArrowReaderMetadata {
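    /// Load [`ArrowReaderMetadata`] from the provided [`AsyncFileReader`],
    /// the asynchronous counterpart of [`ArrowReaderMetadata::load`].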
    pub async fn load_async<T: AsyncFileReader>(
        input: &mut T,
        options: ArrowReaderOptions,
    ) -> Result<Self> {
        let metadata = input.get_metadata(Some(&options)).await?;
        Self::try_new(metadata, options)
    }
}

#[doc(hidden)]
pub struct AsyncReader<T>(T);

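/// A builder for reading parquet files into [`RecordBatch`]es asynchronously as a
/// [`ParquetRecordBatchStream`].
///
/// A minimal usage sketch, assuming the input is a Tokio [`tokio::fs::File`] (any
/// `AsyncRead + AsyncSeek + Unpin + Send` source works); the file path and batch
/// size below are placeholders:
///
/// ```no_run
/// # use futures::TryStreamExt;
/// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
/// # async fn example() -> parquet::errors::Result<()> {
/// let file = tokio::fs::File::open("data.parquet").await?;
/// let stream = ParquetRecordBatchStreamBuilder::new(file)
///     .await?
///     .with_batch_size(1024)
///     .build()?;
/// let batches: Vec<_> = stream.try_collect().await?;
/// # Ok(())
/// # }
/// ```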
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;

impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
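    /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from `input`,
    /// using the default [`ArrowReaderOptions`].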
    pub async fn new(input: T) -> Result<Self> {
        Self::new_with_options(input, Default::default()).await
    }

    /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from `input`,
    /// using the provided [`ArrowReaderOptions`].
    pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
        Ok(Self::new_with_metadata(input, metadata))
    }

    /// Create a new [`ParquetRecordBatchStreamBuilder`] from previously loaded
    /// [`ArrowReaderMetadata`], allowing the metadata to be decoded once and reused.
    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(AsyncReader(input), metadata)
    }

    /// Read the bloom filter for the column at `column_idx` in the row group at
    /// `row_group_idx`, returning `Ok(None)` if no bloom filter is present.
    pub async fn get_row_group_column_bloom_filter(
        &mut self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> Result<Option<Sbbf>> {
        let metadata = self.metadata.row_group(row_group_idx);
        let column_metadata = metadata.column(column_idx);

        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => self.input.0.get_bytes(offset..offset + length as u64),
            None => self
                .input
                .0
                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64),
        }
        .await?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        match header.algorithm {
            BloomFilterAlgorithm::BLOCK(_) => {
                // this match exists to future proof the singleton algorithm enum
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED(_) => {
                // this match exists to future proof the singleton compression enum
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH(_) => {
                // this match exists to future proof the singleton hash enum
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            Some(_) => buffer.slice(
                (TryInto::<usize>::try_into(bitset_offset).unwrap()
                    - TryInto::<usize>::try_into(offset).unwrap())..,
            ),
            None => {
                let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                self.input
                    .0
                    .get_bytes(bitset_offset..bitset_offset + bitset_length)
                    .await?
            }
        };
        Ok(Some(Sbbf::new(&bitset)))
    }

    /// Build a new [`ParquetRecordBatchStream`], consuming this builder.
    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
        let num_row_groups = self.metadata.row_groups().len();

        let row_groups = match self.row_groups {
            Some(row_groups) => {
                if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
                    return Err(general_err!(
                        "row group {} out of bounds 0..{}",
                        col,
                        num_row_groups
                    ));
                }
                row_groups.into()
            }
            None => (0..self.metadata.row_groups().len()).collect(),
        };

        // Avoid allocating batches larger than the total number of rows in the file
        let batch_size = self
            .batch_size
            .min(self.metadata.file_metadata().num_rows() as usize);
        let reader_factory = ReaderFactory {
            input: self.input.0,
            filter: self.filter,
            metadata: self.metadata.clone(),
            fields: self.fields,
            limit: self.limit,
            offset: self.offset,
        };

        // Ensure the schema of the stream respects the projection and does not carry
        // the file's key/value metadata (matching the emitted RecordBatches)
        let projected_fields = match reader_factory.fields.as_deref().map(|pf| &pf.arrow_type) {
            Some(DataType::Struct(fields)) => {
                fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
            }
            None => Fields::empty(),
            _ => unreachable!("Must be Struct for root type"),
        };
        let schema = Arc::new(Schema::new(projected_fields));

        Ok(ParquetRecordBatchStream {
            metadata: self.metadata,
            batch_size,
            row_groups,
            projection: self.projection,
            selection: self.selection,
            schema,
            reader_factory: Some(reader_factory),
            state: StreamState::Init,
        })
    }
}

/// The result of a [`ReaderFactory::read_row_group`]: the factory is handed back along
/// with an optional reader for the row group (`None` if the row group was entirely skipped).
type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;

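/// [`ReaderFactory`] drives the I/O for a [`ParquetRecordBatchStream`]: it owns the
/// [`AsyncFileReader`] and builds a [`ParquetRecordBatchReader`] for one row group at a time.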
struct ReaderFactory<T> {
    metadata: Arc<ParquetMetaData>,

    /// Top-level parquet schema
    fields: Option<Arc<ParquetField>>,

    input: T,

    /// Optional row filter to apply while reading
    filter: Option<RowFilter>,

    /// Remaining limit to apply across row groups
    limit: Option<usize>,

    /// Remaining offset to apply before any rows are yielded
    offset: Option<usize>,
}

impl<T> ReaderFactory<T>
where
    T: AsyncFileReader + Send,
{
    /// Reads the row group at `row_group_idx` with the provided `selection`, `projection`
    /// and `batch_size`, consuming and returning `self` so the future is `'static`.
    async fn read_row_group(
        mut self,
        row_group_idx: usize,
        selection: Option<RowSelection>,
        projection: ProjectionMask,
        batch_size: usize,
    ) -> ReadResult<T> {
        let meta = self.metadata.row_group(row_group_idx);
        let offset_index = self
            .metadata
            .offset_index()
            // treat an empty offset index (e.g. `Some(vec![])`) the same as no offset index
            .filter(|index| !index.is_empty())
            .map(|x| x[row_group_idx].as_slice());

        let mut row_group = InMemoryRowGroup {
            row_count: meta.num_rows() as usize,
            column_chunks: vec![None; meta.columns().len()],
            offset_index,
            row_group_idx,
            metadata: self.metadata.as_ref(),
        };

        let filter = self.filter.as_mut();
        let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);

        // Update the selection based on any filters
        if let Some(filter) = filter {
            for predicate in filter.predicates.iter_mut() {
                if !plan_builder.selects_any() {
                    // the entire row group has been ruled out
                    return Ok((self, None));
                }

                let selection = plan_builder.selection();
                row_group
                    .fetch(&mut self.input, predicate.projection(), selection)
                    .await?;

                let array_reader = ArrayReaderBuilder::new(&row_group)
                    .build_array_reader(self.fields.as_deref(), predicate.projection())?;

                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
            }
        }

        // Number of selected rows before applying limit and offset
        let rows_before = plan_builder
            .num_rows_selected()
            .unwrap_or(row_group.row_count);

        if rows_before == 0 {
            return Ok((self, None));
        }

        // Apply any limit and offset
        let plan_builder = plan_builder
            .limited(row_group.row_count)
            .with_offset(self.offset)
            .with_limit(self.limit)
            .build_limited();

        let rows_after = plan_builder
            .num_rows_selected()
            .unwrap_or(row_group.row_count);

        if let Some(offset) = &mut self.offset {
            // The reduction is due to either the offset or the limit; since the limit only
            // applies once the offset is exhausted, saturating_sub is sufficient here
            *offset = offset.saturating_sub(rows_before - rows_after)
        }

        if rows_after == 0 {
            return Ok((self, None));
        }

        if let Some(limit) = &mut self.limit {
            *limit -= rows_after;
        }
        row_group
            .fetch(&mut self.input, &projection, plan_builder.selection())
            .await?;

        let plan = plan_builder.build();

        let array_reader = ArrayReaderBuilder::new(&row_group)
            .build_array_reader(self.fields.as_deref(), &projection)?;

        let reader = ParquetRecordBatchReader::new(array_reader, plan);

        Ok((self, Some(reader)))
    }
}

enum StreamState<T> {
    /// At the start of a new row group, or the end of the parquet stream
    Init,
    /// Decoding a batch
    Decoding(ParquetRecordBatchReader),
    /// Reading data from input
    Reading(BoxFuture<'static, ReadResult<T>>),
    /// Error
    Error,
}

impl<T> std::fmt::Debug for StreamState<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            StreamState::Init => write!(f, "StreamState::Init"),
            StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
            StreamState::Reading(_) => write!(f, "StreamState::Reading"),
            StreamState::Error => write!(f, "StreamState::Error"),
        }
    }
}

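/// An asynchronous [`Stream`] of [`RecordBatch`] read from a parquet file, created by
/// [`ParquetRecordBatchStreamBuilder::build`].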
pub struct ParquetRecordBatchStream<T> {
    metadata: Arc<ParquetMetaData>,

    schema: SchemaRef,

    row_groups: VecDeque<usize>,

    projection: ProjectionMask,

    batch_size: usize,

    selection: Option<RowSelection>,

    /// This is an option so it can be moved into a (static) future whilst reading
    reader_factory: Option<ReaderFactory<T>>,

    state: StreamState<T>,
}

impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ParquetRecordBatchStream")
            .field("metadata", &self.metadata)
            .field("schema", &self.schema)
            .field("batch_size", &self.batch_size)
            .field("projection", &self.projection)
            .field("state", &self.state)
            .finish()
    }
}

impl<T> ParquetRecordBatchStream<T> {
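    /// Returns the projected [`SchemaRef`] for reading the parquet file.
    ///
    /// Note that this schema contains only the projected fields and no metadata.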
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}

impl<T> ParquetRecordBatchStream<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
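    /// Fetches the next row group and returns a [`ParquetRecordBatchReader`] for it,
    /// or `Ok(None)` when there are no more row groups.
    ///
    /// This allows decoding a row group's batches separately from the I/O, but must not
    /// be mixed with polling the [`Stream`] implementation on the same instance.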
    pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
        loop {
            match &mut self.state {
                StreamState::Decoding(_) | StreamState::Reading(_) => {
                    return Err(ParquetError::General(
                        "Cannot combine the use of next_row_group with the Stream API".to_string(),
                    ))
                }
                StreamState::Init => {
                    let row_group_idx = match self.row_groups.pop_front() {
                        Some(idx) => idx,
                        None => return Ok(None),
                    };

                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;

                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));

                    let reader_factory = self.reader_factory.take().expect("lost reader factory");

                    let (reader_factory, maybe_reader) = reader_factory
                        .read_row_group(
                            row_group_idx,
                            selection,
                            self.projection.clone(),
                            self.batch_size,
                        )
                        .await
                        .inspect_err(|_| {
                            self.state = StreamState::Error;
                        })?;
                    self.reader_factory = Some(reader_factory);

                    if let Some(reader) = maybe_reader {
                        return Ok(Some(reader));
                    } else {
                        // All rows in this row group were skipped, read the next one
                        continue;
                    }
                }
                // Once an error has occurred the stream is exhausted
                StreamState::Error => return Ok(None),
            }
        }
    }
}

impl<T> Stream for ParquetRecordBatchStream<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            match &mut self.state {
                StreamState::Decoding(batch_reader) => match batch_reader.next() {
                    Some(Ok(batch)) => {
                        return Poll::Ready(Some(Ok(batch)));
                    }
                    Some(Err(e)) => {
                        self.state = StreamState::Error;
                        return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
                    }
                    None => self.state = StreamState::Init,
                },
                StreamState::Init => {
                    let row_group_idx = match self.row_groups.pop_front() {
                        Some(idx) => idx,
                        None => return Poll::Ready(None),
                    };

                    let reader = self.reader_factory.take().expect("lost reader factory");

                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;

                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));

                    let fut = reader
                        .read_row_group(
                            row_group_idx,
                            selection,
                            self.projection.clone(),
                            self.batch_size,
                        )
                        .boxed();

                    self.state = StreamState::Reading(fut)
                }
                StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
                    Ok((reader_factory, maybe_reader)) => {
                        self.reader_factory = Some(reader_factory);
                        match maybe_reader {
                            // Read records from the row group
                            Some(reader) => self.state = StreamState::Decoding(reader),
                            // All rows skipped, read next row group
                            None => self.state = StreamState::Init,
                        }
                    }
                    Err(e) => {
                        self.state = StreamState::Error;
                        return Poll::Ready(Some(Err(e)));
                    }
                },
                // Once an error has occurred the stream is exhausted
                StreamState::Error => return Poll::Ready(None),
            }
        }
    }
}

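/// An in-memory collection of the column chunks required to decode a single row group.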
struct InMemoryRowGroup<'a> {
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
    row_count: usize,
    row_group_idx: usize,
    metadata: &'a ParquetMetaData,
}

impl InMemoryRowGroup<'_> {
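    /// Fetches the necessary column data into memory.
    ///
    /// If an [`OffsetIndexMetaData`] and a [`RowSelection`] are available, only the pages
    /// required by the selection are fetched; otherwise whole column chunks are fetched.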
    async fn fetch<T: AsyncFileReader + Send>(
        &mut self,
        input: &mut T,
        projection: &ProjectionMask,
        selection: Option<&RowSelection>,
    ) -> Result<()> {
        let metadata = self.metadata.row_group(self.row_group_idx);
        if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
            // If we have a `RowSelection` and an `OffsetIndex` then only fetch the pages
            // required for the `RowSelection`
            let mut page_start_offsets: Vec<Vec<u64>> = vec![];

            let fetch_ranges = self
                .column_chunks
                .iter()
                .zip(metadata.columns())
                .enumerate()
                .filter(|&(idx, (chunk, _chunk_meta))| {
                    chunk.is_none() && projection.leaf_included(idx)
                })
                .flat_map(|(idx, (_chunk, chunk_meta))| {
                    // If the first page does not start at the beginning of the column,
                    // then we need to also fetch a dictionary page.
                    let mut ranges: Vec<Range<u64>> = vec![];
                    let (start, _len) = chunk_meta.byte_range();
                    match offset_index[idx].page_locations.first() {
                        Some(first) if first.offset as u64 != start => {
                            ranges.push(start..first.offset as u64);
                        }
                        _ => (),
                    }

                    ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
                    page_start_offsets.push(ranges.iter().map(|range| range.start).collect());

                    ranges
                })
                .collect();

            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
            let mut page_start_offsets = page_start_offsets.into_iter();

            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
                if chunk.is_some() || !projection.leaf_included(idx) {
                    continue;
                }

                if let Some(offsets) = page_start_offsets.next() {
                    let mut chunks = Vec::with_capacity(offsets.len());
                    for _ in 0..offsets.len() {
                        chunks.push(chunk_data.next().unwrap());
                    }

                    *chunk = Some(Arc::new(ColumnChunkData::Sparse {
                        length: metadata.column(idx).byte_range().1 as usize,
                        data: offsets
                            .into_iter()
                            .map(|x| x as usize)
                            .zip(chunks.into_iter())
                            .collect(),
                    }))
                }
            }
        } else {
            let fetch_ranges = self
                .column_chunks
                .iter()
                .enumerate()
                .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
                .map(|(idx, _chunk)| {
                    let column = metadata.column(idx);
                    let (start, length) = column.byte_range();
                    start..(start + length)
                })
                .collect();

            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();

            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
                if chunk.is_some() || !projection.leaf_included(idx) {
                    continue;
                }

                if let Some(data) = chunk_data.next() {
                    *chunk = Some(Arc::new(ColumnChunkData::Dense {
                        offset: metadata.column(idx).byte_range().0 as usize,
                        data,
                    }));
                }
            }
        }

        Ok(())
    }
}

impl RowGroups for InMemoryRowGroup<'_> {
    fn num_rows(&self) -> usize {
        self.row_count
    }

    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        match &self.column_chunks[i] {
            None => Err(ParquetError::General(format!(
                "Invalid column index {i}, column was not fetched"
            ))),
            Some(data) => {
                let page_locations = self
                    .offset_index
                    // treat an empty offset index the same as no offset index
                    .filter(|index| !index.is_empty())
                    .map(|index| index[i].page_locations.clone());
                let column_chunk_metadata = self.metadata.row_group(self.row_group_idx).column(i);
                let page_reader = SerializedPageReader::new(
                    data.clone(),
                    column_chunk_metadata,
                    self.row_count,
                    page_locations,
                )?;
                let page_reader = page_reader.add_crypto_context(
                    self.row_group_idx,
                    i,
                    self.metadata,
                    column_chunk_metadata,
                )?;

                let page_reader: Box<dyn PageReader> = Box::new(page_reader);

                Ok(Box::new(ColumnChunkIterator {
                    reader: Some(Ok(page_reader)),
                }))
            }
        }
    }
}

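/// An in-memory column chunk, either fully materialized or a sparse set of fetched pages.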
#[derive(Clone)]
enum ColumnChunkData {
    /// Column chunk data representing only a subset of data pages
    Sparse {
        /// Length of the full column chunk
        length: usize,
        /// Pages that were fetched, as (file offset of the page, page bytes) pairs
        data: Vec<(usize, Bytes)>,
    },
    /// Full column chunk and the offset at which it starts in the file
    Dense { offset: usize, data: Bytes },
}

impl ColumnChunkData {
    /// Return the bytes starting at the page located at file offset `start`
    fn get(&self, start: u64) -> Result<Bytes> {
        match &self {
            ColumnChunkData::Sparse { data, .. } => data
                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
                .map(|idx| data[idx].1.clone())
                .map_err(|_| {
                    ParquetError::General(format!(
                        "Invalid offset in sparse column chunk data: {start}"
                    ))
                }),
            ColumnChunkData::Dense { offset, data } => {
                let start = start as usize - *offset;
                Ok(data.slice(start..))
            }
        }
    }
}

impl Length for ColumnChunkData {
    fn len(&self) -> u64 {
        match &self {
            ColumnChunkData::Sparse { length, .. } => *length as u64,
            ColumnChunkData::Dense { data, .. } => data.len() as u64,
        }
    }
}

impl ChunkReader for ColumnChunkData {
    type T = bytes::buf::Reader<Bytes>;

    fn get_read(&self, start: u64) -> Result<Self::T> {
        Ok(self.get(start)?.reader())
    }

    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
        Ok(self.get(start)?.slice(..length))
    }
}

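/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]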
struct ColumnChunkIterator {
    reader: Option<Result<Box<dyn PageReader>>>,
}

impl Iterator for ColumnChunkIterator {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        self.reader.take()
    }
}

impl PageIterator for ColumnChunkIterator {}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::arrow_reader::{
        ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
    };
    use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
    use crate::arrow::ArrowWriter;
    use crate::file::metadata::ParquetMetaDataReader;
    use crate::file::properties::WriterProperties;
    use arrow::compute::kernels::cmp::eq;
    use arrow::error::Result as ArrowResult;
    use arrow_array::builder::{ListBuilder, StringBuilder};
    use arrow_array::cast::AsArray;
    use arrow_array::types::Int32Type;
    use arrow_array::{
        Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray,
        StructArray, UInt64Array,
    };
    use arrow_schema::{DataType, Field, Schema};
    use futures::{StreamExt, TryStreamExt};
    use rand::{rng, Rng};
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};
    use tempfile::tempfile;

    #[derive(Clone)]
    struct TestReader {
        data: Bytes,
        metadata: Option<Arc<ParquetMetaData>>,
        requests: Arc<Mutex<Vec<Range<usize>>>>,
    }

    impl TestReader {
        fn new(data: Bytes) -> Self {
            Self {
                data,
                metadata: Default::default(),
                requests: Default::default(),
            }
        }
    }

    impl AsyncFileReader for TestReader {
        fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            let range = range.clone();
            self.requests
                .lock()
                .unwrap()
                .push(range.start as usize..range.end as usize);
            futures::future::ready(Ok(self
                .data
                .slice(range.start as usize..range.end as usize)))
            .boxed()
        }

        fn get_metadata<'a>(
            &'a mut self,
            options: Option<&'a ArrowReaderOptions>,
        ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
            let metadata_reader = ParquetMetaDataReader::new()
                .with_page_indexes(options.is_some_and(|o| o.page_index));
            self.metadata = Some(Arc::new(
                metadata_reader.parse_and_finish(&self.data).unwrap(),
            ));
            futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
        }
    }

    #[tokio::test]
    async fn test_async_reader() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let async_reader = TestReader::new(data.clone());

        let requests = async_reader.requests.clone();
        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();

        let metadata = builder.metadata().clone();
        assert_eq!(metadata.num_row_groups(), 1);

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
        let stream = builder
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .build()
            .unwrap();

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            .with_batch_size(104)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);

        let requests = requests.lock().unwrap();
        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();

        assert_eq!(
            &requests[..],
            &[
                offset_1 as usize..(offset_1 + length_1) as usize,
                offset_2 as usize..(offset_2 + length_2) as usize
            ]
        );
    }

    #[tokio::test]
    async fn test_async_reader_with_next_row_group() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let async_reader = TestReader::new(data.clone());

        let requests = async_reader.requests.clone();
        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();

        let metadata = builder.metadata().clone();
        assert_eq!(metadata.num_row_groups(), 1);

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
        let mut stream = builder
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .build()
            .unwrap();

        let mut readers = vec![];
        while let Some(reader) = stream.next_row_group().await.unwrap() {
            readers.push(reader);
        }

        let async_batches: Vec<_> = readers
            .into_iter()
            .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
            .collect();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            .with_batch_size(104)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);

        let requests = requests.lock().unwrap();
        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();

        assert_eq!(
            &requests[..],
            &[
                offset_1 as usize..(offset_1 + length_1) as usize,
                offset_2 as usize..(offset_2 + length_2) as usize
            ]
        );
    }

    #[tokio::test]
    async fn test_async_reader_with_index() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let async_reader = TestReader::new(data.clone());

        let options = ArrowReaderOptions::new().with_page_index(true);
        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
            .await
            .unwrap();

        let metadata_with_index = builder.metadata();
        assert_eq!(metadata_with_index.num_row_groups(), 1);

        let offset_index = metadata_with_index.offset_index().unwrap();
        let column_index = metadata_with_index.column_index().unwrap();

        assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
        assert_eq!(column_index.len(), metadata_with_index.num_row_groups());

        let num_columns = metadata_with_index
            .file_metadata()
            .schema_descr()
            .num_columns();

        offset_index
            .iter()
            .for_each(|x| assert_eq!(x.len(), num_columns));
        column_index
            .iter()
            .for_each(|x| assert_eq!(x.len(), num_columns));

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
        let stream = builder
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .build()
            .unwrap();

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            .with_batch_size(1024)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);
    }

    #[tokio::test]
    async fn test_async_reader_with_limit() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader::new(data.clone());

        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();

        assert_eq!(builder.metadata().num_row_groups(), 1);

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
        let stream = builder
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .with_limit(1)
            .build()
            .unwrap();

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            .with_batch_size(1024)
            .with_limit(1)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);
    }

    #[tokio::test]
    async fn test_async_reader_skip_pages() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let async_reader = TestReader::new(data.clone());

        let options = ArrowReaderOptions::new().with_page_index(true);
        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
            .await
            .unwrap();

        assert_eq!(builder.metadata().num_row_groups(), 1);

        let selection = RowSelection::from(vec![
            RowSelector::skip(21),
            RowSelector::select(21),
            RowSelector::skip(41),
            RowSelector::select(41),
            RowSelector::skip(25),
            RowSelector::select(25),
            RowSelector::skip(7116),
            RowSelector::select(10),
        ]);

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);

        let stream = builder
            .with_projection(mask.clone())
            .with_row_selection(selection.clone())
            .build()
            .expect("building stream");

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            .with_batch_size(1024)
            .with_row_selection(selection)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);
    }

    #[tokio::test]
    async fn test_fuzz_async_reader_selection() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let mut rand = rng();

        for _ in 0..100 {
            let mut expected_rows = 0;
            let mut total_rows = 0;
            let mut skip = false;
            let mut selectors = vec![];

            while total_rows < 7300 {
                let row_count: usize = rand.random_range(1..100);

                let row_count = row_count.min(7300 - total_rows);

                selectors.push(RowSelector { row_count, skip });

                total_rows += row_count;
                if !skip {
                    expected_rows += row_count;
                }

                skip = !skip;
            }

            let selection = RowSelection::from(selectors);

            let async_reader = TestReader::new(data.clone());

            let options = ArrowReaderOptions::new().with_page_index(true);
            let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
                .await
                .unwrap();

            assert_eq!(builder.metadata().num_row_groups(), 1);

            let col_idx: usize = rand.random_range(0..13);
            let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);

            let stream = builder
                .with_projection(mask.clone())
                .with_row_selection(selection.clone())
                .build()
                .expect("building stream");

            let async_batches: Vec<_> = stream.try_collect().await.unwrap();

            let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();

            assert_eq!(actual_rows, expected_rows);
        }
    }

    #[tokio::test]
    async fn test_async_reader_zero_row_selector() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let mut rand = rng();

        let mut expected_rows = 0;
        let mut total_rows = 0;
        let mut skip = false;
        let mut selectors = vec![];

        selectors.push(RowSelector {
            row_count: 0,
            skip: false,
        });

        while total_rows < 7300 {
            let row_count: usize = rand.random_range(1..100);

            let row_count = row_count.min(7300 - total_rows);

            selectors.push(RowSelector { row_count, skip });

            total_rows += row_count;
            if !skip {
                expected_rows += row_count;
            }

            skip = !skip;
        }

        let selection = RowSelection::from(selectors);

        let async_reader = TestReader::new(data.clone());

        let options = ArrowReaderOptions::new().with_page_index(true);
        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
            .await
            .unwrap();

        assert_eq!(builder.metadata().num_row_groups(), 1);

        let col_idx: usize = rand.random_range(0..13);
        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);

        let stream = builder
            .with_projection(mask.clone())
            .with_row_selection(selection.clone())
            .build()
            .expect("building stream");

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();

        assert_eq!(actual_rows, expected_rows);
    }

    #[tokio::test]
    async fn test_row_filter() {
        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
        let data = RecordBatch::try_from_iter([
            ("a", Arc::new(a) as ArrayRef),
            ("b", Arc::new(b) as ArrayRef),
        ])
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
        writer.write(&data).unwrap();
        writer.close().unwrap();

        let data: Bytes = buf.into();
        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let parquet_schema = metadata.file_metadata().schema_descr_ptr();

        let test = TestReader::new(data);
        let requests = test.requests.clone();

        let a_scalar = StringArray::from_iter_values(["b"]);
        let a_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![0]),
            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
        );

        let filter = RowFilter::new(vec![Box::new(a_filter)]);

        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 1]);
        let stream = ParquetRecordBatchStreamBuilder::new(test)
            .await
            .unwrap()
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .with_row_filter(filter)
            .build()
            .unwrap();

        let batches: Vec<_> = stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 1);

        let batch = &batches[0];
        assert_eq!(batch.num_columns(), 2);

        assert_eq!(
            batch.column(0).as_ref(),
            &StringArray::from_iter_values(["b", "b", "b"])
        );
        assert_eq!(
            batch.column(1).as_ref(),
            &StringArray::from_iter_values(["2", "3", "4"])
        );

        // Expect two fetches: one for the filter column "a", then one for column "b"
        // ("a" is reused from the filter phase rather than fetched again)
        assert_eq!(requests.lock().unwrap().len(), 2);
    }

    #[tokio::test]
    async fn test_two_row_filters() {
        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
        let c = Int32Array::from_iter(0..6);
        let data = RecordBatch::try_from_iter([
            ("a", Arc::new(a) as ArrayRef),
            ("b", Arc::new(b) as ArrayRef),
            ("c", Arc::new(c) as ArrayRef),
        ])
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
        writer.write(&data).unwrap();
        writer.close().unwrap();

        let data: Bytes = buf.into();
        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let parquet_schema = metadata.file_metadata().schema_descr_ptr();

        let test = TestReader::new(data);
        let requests = test.requests.clone();

        let a_scalar = StringArray::from_iter_values(["b"]);
        let a_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![0]),
            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
        );

        let b_scalar = StringArray::from_iter_values(["4"]);
        let b_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![1]),
            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
        );

        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);

        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
        let stream = ParquetRecordBatchStreamBuilder::new(test)
            .await
            .unwrap()
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .with_row_filter(filter)
            .build()
            .unwrap();

        let batches: Vec<_> = stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 1);

        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 2);

        let col = batch.column(0);
        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
        assert_eq!(val, "b");

        let col = batch.column(1);
        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
        assert_eq!(val, 3);

        // Expect three fetches: one per filter column ("a" then "b"), plus one for the
        // projected column "c"; "a" is reused from the first filter phase
        assert_eq!(requests.lock().unwrap().len(), 3);
    }

    #[tokio::test]
    async fn test_limit_multiple_row_groups() {
        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
        let c = Int32Array::from_iter(0..6);
        let data = RecordBatch::try_from_iter([
            ("a", Arc::new(a) as ArrayRef),
            ("b", Arc::new(b) as ArrayRef),
            ("c", Arc::new(c) as ArrayRef),
        ])
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let props = WriterProperties::builder()
            .set_max_row_group_size(3)
            .build();
        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
        writer.write(&data).unwrap();
        writer.close().unwrap();

        let data: Bytes = buf.into();
        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();

        assert_eq!(metadata.num_row_groups(), 2);

        let test = TestReader::new(data);

        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
            .await
            .unwrap()
            .with_batch_size(1024)
            .with_limit(4)
            .build()
            .unwrap();

        let batches: Vec<_> = stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 2);

        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(batch.num_columns(), 3);
        let col2 = batch.column(2).as_primitive::<Int32Type>();
        assert_eq!(col2.values(), &[0, 1, 2]);

        let batch = &batches[1];
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 3);
        let col2 = batch.column(2).as_primitive::<Int32Type>();
        assert_eq!(col2.values(), &[3]);

        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
            .await
            .unwrap()
            .with_offset(2)
            .with_limit(3)
            .build()
            .unwrap();

        let batches: Vec<_> = stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 2);

        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 3);
        let col2 = batch.column(2).as_primitive::<Int32Type>();
        assert_eq!(col2.values(), &[2]);

        let batch = &batches[1];
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 3);
        let col2 = batch.column(2).as_primitive::<Int32Type>();
        assert_eq!(col2.values(), &[3, 4]);

        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
            .await
            .unwrap()
            .with_offset(4)
            .with_limit(20)
            .build()
            .unwrap();

        let batches: Vec<_> = stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 1);

        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 3);
        let col2 = batch.column(2).as_primitive::<Int32Type>();
        assert_eq!(col2.values(), &[4, 5]);
    }

    #[tokio::test]
    async fn test_row_filter_with_index() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let parquet_schema = metadata.file_metadata().schema_descr_ptr();

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader::new(data.clone());

        let a_filter =
            ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema, vec![1]), |batch| {
                Ok(batch.column(0).as_boolean().clone())
            });

        let b_scalar = Int8Array::from(vec![2]);
        let b_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![2]),
            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
        );

        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);

        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);

        let options = ArrowReaderOptions::new().with_page_index(true);
        let stream = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
            .await
            .unwrap()
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .with_row_filter(filter)
            .build()
            .unwrap();

        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();

        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();

        assert_eq!(total_rows, 730);
    }

    #[tokio::test]
    async fn test_in_memory_row_group_sparse() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .parse_and_finish(&data)
            .unwrap();

        let offset_index = metadata.offset_index().expect("reading offset index")[0].clone();

        let mut metadata_builder = metadata.into_builder();
        let mut row_groups = metadata_builder.take_row_groups();
        row_groups.truncate(1);
        let row_group_meta = row_groups.pop().unwrap();

        let metadata = metadata_builder
            .add_row_group(row_group_meta)
            .set_column_index(None)
            .set_offset_index(Some(vec![offset_index.clone()]))
            .build();

        let metadata = Arc::new(metadata);

        let num_rows = metadata.row_group(0).num_rows();

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader::new(data.clone());

        let requests = async_reader.requests.clone();
        let (_, fields) = parquet_to_arrow_schema_and_fields(
            metadata.file_metadata().schema_descr(),
            ProjectionMask::all(),
            None,
        )
        .unwrap();

        let _schema_desc = metadata.file_metadata().schema_descr();

        let projection = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), vec![0]);

        let reader_factory = ReaderFactory {
            metadata,
            fields: fields.map(Arc::new),
            input: async_reader,
            filter: None,
            limit: None,
            offset: None,
        };

        let mut skip = true;
        let mut pages = offset_index[0].page_locations.iter().peekable();

        // Build a `RowSelection` that skips every other page
        let mut selectors = vec![];
        let mut expected_page_requests: Vec<Range<usize>> = vec![];
        while let Some(page) = pages.next() {
            let num_rows = if let Some(next_page) = pages.peek() {
                next_page.first_row_index - page.first_row_index
            } else {
                num_rows - page.first_row_index
            };

            if skip {
                selectors.push(RowSelector::skip(num_rows as usize));
            } else {
                selectors.push(RowSelector::select(num_rows as usize));
                let start = page.offset as usize;
                let end = start + page.compressed_page_size as usize;
                expected_page_requests.push(start..end);
            }
            skip = !skip;
        }

        let selection = RowSelection::from(selectors);

        let (_factory, _reader) = reader_factory
            .read_row_group(0, Some(selection), projection.clone(), 48)
            .await
            .expect("reading row group");

        let requests = requests.lock().unwrap();

        assert_eq!(&requests[..], &expected_page_requests)
    }

    #[tokio::test]
    async fn test_batch_size_overallocate() {
        let testdata = arrow::util::test_util::parquet_test_data();
        // `alltypes_plain.parquet` only has 8 rows
        let path = format!("{testdata}/alltypes_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let async_reader = TestReader::new(data.clone());

        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();

        let file_rows = builder.metadata().file_metadata().num_rows() as usize;

        let stream = builder
            .with_projection(ProjectionMask::all())
            .with_batch_size(1024)
            .build()
            .unwrap();
        assert_ne!(1024, file_rows);
        assert_eq!(stream.batch_size, file_rows);
    }

    #[tokio::test]
    async fn test_get_row_group_column_bloom_filter_without_length() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());
        test_get_row_group_column_bloom_filter(data, false).await;
    }

    #[tokio::test]
    async fn test_parquet_record_batch_stream_schema() {
        fn get_all_field_names(schema: &Schema) -> Vec<&String> {
            schema.flattened_fields().iter().map(|f| f.name()).collect()
        }

        // Write a file with nested struct columns and custom key/value metadata, then check
        // the schemas exposed by the builder, the reader and the emitted batches under
        // different projections.
        let mut metadata = HashMap::with_capacity(1);
        metadata.insert("key".to_string(), "value".to_string());

        let nested_struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("d", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("e", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
            ),
        ]);
        let struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("a", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("b", DataType::UInt64, true)),
                Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c",
                    nested_struct_array.data_type().clone(),
                    true,
                )),
                Arc::new(nested_struct_array) as ArrayRef,
            ),
        ]);

        let schema =
            Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
        let record_batch = RecordBatch::from(struct_array)
            .with_schema(schema.clone())
            .unwrap();

        let mut file = tempfile().unwrap();
        let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
        writer.write(&record_batch).unwrap();
        writer.close().unwrap();

        let all_fields = ["a", "b", "c", "d", "e"];
        // (leaf column indices to project, expected flattened field names after projection)
        let projections = [
            (vec![], vec![]),
            (vec![0], vec!["a"]),
            (vec![0, 1], vec!["a", "b"]),
            (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
            (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
        ];

        for (indices, expected_projected_names) in projections {
            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
                // The builder schema keeps all fields and the file metadata
                assert_eq!(get_all_field_names(&builder), all_fields);
                assert_eq!(builder.metadata, metadata);
                // The reader and batch schemas only expose the projected fields, without metadata
                assert_eq!(get_all_field_names(&reader), expected_projected_names);
                assert_eq!(reader.metadata, HashMap::default());
                assert_eq!(get_all_field_names(&batch), expected_projected_names);
                assert_eq!(batch.metadata, HashMap::default());
            };

            let builder =
                ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
            let sync_builder_schema = builder.schema().clone();
            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
            let mut reader = builder.with_projection(mask).build().unwrap();
            let sync_reader_schema = reader.schema();
            let batch = reader.next().unwrap().unwrap();
            let sync_batch_schema = batch.schema();
            assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);

            // The asynchronous reader should behave the same as the synchronous one
            let file = tokio::fs::File::from(file.try_clone().unwrap());
            let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
            let async_builder_schema = builder.schema().clone();
            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
            let mut reader = builder.with_projection(mask).build().unwrap();
            let async_reader_schema = reader.schema().clone();
            let batch = reader.next().await.unwrap().unwrap();
            let async_batch_schema = batch.schema();
            assert_schemas(
                async_builder_schema,
                async_reader_schema,
                async_batch_schema,
            );
        }
    }

    #[tokio::test]
    async fn test_get_row_group_column_bloom_filter_with_length() {
        // Rewrite the test file so that its metadata contains `bloom_filter_length`
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());
        let async_reader = TestReader::new(data.clone());
        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();
        let schema = builder.schema().clone();
        let stream = builder.build().unwrap();
        let batches = stream.try_collect::<Vec<_>>().await.unwrap();

        let mut parquet_data = Vec::new();
        let props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build();
        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
        for batch in batches {
            writer.write(&batch).unwrap();
        }
        writer.close().unwrap();

        // Test the rewritten parquet file
        test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
    }

    async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
        let async_reader = TestReader::new(data.clone());

        let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();

        let metadata = builder.metadata();
        assert_eq!(metadata.num_row_groups(), 1);
        let row_group = metadata.row_group(0);
        let column = row_group.column(0);
        assert_eq!(column.bloom_filter_length().is_some(), with_length);

        let sbbf = builder
            .get_row_group_column_bloom_filter(0, 0)
            .await
            .unwrap()
            .unwrap();
        assert!(sbbf.check(&"Hello"));
        assert!(!sbbf.check(&"Hello_Not_Exists"));
    }

    #[tokio::test]
    async fn test_nested_skip() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("col_1", DataType::UInt64, false),
            Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
        ]));

        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(256)
            .set_write_batch_size(256)
            .set_max_row_group_size(1024);

        let mut file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();

        let mut builder = ListBuilder::new(StringBuilder::new());
        for id in 0..1024 {
            match id % 3 {
                0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
                1 => builder.append_value([Some(format!("id_{id}"))]),
                _ => builder.append_null(),
            }
        }
        let refs = vec![
            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
            Arc::new(builder.finish()) as ArrayRef,
        ];

        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let selections = [
            RowSelection::from(vec![
                RowSelector::skip(313),
                RowSelector::select(1),
                RowSelector::skip(709),
                RowSelector::select(1),
            ]),
            RowSelection::from(vec![
                RowSelector::skip(255),
                RowSelector::select(1),
                RowSelector::skip(767),
                RowSelector::select(1),
            ]),
            RowSelection::from(vec![
                RowSelector::select(255),
                RowSelector::skip(1),
                RowSelector::select(767),
                RowSelector::skip(1),
            ]),
            RowSelection::from(vec![
                RowSelector::skip(254),
                RowSelector::select(1),
                RowSelector::select(1),
                RowSelector::skip(767),
                RowSelector::select(1),
            ]),
        ];

        for selection in selections {
            let expected = selection.row_count();
            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
                tokio::fs::File::from_std(file.try_clone().unwrap()),
                ArrowReaderOptions::new().with_page_index(true),
            )
            .await
            .unwrap();

            reader = reader.with_row_selection(selection);

            let mut stream = reader.build().unwrap();

            let mut total_rows = 0;
            while let Some(rb) = stream.next().await {
                let rb = rb.unwrap();
                total_rows += rb.num_rows();
            }
            assert_eq!(total_rows, expected);
        }
    }

    #[tokio::test]
    async fn test_row_filter_nested() {
        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
        let b = StructArray::from(vec![
            (
                Arc::new(Field::new("aa", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["a", "b", "b", "b", "c", "c"])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("bb", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5", "6"])) as ArrayRef,
            ),
        ]);
        let c = Int32Array::from_iter(0..6);
        let data = RecordBatch::try_from_iter([
            ("a", Arc::new(a) as ArrayRef),
            ("b", Arc::new(b) as ArrayRef),
            ("c", Arc::new(c) as ArrayRef),
        ])
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
        writer.write(&data).unwrap();
        writer.close().unwrap();

        let data: Bytes = buf.into();
        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let parquet_schema = metadata.file_metadata().schema_descr_ptr();

        let test = TestReader::new(data);
        let requests = test.requests.clone();

        let a_scalar = StringArray::from_iter_values(["b"]);
        let a_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![0]),
            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
        );

        let b_scalar = StringArray::from_iter_values(["4"]);
        let b_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![2]),
            move |batch| {
                // The predicate projects the single leaf `b.bb`, so the struct has one child here
                let struct_array = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<StructArray>()
                    .unwrap();
                eq(struct_array.column(0), &Scalar::new(&b_scalar))
            },
        );

        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);

        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 3]);
        let stream = ParquetRecordBatchStreamBuilder::new(test)
            .await
            .unwrap()
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .with_row_filter(filter)
            .build()
            .unwrap();

        let batches: Vec<_> = stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 1);

        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 2);

        let col = batch.column(0);
        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
        assert_eq!(val, "b");

        let col = batch.column(1);
        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
        assert_eq!(val, 3);

        // Expect three fetches: one for each filter's leaf column ("a", then "b.bb"),
        // plus one for the projected column "c"
        assert_eq!(requests.lock().unwrap().len(), 3);
    }

    #[tokio::test]
    async fn empty_offset_index_doesnt_panic_in_read_row_group() {
        use tokio::fs::File;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let mut file = File::open(&path).await.unwrap();
        let file_size = file.metadata().await.unwrap().len();
        let mut metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut file, file_size)
            .await
            .unwrap();

        metadata.set_offset_index(Some(vec![]));
        let options = ArrowReaderOptions::new().with_page_index(true);
        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
        let reader =
            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
                .build()
                .unwrap();

        let result = reader.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(result.len(), 1);
    }

    #[tokio::test]
    async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
        use tokio::fs::File;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let mut file = File::open(&path).await.unwrap();
        let file_size = file.metadata().await.unwrap().len();
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut file, file_size)
            .await
            .unwrap();

        let options = ArrowReaderOptions::new().with_page_index(true);
        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
        let reader =
            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
                .build()
                .unwrap();

        let result = reader.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(result.len(), 8);
    }

    #[tokio::test]
    async fn empty_offset_index_doesnt_panic_in_column_chunks() {
        use tempfile::TempDir;
        use tokio::fs::File;
        fn write_metadata_to_local_file(
            metadata: ParquetMetaData,
            file: impl AsRef<std::path::Path>,
        ) {
            use crate::file::metadata::ParquetMetaDataWriter;
            use std::fs::File;
            let file = File::create(file).unwrap();
            ParquetMetaDataWriter::new(file, &metadata)
                .finish()
                .unwrap()
        }

        fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
            use std::fs::File;
            let file = File::open(file).unwrap();
            ParquetMetaDataReader::new()
                .with_page_indexes(true)
                .parse_and_finish(&file)
                .unwrap()
        }

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let mut file = File::open(&path).await.unwrap();
        let file_size = file.metadata().await.unwrap().len();
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut file, file_size)
            .await
            .unwrap();

        let tempdir = TempDir::new().unwrap();
        let metadata_path = tempdir.path().join("thrift_metadata.dat");
        write_metadata_to_local_file(metadata, &metadata_path);
        let metadata = read_metadata_from_local_file(&metadata_path);

        let options = ArrowReaderOptions::new().with_page_index(true);
        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
        let reader =
            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
                .build()
                .unwrap();

        let result = reader.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(result.len(), 1);
    }
}