1use std::collections::VecDeque;
25use std::fmt::Formatter;
26use std::io::SeekFrom;
27use std::ops::Range;
28use std::pin::Pin;
29use std::sync::Arc;
30use std::task::{Context, Poll};
31
32use bytes::{Buf, Bytes};
33use futures::future::{BoxFuture, FutureExt};
34use futures::ready;
35use futures::stream::Stream;
36use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
37
38use arrow_array::RecordBatch;
39use arrow_schema::{DataType, Fields, Schema, SchemaRef};
40
41use crate::arrow::array_reader::{build_array_reader, RowGroups};
42use crate::arrow::arrow_reader::{
43 apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderMetadata,
44 ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowSelection,
45};
46use crate::arrow::ProjectionMask;
47
48use crate::bloom_filter::{
49 chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
50};
51use crate::column::page::{PageIterator, PageReader};
52use crate::errors::{ParquetError, Result};
53use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
54use crate::file::page_index::offset_index::OffsetIndexMetaData;
55use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
56use crate::file::FOOTER_SIZE;
57use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
58
59mod metadata;
60pub use metadata::*;
61
62#[cfg(feature = "object_store")]
63mod store;
64
65use crate::arrow::schema::ParquetField;
66#[cfg(feature = "object_store")]
67pub use store::*;
68
/// An asynchronous interface to load the bytes and metadata of a parquet file.
///
/// A blanket implementation is provided below for any type implementing
/// [`AsyncRead`] + [`AsyncSeek`] + [`Unpin`] (e.g. a tokio `File`).
pub trait AsyncFileReader: Send {
    /// Retrieve the bytes in `range`.
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;

    /// Retrieve multiple byte ranges.
    ///
    /// The default implementation fetches each range sequentially via
    /// [`Self::get_bytes`]; implementations may override this to coalesce
    /// or parallelize the requests.
    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
        async move {
            let mut result = Vec::with_capacity(ranges.len());

            for range in ranges.into_iter() {
                let data = self.get_bytes(range).await?;
                result.push(data);
            }

            Ok(result)
        }
        .boxed()
    }

    /// Return the [`ParquetMetaData`] for this file.
    ///
    /// `options`, when provided, may influence how metadata is decoded
    /// (e.g. file decryption properties when the `encryption` feature is
    /// enabled — see the blanket implementation below).
    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
}
110
/// Forwarding implementation so a boxed trait object can be used wherever an
/// [`AsyncFileReader`] is expected.
impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
        self.as_mut().get_bytes(range)
    }

    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
        self.as_mut().get_byte_ranges(ranges)
    }

    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
        self.as_mut().get_metadata(options)
    }
}
128
/// Blanket implementation for any seekable asynchronous byte source
/// (e.g. `tokio::fs::File`).
impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
        async move {
            self.seek(SeekFrom::Start(range.start as u64)).await?;

            let to_read = range.end - range.start;
            let mut buffer = Vec::with_capacity(to_read);
            // `take` bounds the read so we never read past the requested range
            let read = self.take(to_read as u64).read_to_end(&mut buffer).await?;
            if read != to_read {
                return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
            }

            Ok(buffer.into())
        }
        .boxed()
    }

    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
        const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
        async move {
            // Read the fixed-size footer at the end of the file to learn the
            // length of the metadata block that precedes it
            self.seek(SeekFrom::End(-FOOTER_SIZE_I64)).await?;

            let mut buf = [0_u8; FOOTER_SIZE];
            self.read_exact(&mut buf).await?;

            let footer = ParquetMetaDataReader::decode_footer_tail(&buf)?;
            let metadata_len = footer.metadata_length();

            // Seek back to the start of the metadata and read it in full
            self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
                .await?;

            let mut buf = Vec::with_capacity(metadata_len);
            self.take(metadata_len as _).read_to_end(&mut buf).await?;

            let metadata_reader = ParquetMetaDataReader::new();

            // Decryption properties from `options` are only consulted when the
            // `encryption` feature is compiled in
            #[cfg(feature = "encryption")]
            let metadata_reader = metadata_reader.with_decryption_properties(
                options.and_then(|o| o.file_decryption_properties.as_ref()),
            );

            let parquet_metadata = metadata_reader.decode_footer_metadata(&buf, &footer)?;

            Ok(Arc::new(parquet_metadata))
        }
        .boxed()
    }
}
180
impl ArrowReaderMetadata {
    /// Loads [`ArrowReaderMetadata`] from the provided [`AsyncFileReader`].
    ///
    /// If `options.page_index` is set and the metadata returned by the reader
    /// does not already contain column/offset indexes, an extra round of I/O
    /// is performed to load them.
    pub async fn load_async<T: AsyncFileReader>(
        input: &mut T,
        options: ArrowReaderOptions,
    ) -> Result<Self> {
        let mut metadata = input.get_metadata(Some(&options)).await?;

        if options.page_index
            && metadata.column_index().is_none()
            && metadata.offset_index().is_none()
        {
            // Reuse the metadata in place if we hold the only reference,
            // otherwise fall back to cloning it
            let m = Arc::try_unwrap(metadata).unwrap_or_else(|e| e.as_ref().clone());
            let mut reader = ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true);

            #[cfg(feature = "encryption")]
            {
                reader =
                    reader.with_decryption_properties(options.file_decryption_properties.as_ref());
            }

            reader.load_page_index(input).await?;
            metadata = Arc::new(reader.finish()?)
        }
        Self::try_new(metadata, options)
    }
}
218
/// Newtype wrapper marking the input of an [`ArrowReaderBuilder`] as an
/// asynchronous reader.
#[doc(hidden)]
pub struct AsyncReader<T>(T);

/// Builder for a [`ParquetRecordBatchStream`] reading from an
/// [`AsyncFileReader`].
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
239
impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
    /// Create a new [`ParquetRecordBatchStreamBuilder`] with default options,
    /// reading the parquet metadata from `input`.
    pub async fn new(input: T) -> Result<Self> {
        Self::new_with_options(input, Default::default()).await
    }

    /// As [`Self::new`] but with the provided [`ArrowReaderOptions`].
    pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
        Ok(Self::new_with_metadata(input, metadata))
    }

    /// Create a builder from already-loaded [`ArrowReaderMetadata`], skipping
    /// the metadata I/O (useful to share metadata between multiple readers).
    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(AsyncReader(input), metadata)
    }

    /// Read the bloom filter for the given column chunk, returning `Ok(None)`
    /// if the column has no bloom filter.
    ///
    /// When the metadata records the filter's length, header and bitset are
    /// fetched in a single request; otherwise a header-sized estimate is
    /// fetched first and the bitset is read with a second request.
    pub async fn get_row_group_column_bloom_filter(
        &mut self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> Result<Option<Sbbf>> {
        let metadata = self.metadata.row_group(row_group_idx);
        let column_metadata = metadata.column(column_idx);

        let offset: usize = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => self.input.0.get_bytes(offset..offset + length as usize),
            None => self
                .input
                .0
                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE),
        }
        .await?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset as u64, buffer.clone())?;

        // These matches are exhaustive over the single variant each enum
        // currently defines; a new variant will surface as a compile error here
        match header.algorithm {
            BloomFilterAlgorithm::BLOCK(_) => {
                // only algorithm currently supported
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED(_) => {
                // only compression currently supported
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH(_) => {
                // only hash currently supported
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            // Whole filter was fetched above; slice off the header bytes
            Some(_) => buffer.slice((bitset_offset as usize - offset)..),
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                self.input
                    .0
                    .get_bytes(bitset_offset as usize..bitset_offset as usize + bitset_length)
                    .await?
            }
        };
        Ok(Some(Sbbf::new(&bitset)))
    }

    /// Build a [`ParquetRecordBatchStream`], consuming this builder.
    ///
    /// Returns an error if any explicitly requested row group index is out of
    /// bounds for the file.
    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
        let num_row_groups = self.metadata.row_groups().len();

        let row_groups = match self.row_groups {
            Some(row_groups) => {
                if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
                    return Err(general_err!(
                        "row group {} out of bounds 0..{}",
                        col,
                        num_row_groups
                    ));
                }
                row_groups.into()
            }
            None => (0..self.metadata.row_groups().len()).collect(),
        };

        // Cap the batch size at the file's row count to avoid over-allocation
        let batch_size = self
            .batch_size
            .min(self.metadata.file_metadata().num_rows() as usize);
        let reader_factory = ReaderFactory {
            input: self.input.0,
            filter: self.filter,
            metadata: self.metadata.clone(),
            fields: self.fields,
            limit: self.limit,
            offset: self.offset,
        };

        // Derive the output schema by filtering the root struct's fields down
        // to the projected leaves
        let projected_fields = match reader_factory.fields.as_deref().map(|pf| &pf.arrow_type) {
            Some(DataType::Struct(fields)) => {
                fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
            }
            None => Fields::empty(),
            _ => unreachable!("Must be Struct for root type"),
        };
        let schema = Arc::new(Schema::new(projected_fields));

        Ok(ParquetRecordBatchStream {
            metadata: self.metadata,
            batch_size,
            row_groups,
            projection: self.projection,
            selection: self.selection,
            schema,
            reader_factory: Some(reader_factory),
            state: StreamState::Init,
        })
    }
}
551
/// Result of reading a row group: the [`ReaderFactory`] (returned so it can
/// be reused for the next row group) and a reader, or `None` if every row in
/// the row group was skipped.
type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;

/// Reads row groups out of `input`, applying any row filter, offset and limit.
struct ReaderFactory<T> {
    /// Metadata for the whole parquet file
    metadata: Arc<ParquetMetaData>,

    /// Arrow-level field information, if any
    fields: Option<Arc<ParquetField>>,

    /// The byte source
    input: T,

    /// Optional predicates evaluated to refine the row selection
    filter: Option<RowFilter>,

    /// Remaining row limit, decremented as row groups are read
    limit: Option<usize>,

    /// Remaining row offset, decremented as rows are skipped
    offset: Option<usize>,
}
569
impl<T> ReaderFactory<T>
where
    T: AsyncFileReader + Send,
{
    /// Reads the row group at `row_group_idx`, applying `selection`, any row
    /// filter, and the factory's offset/limit.
    ///
    /// Returns the factory (so it can be reused) plus `None` when the whole
    /// row group is skipped, or `Some(reader)` over the surviving rows.
    async fn read_row_group(
        mut self,
        row_group_idx: usize,
        mut selection: Option<RowSelection>,
        projection: ProjectionMask,
        batch_size: usize,
    ) -> ReadResult<T> {
        let meta = self.metadata.row_group(row_group_idx);
        let offset_index = self
            .metadata
            .offset_index()
            // ignore an empty offset index
            .filter(|index| !index.is_empty())
            .map(|x| x[row_group_idx].as_slice());

        let mut row_group = InMemoryRowGroup {
            row_count: meta.num_rows() as usize,
            column_chunks: vec![None; meta.columns().len()],
            offset_index,
            row_group_idx,
            metadata: self.metadata.as_ref(),
        };

        // Evaluate pushed-down predicates first, fetching only the columns
        // each predicate needs and progressively refining `selection`
        if let Some(filter) = self.filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
                // Nothing left selected — skip the rest of the row group
                if !selects_any(selection.as_ref()) {
                    return Ok((self, None));
                }

                let predicate_projection = predicate.projection();
                row_group
                    .fetch(&mut self.input, predicate_projection, selection.as_ref())
                    .await?;

                let array_reader =
                    build_array_reader(self.fields.as_deref(), predicate_projection, &row_group)?;

                selection = Some(evaluate_predicate(
                    batch_size,
                    array_reader,
                    selection,
                    predicate.as_mut(),
                )?);
            }
        }

        // Row count selected before applying offset/limit
        let rows_before = selection
            .as_ref()
            .map(|s| s.row_count())
            .unwrap_or(row_group.row_count);

        if rows_before == 0 {
            return Ok((self, None));
        }

        selection = apply_range(selection, row_group.row_count, self.offset, self.limit);

        // Row count remaining after applying offset/limit
        let rows_after = selection
            .as_ref()
            .map(|s| s.row_count())
            .unwrap_or(row_group.row_count);

        // Consume the offset by however many rows this row group skipped;
        // saturating_sub guards the case where limit (not offset) caused
        // the reduction
        if let Some(offset) = &mut self.offset {
            *offset = offset.saturating_sub(rows_before - rows_after)
        }

        if rows_after == 0 {
            return Ok((self, None));
        }

        if let Some(limit) = &mut self.limit {
            *limit -= rows_after;
        }

        // Fetch the columns required by the final projection and build the
        // record batch reader over the remaining selection
        row_group
            .fetch(&mut self.input, &projection, selection.as_ref())
            .await?;

        let reader = ParquetRecordBatchReader::new(
            batch_size,
            build_array_reader(self.fields.as_deref(), &projection, &row_group)?,
            selection,
        );

        Ok((self, Some(reader)))
    }
}
672
/// The state machine backing [`ParquetRecordBatchStream`].
enum StreamState<T> {
    /// At the start, or between row groups
    Init,
    /// Decoding batches from a fetched row group
    Decoding(ParquetRecordBatchReader),
    /// A row group read is in flight
    Reading(BoxFuture<'static, ReadResult<T>>),
    /// Terminal error state; the stream yields nothing further
    Error,
}
683
684impl<T> std::fmt::Debug for StreamState<T> {
685 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
686 match self {
687 StreamState::Init => write!(f, "StreamState::Init"),
688 StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
689 StreamState::Reading(_) => write!(f, "StreamState::Reading"),
690 StreamState::Error => write!(f, "StreamState::Error"),
691 }
692 }
693}
694
/// An asynchronous [`Stream`] of [`RecordBatch`] read from a parquet file.
pub struct ParquetRecordBatchStream<T> {
    metadata: Arc<ParquetMetaData>,

    /// Projected arrow schema of the emitted batches
    schema: SchemaRef,

    /// Row group indices remaining to be read
    row_groups: VecDeque<usize>,

    /// Leaf columns to decode
    projection: ProjectionMask,

    /// Maximum number of rows per emitted [`RecordBatch`]
    batch_size: usize,

    /// Optional row selection, consumed from the front row group by row group
    selection: Option<RowSelection>,

    /// `None` whilst a row group read is in flight — the factory is moved
    /// into the future held by [`StreamState::Reading`] and restored after
    reader_factory: Option<ReaderFactory<T>>,

    state: StreamState<T>,
}
730
731impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
732 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
733 f.debug_struct("ParquetRecordBatchStream")
734 .field("metadata", &self.metadata)
735 .field("schema", &self.schema)
736 .field("batch_size", &self.batch_size)
737 .field("projection", &self.projection)
738 .field("state", &self.state)
739 .finish()
740 }
741}
742
impl<T> ParquetRecordBatchStream<T> {
    /// Returns the projected [`SchemaRef`] of the [`RecordBatch`]es this
    /// stream produces.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}
752
impl<T> ParquetRecordBatchStream<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
    /// Fetches the next row group, returning a [`ParquetRecordBatchReader`]
    /// over its selected rows, or `Ok(None)` once all row groups are
    /// exhausted (or the stream has previously errored).
    ///
    /// Returns an error if called while the [`Stream`] API is mid-flight
    /// (i.e. the stream is currently decoding or reading a row group).
    pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
        loop {
            match &mut self.state {
                StreamState::Decoding(_) | StreamState::Reading(_) => {
                    return Err(ParquetError::General(
                        "Cannot combine the use of next_row_group with the Stream API".to_string(),
                    ))
                }
                StreamState::Init => {
                    let row_group_idx = match self.row_groups.pop_front() {
                        Some(idx) => idx,
                        None => return Ok(None),
                    };

                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;

                    // Carve this row group's portion off the front of the
                    // overall row selection
                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));

                    let reader_factory = self.reader_factory.take().expect("lost reader factory");

                    let (reader_factory, maybe_reader) = reader_factory
                        .read_row_group(
                            row_group_idx,
                            selection,
                            self.projection.clone(),
                            self.batch_size,
                        )
                        .await
                        .inspect_err(|_| {
                            // Poison the stream so later calls return None
                            self.state = StreamState::Error;
                        })?;
                    self.reader_factory = Some(reader_factory);

                    if let Some(reader) = maybe_reader {
                        return Ok(Some(reader));
                    } else {
                        // Every row of this row group was filtered out;
                        // move on to the next one
                        continue;
                    }
                }
                StreamState::Error => return Ok(None), // terminal state
            }
        }
    }
}
815
impl<T> Stream for ParquetRecordBatchStream<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            match &mut self.state {
                // Drain decoded batches from the current row group
                StreamState::Decoding(batch_reader) => match batch_reader.next() {
                    Some(Ok(batch)) => {
                        return Poll::Ready(Some(Ok(batch)));
                    }
                    Some(Err(e)) => {
                        self.state = StreamState::Error;
                        return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
                    }
                    // Row group exhausted — move on to the next one
                    None => self.state = StreamState::Init,
                },
                // Kick off a read of the next row group
                StreamState::Init => {
                    let row_group_idx = match self.row_groups.pop_front() {
                        Some(idx) => idx,
                        None => return Poll::Ready(None),
                    };

                    let reader = self.reader_factory.take().expect("lost reader factory");

                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;

                    // Carve this row group's portion off the front of the
                    // overall row selection
                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));

                    let fut = reader
                        .read_row_group(
                            row_group_idx,
                            selection,
                            self.projection.clone(),
                            self.batch_size,
                        )
                        .boxed();

                    self.state = StreamState::Reading(fut)
                }
                // Poll the in-flight row group read
                StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
                    Ok((reader_factory, maybe_reader)) => {
                        self.reader_factory = Some(reader_factory);
                        match maybe_reader {
                            // Decode rows from this row group
                            Some(reader) => self.state = StreamState::Decoding(reader),
                            // All rows skipped — jump to the next row group
                            None => self.state = StreamState::Init,
                        }
                    }
                    Err(e) => {
                        self.state = StreamState::Error;
                        return Poll::Ready(Some(Err(e)));
                    }
                },
                StreamState::Error => return Poll::Ready(None), // terminal state
            }
        }
    }
}
878
/// An in-memory collection of column chunk data for a single row group.
struct InMemoryRowGroup<'a> {
    /// Per-column page offset index, if available, enabling sparse fetches
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    /// Fetched bytes for each leaf column; `None` until fetched
    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
    row_count: usize,
    row_group_idx: usize,
    metadata: &'a ParquetMetaData,
}
887
impl InMemoryRowGroup<'_> {
    /// Fetches the byte ranges for the columns in `projection` that have not
    /// yet been fetched.
    ///
    /// When both a row `selection` and a page offset index are available only
    /// the pages covered by the selection are fetched (stored as
    /// [`ColumnChunkData::Sparse`]); otherwise each whole column chunk is
    /// fetched ([`ColumnChunkData::Dense`]).
    async fn fetch<T: AsyncFileReader + Send>(
        &mut self,
        input: &mut T,
        projection: &ProjectionMask,
        selection: Option<&RowSelection>,
    ) -> Result<()> {
        let metadata = self.metadata.row_group(self.row_group_idx);
        if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
            // Records the start offset of every fetched range, per column, so
            // the fetched chunks can be reassociated with their columns below
            let mut page_start_offsets: Vec<Vec<usize>> = vec![];

            let fetch_ranges = self
                .column_chunks
                .iter()
                .zip(metadata.columns())
                .enumerate()
                .filter(|&(idx, (chunk, _chunk_meta))| {
                    chunk.is_none() && projection.leaf_included(idx)
                })
                .flat_map(|(idx, (_chunk, chunk_meta))| {
                    let mut ranges = vec![];
                    let (start, _len) = chunk_meta.byte_range();
                    // If the first page doesn't start at the chunk start,
                    // also fetch the leading bytes (e.g. dictionary page)
                    match offset_index[idx].page_locations.first() {
                        Some(first) if first.offset as u64 != start => {
                            ranges.push(start as usize..first.offset as usize);
                        }
                        _ => (),
                    }

                    ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
                    page_start_offsets.push(ranges.iter().map(|range| range.start).collect());

                    ranges
                })
                .collect();

            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
            let mut page_start_offsets = page_start_offsets.into_iter();

            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
                if chunk.is_some() || !projection.leaf_included(idx) {
                    continue;
                }

                // Consume this column's share of the fetched ranges
                if let Some(offsets) = page_start_offsets.next() {
                    let mut chunks = Vec::with_capacity(offsets.len());
                    for _ in 0..offsets.len() {
                        chunks.push(chunk_data.next().unwrap());
                    }

                    *chunk = Some(Arc::new(ColumnChunkData::Sparse {
                        length: metadata.column(idx).byte_range().1 as usize,
                        data: offsets.into_iter().zip(chunks.into_iter()).collect(),
                    }))
                }
            }
        } else {
            // No selection/page index: fetch each projected column chunk whole
            let fetch_ranges = self
                .column_chunks
                .iter()
                .enumerate()
                .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
                .map(|(idx, _chunk)| {
                    let column = metadata.column(idx);
                    let (start, length) = column.byte_range();
                    start as usize..(start + length) as usize
                })
                .collect();

            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();

            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
                if chunk.is_some() || !projection.leaf_included(idx) {
                    continue;
                }

                if let Some(data) = chunk_data.next() {
                    *chunk = Some(Arc::new(ColumnChunkData::Dense {
                        offset: metadata.column(idx).byte_range().0 as usize,
                        data,
                    }));
                }
            }
        }

        Ok(())
    }
}
981
impl RowGroups for InMemoryRowGroup<'_> {
    fn num_rows(&self) -> usize {
        self.row_count
    }

    /// Returns a single-element [`PageIterator`] for column `i`, which must
    /// already have been fetched via [`InMemoryRowGroup::fetch`].
    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        match &self.column_chunks[i] {
            None => Err(ParquetError::General(format!(
                "Invalid column index {i}, column was not fetched"
            ))),
            Some(data) => {
                let page_locations = self
                    .offset_index
                    // ignore an empty offset index
                    .filter(|index| !index.is_empty())
                    .map(|index| index[i].page_locations.clone());
                let column_chunk_metadata = self.metadata.row_group(self.row_group_idx).column(i);
                let page_reader = SerializedPageReader::new(
                    data.clone(),
                    column_chunk_metadata,
                    self.row_count,
                    page_locations,
                )?;
                // NOTE(review): presumably attaches decryption state for
                // encrypted files — confirm in `add_crypto_context`
                let page_reader = page_reader.add_crypto_context(
                    self.row_group_idx,
                    i,
                    self.metadata,
                    column_chunk_metadata,
                )?;

                let page_reader: Box<dyn PageReader> = Box::new(page_reader);

                Ok(Box::new(ColumnChunkIterator {
                    reader: Some(Ok(page_reader)),
                }))
            }
        }
    }
}
1022
/// The raw bytes backing one column chunk, either sparsely or fully fetched.
#[derive(Clone)]
enum ColumnChunkData {
    /// Only the pages covered by a row selection were fetched
    Sparse {
        /// Total length of the full column chunk in bytes
        length: usize,
        /// `(file offset, bytes)` pairs; must be sorted by offset, as
        /// required by the binary search in [`Self::get`]
        data: Vec<(usize, Bytes)>,
    },
    /// The entire column chunk, starting at `offset` within the file
    Dense { offset: usize, data: Bytes },
}
1037
1038impl ColumnChunkData {
1039 fn get(&self, start: u64) -> Result<Bytes> {
1040 match &self {
1041 ColumnChunkData::Sparse { data, .. } => data
1042 .binary_search_by_key(&start, |(offset, _)| *offset as u64)
1043 .map(|idx| data[idx].1.clone())
1044 .map_err(|_| {
1045 ParquetError::General(format!(
1046 "Invalid offset in sparse column chunk data: {start}"
1047 ))
1048 }),
1049 ColumnChunkData::Dense { offset, data } => {
1050 let start = start as usize - *offset;
1051 Ok(data.slice(start..))
1052 }
1053 }
1054 }
1055}
1056
1057impl Length for ColumnChunkData {
1058 fn len(&self) -> u64 {
1059 match &self {
1060 ColumnChunkData::Sparse { length, .. } => *length as u64,
1061 ColumnChunkData::Dense { data, .. } => data.len() as u64,
1062 }
1063 }
1064}
1065
impl ChunkReader for ColumnChunkData {
    type T = bytes::buf::Reader<Bytes>;

    /// Returns a reader over the bytes starting at file offset `start`.
    fn get_read(&self, start: u64) -> Result<Self::T> {
        Ok(self.get(start)?.reader())
    }

    /// Returns `length` bytes starting at file offset `start`.
    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
        Ok(self.get(start)?.slice(..length))
    }
}
1077
/// A [`PageIterator`] that yields exactly one [`PageReader`].
struct ColumnChunkIterator {
    reader: Option<Result<Box<dyn PageReader>>>,
}

impl Iterator for ColumnChunkIterator {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        // `take` leaves `None` behind, so the reader is yielded only once
        self.reader.take()
    }
}

impl PageIterator for ColumnChunkIterator {}
1092
1093#[cfg(test)]
1094mod tests {
1095 use super::*;
1096 use crate::arrow::arrow_reader::{
1097 ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
1098 };
1099 use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
1100 use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
1101 use crate::arrow::ArrowWriter;
1102 use crate::file::metadata::ParquetMetaDataReader;
1103 use crate::file::properties::WriterProperties;
1104 use arrow::compute::kernels::cmp::eq;
1105 use arrow::error::Result as ArrowResult;
1106 use arrow_array::builder::{ListBuilder, StringBuilder};
1107 use arrow_array::cast::AsArray;
1108 use arrow_array::types::Int32Type;
1109 use arrow_array::{
1110 Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray,
1111 StructArray, UInt64Array,
1112 };
1113 use arrow_schema::{DataType, Field, Schema};
1114 use futures::{StreamExt, TryStreamExt};
1115 use rand::{rng, Rng};
1116 use std::collections::HashMap;
1117 use std::sync::{Arc, Mutex};
1118 use tempfile::tempfile;
1119
    /// In-memory [`AsyncFileReader`] used by the tests below; records every
    /// byte range requested so tests can assert on the exact I/O performed.
    #[derive(Clone)]
    struct TestReader {
        data: Bytes,
        metadata: Arc<ParquetMetaData>,
        requests: Arc<Mutex<Vec<Range<usize>>>>,
    }

    impl AsyncFileReader for TestReader {
        fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
            // Record the request before serving it from the in-memory buffer
            self.requests.lock().unwrap().push(range.clone());
            futures::future::ready(Ok(self.data.slice(range))).boxed()
        }

        fn get_metadata<'a>(
            &'a mut self,
            _options: Option<&'a ArrowReaderOptions>,
        ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
            futures::future::ready(Ok(self.metadata.clone())).boxed()
        }
    }
1140
    /// Reads two projected columns via the async stream, checks the batches
    /// match the synchronous reader, and verifies only the two projected
    /// column chunks' byte ranges were fetched.
    #[tokio::test]
    async fn test_async_reader() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        let requests = async_reader.requests.clone();
        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
        let stream = builder
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .build()
            .unwrap();

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            .with_batch_size(104)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);

        // Exactly one request per projected column chunk, in column order
        let requests = requests.lock().unwrap();
        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();

        assert_eq!(
            &requests[..],
            &[
                offset_1 as usize..(offset_1 + length_1) as usize,
                offset_2 as usize..(offset_2 + length_2) as usize
            ]
        );
    }
1197
    /// Same as `test_async_reader` but drives the reader via the
    /// `next_row_group` API instead of the `Stream` API.
    #[tokio::test]
    async fn test_async_reader_with_next_row_group() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        let requests = async_reader.requests.clone();
        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
        let mut stream = builder
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .build()
            .unwrap();

        // Collect one reader per row group, then flatten their batches
        let mut readers = vec![];
        while let Some(reader) = stream.next_row_group().await.unwrap() {
            readers.push(reader);
        }

        let async_batches: Vec<_> = readers
            .into_iter()
            .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
            .collect();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            .with_batch_size(104)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);

        // Exactly one request per projected column chunk, in column order
        let requests = requests.lock().unwrap();
        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();

        assert_eq!(
            &requests[..],
            &[
                offset_1 as usize..(offset_1 + length_1) as usize,
                offset_2 as usize..(offset_2 + length_2) as usize
            ]
        );
    }
1262
    /// Verifies that enabling `with_page_index` loads column and offset
    /// indexes for every row group/column, and that results still match the
    /// synchronous reader.
    #[tokio::test]
    async fn test_async_reader_with_index() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        let options = ArrowReaderOptions::new().with_page_index(true);
        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
            .await
            .unwrap();

        // The builder's metadata should now carry both indexes
        let metadata_with_index = builder.metadata();

        let offset_index = metadata_with_index.offset_index().unwrap();
        let column_index = metadata_with_index.column_index().unwrap();

        assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
        assert_eq!(column_index.len(), metadata_with_index.num_row_groups());

        let num_columns = metadata_with_index
            .file_metadata()
            .schema_descr()
            .num_columns();

        // Every row group has an index entry for every column
        offset_index
            .iter()
            .for_each(|x| assert_eq!(x.len(), num_columns));
        column_index
            .iter()
            .for_each(|x| assert_eq!(x.len(), num_columns));

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
        let stream = builder
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .build()
            .unwrap();

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            .with_batch_size(1024)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);
    }
1330
    /// Verifies `with_limit(1)` produces the same result through the async
    /// stream as through the synchronous reader.
    #[tokio::test]
    async fn test_async_reader_with_limit() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
        let stream = builder
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .with_limit(1)
            .build()
            .unwrap();

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            .with_batch_size(1024)
            .with_limit(1)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);
    }
1376
    /// Verifies a fixed row selection (spanning multiple pages, including a
    /// large skip) produces the same result through the async reader (with
    /// page index) as through the synchronous reader.
    #[tokio::test]
    async fn test_async_reader_skip_pages() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        let options = ArrowReaderOptions::new().with_page_index(true);
        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
            .await
            .unwrap();

        let selection = RowSelection::from(vec![
            RowSelector::skip(21),
            RowSelector::select(21),
            RowSelector::skip(41),
            RowSelector::select(41),
            RowSelector::skip(25),
            RowSelector::select(25),
            RowSelector::skip(7116),
            RowSelector::select(10),
        ]);

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);

        let stream = builder
            .with_projection(mask.clone())
            .with_row_selection(selection.clone())
            .build()
            .expect("building stream");

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            .with_batch_size(1024)
            .with_row_selection(selection)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);
    }
1434
    /// Fuzzes random alternating skip/select selections over a random column
    /// and checks the total number of rows returned matches the number of
    /// selected rows.
    #[tokio::test]
    async fn test_fuzz_async_reader_selection() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);

        let mut rand = rng();

        for _ in 0..100 {
            let mut expected_rows = 0;
            let mut total_rows = 0;
            let mut skip = false;
            let mut selectors = vec![];

            // Build a random skip/select pattern covering all 7300 rows
            while total_rows < 7300 {
                let row_count: usize = rand.random_range(1..100);

                let row_count = row_count.min(7300 - total_rows);

                selectors.push(RowSelector { row_count, skip });

                total_rows += row_count;
                if !skip {
                    expected_rows += row_count;
                }

                skip = !skip;
            }

            let selection = RowSelection::from(selectors);

            let async_reader = TestReader {
                data: data.clone(),
                metadata: metadata.clone(),
                requests: Default::default(),
            };

            let options = ArrowReaderOptions::new().with_page_index(true);
            let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
                .await
                .unwrap();

            // Project a random single column
            let col_idx: usize = rand.random_range(0..13);
            let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);

            let stream = builder
                .with_projection(mask.clone())
                .with_row_selection(selection.clone())
                .build()
                .expect("building stream");

            let async_batches: Vec<_> = stream.try_collect().await.unwrap();

            let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();

            assert_eq!(actual_rows, expected_rows);
        }
    }
1500
1501 #[tokio::test]
1502 async fn test_async_reader_zero_row_selector() {
1503 let testdata = arrow::util::test_util::parquet_test_data();
1505 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1506 let data = Bytes::from(std::fs::read(path).unwrap());
1507
1508 let metadata = ParquetMetaDataReader::new()
1509 .parse_and_finish(&data)
1510 .unwrap();
1511 let metadata = Arc::new(metadata);
1512
1513 assert_eq!(metadata.num_row_groups(), 1);
1514
1515 let mut rand = rng();
1516
1517 let mut expected_rows = 0;
1518 let mut total_rows = 0;
1519 let mut skip = false;
1520 let mut selectors = vec![];
1521
1522 selectors.push(RowSelector {
1523 row_count: 0,
1524 skip: false,
1525 });
1526
1527 while total_rows < 7300 {
1528 let row_count: usize = rand.random_range(1..100);
1529
1530 let row_count = row_count.min(7300 - total_rows);
1531
1532 selectors.push(RowSelector { row_count, skip });
1533
1534 total_rows += row_count;
1535 if !skip {
1536 expected_rows += row_count;
1537 }
1538
1539 skip = !skip;
1540 }
1541
1542 let selection = RowSelection::from(selectors);
1543
1544 let async_reader = TestReader {
1545 data: data.clone(),
1546 metadata: metadata.clone(),
1547 requests: Default::default(),
1548 };
1549
1550 let options = ArrowReaderOptions::new().with_page_index(true);
1551 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1552 .await
1553 .unwrap();
1554
1555 let col_idx: usize = rand.random_range(0..13);
1556 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1557
1558 let stream = builder
1559 .with_projection(mask.clone())
1560 .with_row_selection(selection.clone())
1561 .build()
1562 .expect("building stream");
1563
1564 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1565
1566 let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1567
1568 assert_eq!(actual_rows, expected_rows);
1569 }
1570
1571 #[tokio::test]
1572 async fn test_row_filter() {
1573 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1574 let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1575 let c = Int32Array::from_iter(0..6);
1576 let data = RecordBatch::try_from_iter([
1577 ("a", Arc::new(a) as ArrayRef),
1578 ("b", Arc::new(b) as ArrayRef),
1579 ("c", Arc::new(c) as ArrayRef),
1580 ])
1581 .unwrap();
1582
1583 let mut buf = Vec::with_capacity(1024);
1584 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1585 writer.write(&data).unwrap();
1586 writer.close().unwrap();
1587
1588 let data: Bytes = buf.into();
1589 let metadata = ParquetMetaDataReader::new()
1590 .parse_and_finish(&data)
1591 .unwrap();
1592 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1593
1594 let test = TestReader {
1595 data,
1596 metadata: Arc::new(metadata),
1597 requests: Default::default(),
1598 };
1599 let requests = test.requests.clone();
1600
1601 let a_scalar = StringArray::from_iter_values(["b"]);
1602 let a_filter = ArrowPredicateFn::new(
1603 ProjectionMask::leaves(&parquet_schema, vec![0]),
1604 move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1605 );
1606
1607 let b_scalar = StringArray::from_iter_values(["4"]);
1608 let b_filter = ArrowPredicateFn::new(
1609 ProjectionMask::leaves(&parquet_schema, vec![1]),
1610 move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1611 );
1612
1613 let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1614
1615 let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1616 let stream = ParquetRecordBatchStreamBuilder::new(test)
1617 .await
1618 .unwrap()
1619 .with_projection(mask.clone())
1620 .with_batch_size(1024)
1621 .with_row_filter(filter)
1622 .build()
1623 .unwrap();
1624
1625 let batches: Vec<_> = stream.try_collect().await.unwrap();
1626 assert_eq!(batches.len(), 1);
1627
1628 let batch = &batches[0];
1629 assert_eq!(batch.num_rows(), 1);
1630 assert_eq!(batch.num_columns(), 2);
1631
1632 let col = batch.column(0);
1633 let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
1634 assert_eq!(val, "b");
1635
1636 let col = batch.column(1);
1637 let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
1638 assert_eq!(val, 3);
1639
1640 assert_eq!(requests.lock().unwrap().len(), 3);
1642 }
1643
    #[tokio::test]
    async fn test_limit_multiple_row_groups() {
        // Verifies that `with_limit` / `with_offset` are applied correctly
        // across row-group boundaries (6 rows written as two row groups of 3).
        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
        let c = Int32Array::from_iter(0..6);
        let data = RecordBatch::try_from_iter([
            ("a", Arc::new(a) as ArrayRef),
            ("b", Arc::new(b) as ArrayRef),
            ("c", Arc::new(c) as ArrayRef),
        ])
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        // Cap row groups at 3 rows so the 6-row batch spans two row groups.
        let props = WriterProperties::builder()
            .set_max_row_group_size(3)
            .build();
        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
        writer.write(&data).unwrap();
        writer.close().unwrap();

        let data: Bytes = buf.into();
        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();

        assert_eq!(metadata.num_row_groups(), 2);

        let test = TestReader {
            data,
            metadata: Arc::new(metadata),
            requests: Default::default(),
        };

        // limit = 4: all 3 rows of the first row group plus 1 from the second.
        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
            .await
            .unwrap()
            .with_batch_size(1024)
            .with_limit(4)
            .build()
            .unwrap();

        let batches: Vec<_> = stream.try_collect().await.unwrap();
        // One batch per row group touched.
        assert_eq!(batches.len(), 2);

        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(batch.num_columns(), 3);
        let col2 = batch.column(2).as_primitive::<Int32Type>();
        assert_eq!(col2.values(), &[0, 1, 2]);

        let batch = &batches[1];
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 3);
        let col2 = batch.column(2).as_primitive::<Int32Type>();
        assert_eq!(col2.values(), &[3]);

        // offset = 2, limit = 3: skip rows 0-1, select rows 2-4 (straddling
        // the row-group boundary).
        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
            .await
            .unwrap()
            .with_offset(2)
            .with_limit(3)
            .build()
            .unwrap();

        let batches: Vec<_> = stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 2);

        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 3);
        let col2 = batch.column(2).as_primitive::<Int32Type>();
        assert_eq!(col2.values(), &[2]);

        let batch = &batches[1];
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 3);
        let col2 = batch.column(2).as_primitive::<Int32Type>();
        assert_eq!(col2.values(), &[3, 4]);

        // offset = 4, limit = 20: limit exceeds remaining rows, so only the
        // second row group is read and the first is skipped entirely.
        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
            .await
            .unwrap()
            .with_offset(4)
            .with_limit(20)
            .build()
            .unwrap();

        let batches: Vec<_> = stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 1);

        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 3);
        let col2 = batch.column(2).as_primitive::<Int32Type>();
        assert_eq!(col2.values(), &[4, 5]);
    }
1748
1749 #[tokio::test]
1750 async fn test_row_filter_with_index() {
1751 let testdata = arrow::util::test_util::parquet_test_data();
1752 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1753 let data = Bytes::from(std::fs::read(path).unwrap());
1754
1755 let metadata = ParquetMetaDataReader::new()
1756 .parse_and_finish(&data)
1757 .unwrap();
1758 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1759 let metadata = Arc::new(metadata);
1760
1761 assert_eq!(metadata.num_row_groups(), 1);
1762
1763 let async_reader = TestReader {
1764 data: data.clone(),
1765 metadata: metadata.clone(),
1766 requests: Default::default(),
1767 };
1768
1769 let a_filter =
1770 ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema, vec![1]), |batch| {
1771 Ok(batch.column(0).as_boolean().clone())
1772 });
1773
1774 let b_scalar = Int8Array::from(vec![2]);
1775 let b_filter = ArrowPredicateFn::new(
1776 ProjectionMask::leaves(&parquet_schema, vec![2]),
1777 move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1778 );
1779
1780 let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1781
1782 let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1783
1784 let options = ArrowReaderOptions::new().with_page_index(true);
1785 let stream = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1786 .await
1787 .unwrap()
1788 .with_projection(mask.clone())
1789 .with_batch_size(1024)
1790 .with_row_filter(filter)
1791 .build()
1792 .unwrap();
1793
1794 let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1795
1796 let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1797
1798 assert_eq!(total_rows, 730);
1799 }
1800
    #[tokio::test]
    async fn test_in_memory_row_group_sparse() {
        // Verifies that a sparse row selection causes only the data pages
        // overlapping selected rows to be fetched: alternating skip/select
        // per page (starting with a skip) should request exactly the byte
        // ranges of the selected pages.
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .parse_and_finish(&data)
            .unwrap();

        // Offset index for the first row group; used both to rebuild the
        // metadata below and to compute the expected page fetch ranges.
        let offset_index = metadata.offset_index().expect("reading offset index")[0].clone();

        // Rebuild the metadata keeping only the first row group, with an
        // offset index but no column index.
        let mut metadata_builder = metadata.into_builder();
        let mut row_groups = metadata_builder.take_row_groups();
        row_groups.truncate(1);
        let row_group_meta = row_groups.pop().unwrap();

        let metadata = metadata_builder
            .add_row_group(row_group_meta)
            .set_column_index(None)
            .set_offset_index(Some(vec![offset_index.clone()]))
            .build();

        let metadata = Arc::new(metadata);

        let num_rows = metadata.row_group(0).num_rows();

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        // Keep a handle on the request log so fetches can be checked later.
        let requests = async_reader.requests.clone();
        let (_, fields) = parquet_to_arrow_schema_and_fields(
            metadata.file_metadata().schema_descr(),
            ProjectionMask::all(),
            None,
        )
        .unwrap();

        let _schema_desc = metadata.file_metadata().schema_descr();

        // Only read the first leaf column.
        let projection = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), vec![0]);

        let reader_factory = ReaderFactory {
            metadata,
            fields: fields.map(Arc::new),
            input: async_reader,
            filter: None,
            limit: None,
            offset: None,
        };

        // Walk the page locations of column 0, alternately skipping and
        // selecting whole pages (starting with a skip). Each selected page
        // contributes its exact byte range to the expected request list.
        let mut skip = true;
        let mut pages = offset_index[0].page_locations.iter().peekable();

        let mut selectors = vec![];
        let mut expected_page_requests: Vec<Range<usize>> = vec![];
        while let Some(page) = pages.next() {
            // Rows in this page: up to the next page's first row, or to the
            // end of the row group for the last page.
            let num_rows = if let Some(next_page) = pages.peek() {
                next_page.first_row_index - page.first_row_index
            } else {
                num_rows - page.first_row_index
            };

            if skip {
                selectors.push(RowSelector::skip(num_rows as usize));
            } else {
                selectors.push(RowSelector::select(num_rows as usize));
                let start = page.offset as usize;
                let end = start + page.compressed_page_size as usize;
                expected_page_requests.push(start..end);
            }
            skip = !skip;
        }

        let selection = RowSelection::from(selectors);

        // Materialize the row group with a batch size of 48; only the reads
        // triggered here should appear in the request log.
        let (_factory, _reader) = reader_factory
            .read_row_group(0, Some(selection), projection.clone(), 48)
            .await
            .expect("reading row group");

        let requests = requests.lock().unwrap();

        assert_eq!(&requests[..], &expected_page_requests)
    }
1893
1894 #[tokio::test]
1895 async fn test_batch_size_overallocate() {
1896 let testdata = arrow::util::test_util::parquet_test_data();
1897 let path = format!("{testdata}/alltypes_plain.parquet");
1899 let data = Bytes::from(std::fs::read(path).unwrap());
1900
1901 let metadata = ParquetMetaDataReader::new()
1902 .parse_and_finish(&data)
1903 .unwrap();
1904 let file_rows = metadata.file_metadata().num_rows() as usize;
1905 let metadata = Arc::new(metadata);
1906
1907 let async_reader = TestReader {
1908 data: data.clone(),
1909 metadata: metadata.clone(),
1910 requests: Default::default(),
1911 };
1912
1913 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1914 .await
1915 .unwrap();
1916
1917 let stream = builder
1918 .with_projection(ProjectionMask::all())
1919 .with_batch_size(1024)
1920 .build()
1921 .unwrap();
1922 assert_ne!(1024, file_rows);
1923 assert_eq!(stream.batch_size, file_rows);
1924 }
1925
1926 #[tokio::test]
1927 async fn test_get_row_group_column_bloom_filter_without_length() {
1928 let testdata = arrow::util::test_util::parquet_test_data();
1929 let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
1930 let data = Bytes::from(std::fs::read(path).unwrap());
1931 test_get_row_group_column_bloom_filter(data, false).await;
1932 }
1933
    #[tokio::test]
    async fn test_parquet_record_batch_stream_schema() {
        // Verifies that for both the sync and async readers:
        //   * the builder schema exposes all (flattened) fields and the
        //     file-level key/value metadata,
        //   * the reader and batch schemas reflect the projection and drop
        //     the file-level metadata.
        fn get_all_field_names(schema: &Schema) -> Vec<&String> {
            schema.flattened_fields().iter().map(|f| f.name()).collect()
        }

        // File-level key/value metadata; expected on the builder schema only.
        let mut metadata = HashMap::with_capacity(1);
        metadata.insert("key".to_string(), "value".to_string());

        // Schema shape: a (int32), b (uint64), c (struct { d: utf8, e: utf8 })
        // -> flattened leaf order is a, b, c.d, c.e.
        let nested_struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("d", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("e", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
            ),
        ]);
        let struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("a", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("b", DataType::UInt64, true)),
                Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c",
                    nested_struct_array.data_type().clone(),
                    true,
                )),
                Arc::new(nested_struct_array) as ArrayRef,
            ),
        ]);

        let schema =
            Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
        let record_batch = RecordBatch::from(struct_array)
            .with_schema(schema.clone())
            .unwrap();

        // Write to a temp file so the same handle can feed both readers.
        let mut file = tempfile().unwrap();
        let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
        writer.write(&record_batch).unwrap();
        writer.close().unwrap();

        let all_fields = ["a", "b", "c", "d", "e"];
        // Leaf-index projections and the flattened field names they select.
        // Note selecting leaf 2 (c.d) pulls in the enclosing struct "c".
        let projections = [
            (vec![], vec![]),
            (vec![0], vec!["a"]),
            (vec![0, 1], vec!["a", "b"]),
            (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
            (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
        ];

        for (indices, expected_projected_names) in projections {
            // Common assertions for (builder schema, reader schema, batch
            // schema), shared by the sync and async paths.
            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
                assert_eq!(get_all_field_names(&builder), all_fields);
                assert_eq!(builder.metadata, metadata);
                assert_eq!(get_all_field_names(&reader), expected_projected_names);
                assert_eq!(reader.metadata, HashMap::default());
                assert_eq!(get_all_field_names(&batch), expected_projected_names);
                assert_eq!(batch.metadata, HashMap::default());
            };

            // Sync reader path.
            let builder =
                ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
            let sync_builder_schema = builder.schema().clone();
            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
            let mut reader = builder.with_projection(mask).build().unwrap();
            let sync_reader_schema = reader.schema();
            let batch = reader.next().unwrap().unwrap();
            let sync_batch_schema = batch.schema();
            assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);

            // Async reader path over the same underlying file.
            let file = tokio::fs::File::from(file.try_clone().unwrap());
            let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
            let async_builder_schema = builder.schema().clone();
            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
            let mut reader = builder.with_projection(mask).build().unwrap();
            let async_reader_schema = reader.schema().clone();
            let batch = reader.next().await.unwrap().unwrap();
            let async_batch_schema = batch.schema();
            assert_schemas(
                async_builder_schema,
                async_reader_schema,
                async_batch_schema,
            );
        }
    }
2041
2042 #[tokio::test]
2043 async fn test_get_row_group_column_bloom_filter_with_length() {
2044 let testdata = arrow::util::test_util::parquet_test_data();
2046 let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
2047 let data = Bytes::from(std::fs::read(path).unwrap());
2048 let metadata = ParquetMetaDataReader::new()
2049 .parse_and_finish(&data)
2050 .unwrap();
2051 let metadata = Arc::new(metadata);
2052 let async_reader = TestReader {
2053 data: data.clone(),
2054 metadata: metadata.clone(),
2055 requests: Default::default(),
2056 };
2057 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2058 .await
2059 .unwrap();
2060 let schema = builder.schema().clone();
2061 let stream = builder.build().unwrap();
2062 let batches = stream.try_collect::<Vec<_>>().await.unwrap();
2063
2064 let mut parquet_data = Vec::new();
2065 let props = WriterProperties::builder()
2066 .set_bloom_filter_enabled(true)
2067 .build();
2068 let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
2069 for batch in batches {
2070 writer.write(&batch).unwrap();
2071 }
2072 writer.close().unwrap();
2073
2074 test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
2076 }
2077
2078 async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
2079 let metadata = ParquetMetaDataReader::new()
2080 .parse_and_finish(&data)
2081 .unwrap();
2082 let metadata = Arc::new(metadata);
2083
2084 assert_eq!(metadata.num_row_groups(), 1);
2085 let row_group = metadata.row_group(0);
2086 let column = row_group.column(0);
2087 assert_eq!(column.bloom_filter_length().is_some(), with_length);
2088
2089 let async_reader = TestReader {
2090 data: data.clone(),
2091 metadata: metadata.clone(),
2092 requests: Default::default(),
2093 };
2094
2095 let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2096 .await
2097 .unwrap();
2098
2099 let sbbf = builder
2100 .get_row_group_column_bloom_filter(0, 0)
2101 .await
2102 .unwrap()
2103 .unwrap();
2104 assert!(sbbf.check(&"Hello"));
2105 assert!(!sbbf.check(&"Hello_Not_Exists"));
2106 }
2107
    #[tokio::test]
    async fn test_nested_skip() {
        // Verifies skip/select handling across data-page boundaries for a
        // nested (list) column. Pages are limited to 256 rows, so selectors
        // around indices 255/256 exercise the page-boundary paths.
        let schema = Arc::new(Schema::new(vec![
            Field::new("col_1", DataType::UInt64, false),
            Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
        ]));

        // Small pages + small write batches so a single 1024-row row group
        // contains several data pages per column.
        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(256)
            .set_write_batch_size(256)
            .set_max_row_group_size(1024);

        let mut file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();

        // List column cycles through three shapes: two-element list,
        // one-element list, null.
        let mut builder = ListBuilder::new(StringBuilder::new());
        for id in 0..1024 {
            match id % 3 {
                0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
                1 => builder.append_value([Some(format!("id_{id}"))]),
                _ => builder.append_null(),
            }
        }
        let refs = vec![
            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
            Arc::new(builder.finish()) as ArrayRef,
        ];

        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        // Selections chosen to land on / straddle the 256-row page edges.
        let selections = [
            RowSelection::from(vec![
                RowSelector::skip(313),
                RowSelector::select(1),
                RowSelector::skip(709),
                RowSelector::select(1),
            ]),
            RowSelection::from(vec![
                RowSelector::skip(255),
                RowSelector::select(1),
                RowSelector::skip(767),
                RowSelector::select(1),
            ]),
            RowSelection::from(vec![
                RowSelector::select(255),
                RowSelector::skip(1),
                RowSelector::select(767),
                RowSelector::skip(1),
            ]),
            RowSelection::from(vec![
                RowSelector::skip(254),
                RowSelector::select(1),
                RowSelector::select(1),
                RowSelector::skip(767),
                RowSelector::select(1),
            ]),
        ];

        for selection in selections {
            let expected = selection.row_count();
            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
                tokio::fs::File::from_std(file.try_clone().unwrap()),
                ArrowReaderOptions::new().with_page_index(true),
            )
            .await
            .unwrap();

            reader = reader.with_row_selection(selection);

            let mut stream = reader.build().unwrap();

            // The stream must yield exactly the selected number of rows.
            let mut total_rows = 0;
            while let Some(rb) = stream.next().await {
                let rb = rb.unwrap();
                total_rows += rb.num_rows();
            }
            assert_eq!(total_rows, expected);
        }
    }
2193
2194 #[tokio::test]
2195 async fn test_row_filter_nested() {
2196 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
2197 let b = StructArray::from(vec![
2198 (
2199 Arc::new(Field::new("aa", DataType::Utf8, true)),
2200 Arc::new(StringArray::from(vec!["a", "b", "b", "b", "c", "c"])) as ArrayRef,
2201 ),
2202 (
2203 Arc::new(Field::new("bb", DataType::Utf8, true)),
2204 Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5", "6"])) as ArrayRef,
2205 ),
2206 ]);
2207 let c = Int32Array::from_iter(0..6);
2208 let data = RecordBatch::try_from_iter([
2209 ("a", Arc::new(a) as ArrayRef),
2210 ("b", Arc::new(b) as ArrayRef),
2211 ("c", Arc::new(c) as ArrayRef),
2212 ])
2213 .unwrap();
2214
2215 let mut buf = Vec::with_capacity(1024);
2216 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
2217 writer.write(&data).unwrap();
2218 writer.close().unwrap();
2219
2220 let data: Bytes = buf.into();
2221 let metadata = ParquetMetaDataReader::new()
2222 .parse_and_finish(&data)
2223 .unwrap();
2224 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
2225
2226 let test = TestReader {
2227 data,
2228 metadata: Arc::new(metadata),
2229 requests: Default::default(),
2230 };
2231 let requests = test.requests.clone();
2232
2233 let a_scalar = StringArray::from_iter_values(["b"]);
2234 let a_filter = ArrowPredicateFn::new(
2235 ProjectionMask::leaves(&parquet_schema, vec![0]),
2236 move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
2237 );
2238
2239 let b_scalar = StringArray::from_iter_values(["4"]);
2240 let b_filter = ArrowPredicateFn::new(
2241 ProjectionMask::leaves(&parquet_schema, vec![2]),
2242 move |batch| {
2243 let struct_array = batch
2245 .column(0)
2246 .as_any()
2247 .downcast_ref::<StructArray>()
2248 .unwrap();
2249 eq(struct_array.column(0), &Scalar::new(&b_scalar))
2250 },
2251 );
2252
2253 let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
2254
2255 let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 3]);
2256 let stream = ParquetRecordBatchStreamBuilder::new(test)
2257 .await
2258 .unwrap()
2259 .with_projection(mask.clone())
2260 .with_batch_size(1024)
2261 .with_row_filter(filter)
2262 .build()
2263 .unwrap();
2264
2265 let batches: Vec<_> = stream.try_collect().await.unwrap();
2266 assert_eq!(batches.len(), 1);
2267
2268 let batch = &batches[0];
2269 assert_eq!(batch.num_rows(), 1);
2270 assert_eq!(batch.num_columns(), 2);
2271
2272 let col = batch.column(0);
2273 let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
2274 assert_eq!(val, "b");
2275
2276 let col = batch.column(1);
2277 let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
2278 assert_eq!(val, 3);
2279
2280 assert_eq!(requests.lock().unwrap().len(), 3);
2282 }
2283
2284 #[tokio::test]
2285 async fn empty_offset_index_doesnt_panic_in_read_row_group() {
2286 use tokio::fs::File;
2287 let testdata = arrow::util::test_util::parquet_test_data();
2288 let path = format!("{testdata}/alltypes_plain.parquet");
2289 let mut file = File::open(&path).await.unwrap();
2290 let file_size = file.metadata().await.unwrap().len();
2291 let mut metadata = ParquetMetaDataReader::new()
2292 .with_page_indexes(true)
2293 .load_and_finish(&mut file, file_size as usize)
2294 .await
2295 .unwrap();
2296
2297 metadata.set_offset_index(Some(vec![]));
2298 let options = ArrowReaderOptions::new().with_page_index(true);
2299 let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2300 let reader =
2301 ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2302 .build()
2303 .unwrap();
2304
2305 let result = reader.try_collect::<Vec<_>>().await.unwrap();
2306 assert_eq!(result.len(), 1);
2307 }
2308
2309 #[tokio::test]
2310 async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
2311 use tokio::fs::File;
2312 let testdata = arrow::util::test_util::parquet_test_data();
2313 let path = format!("{testdata}/alltypes_tiny_pages.parquet");
2314 let mut file = File::open(&path).await.unwrap();
2315 let file_size = file.metadata().await.unwrap().len();
2316 let metadata = ParquetMetaDataReader::new()
2317 .with_page_indexes(true)
2318 .load_and_finish(&mut file, file_size as usize)
2319 .await
2320 .unwrap();
2321
2322 let options = ArrowReaderOptions::new().with_page_index(true);
2323 let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2324 let reader =
2325 ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2326 .build()
2327 .unwrap();
2328
2329 let result = reader.try_collect::<Vec<_>>().await.unwrap();
2330 assert_eq!(result.len(), 8);
2331 }
2332
    #[tokio::test]
    async fn empty_offset_index_doesnt_panic_in_column_chunks() {
        // Regression test: metadata round-tripped through the standalone
        // thrift metadata writer/reader (which can drop page-index details)
        // must not cause a panic when building column chunks.
        use tempfile::TempDir;
        use tokio::fs::File;
        // Persist the metadata alone (no data pages) via ParquetMetaDataWriter.
        fn write_metadata_to_local_file(
            metadata: ParquetMetaData,
            file: impl AsRef<std::path::Path>,
        ) {
            use crate::file::metadata::ParquetMetaDataWriter;
            use std::fs::File;
            let file = File::create(file).unwrap();
            ParquetMetaDataWriter::new(file, &metadata)
                .finish()
                .unwrap()
        }

        // Read the persisted metadata back, requesting page indexes.
        fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
            use std::fs::File;
            let file = File::open(file).unwrap();
            ParquetMetaDataReader::new()
                .with_page_indexes(true)
                .parse_and_finish(&file)
                .unwrap()
        }

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let mut file = File::open(&path).await.unwrap();
        let file_size = file.metadata().await.unwrap().len();
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut file, file_size as usize)
            .await
            .unwrap();

        // Round-trip the metadata through a standalone thrift metadata file.
        let tempdir = TempDir::new().unwrap();
        let metadata_path = tempdir.path().join("thrift_metadata.dat");
        write_metadata_to_local_file(metadata, &metadata_path);
        let metadata = read_metadata_from_local_file(&metadata_path);

        // Reading the original data file with the round-tripped metadata
        // must still succeed.
        let options = ArrowReaderOptions::new().with_page_index(true);
        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
        let reader =
            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
                .build()
                .unwrap();

        let result = reader.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(result.len(), 1);
    }
2384}