use std::collections::VecDeque;
use std::fmt::Formatter;
use std::io::SeekFrom;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use bytes::{Buf, Bytes};
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
use futures::stream::Stream;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

use arrow_array::RecordBatch;
use arrow_schema::{DataType, Fields, Schema, SchemaRef};

use crate::arrow::array_reader::{build_array_reader, RowGroups};
use crate::arrow::arrow_reader::{
    apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderMetadata,
    ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowSelection,
};
use crate::arrow::ProjectionMask;

use crate::bloom_filter::{
    chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
};
use crate::column::page::{PageIterator, PageReader};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};

mod metadata;
pub use metadata::*;

#[cfg(feature = "object_store")]
mod store;

use crate::arrow::schema::ParquetField;
#[cfg(feature = "object_store")]
pub use store::*;

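/// The asynchronous file access interface used by [`ParquetRecordBatchStream`].
///
/// Any type implementing `AsyncRead + AsyncSeek + Unpin + Send` (for example
/// `tokio::fs::File`) gets this trait for free via the blanket implementation below.
/// A minimal sketch of calling it directly, assuming an illustrative local file
/// `data.parquet`:
///
/// ```no_run
/// # use parquet::arrow::async_reader::AsyncFileReader;
/// # async fn demo() -> Result<(), Box<dyn std::error::Error>> {
/// let mut file = tokio::fs::File::open("data.parquet").await?;
/// // Fetch an arbitrary byte range; this errors if the file is shorter than the range
/// let bytes = file.get_bytes(0..1024).await?;
/// println!("fetched {} bytes", bytes.len());
/// # Ok(())
/// # }
/// ```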
pub trait AsyncFileReader: Send {
    /// Retrieve the bytes in `range`
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;

    /// Retrieve multiple byte ranges. The default implementation fetches them
    /// sequentially with [`Self::get_bytes`]
    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
        async move {
            let mut result = Vec::with_capacity(ranges.len());

            for range in ranges.into_iter() {
                let data = self.get_bytes(range).await?;
                result.push(data);
            }

            Ok(result)
        }
        .boxed()
    }

    /// Return the [`ParquetMetaData`] for this file, respecting the provided
    /// [`ArrowReaderOptions`] (e.g. whether to also load the page index)
    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
}
121
122impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
124 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
125 self.as_mut().get_bytes(range)
126 }
127
128 fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
129 self.as_mut().get_byte_ranges(ranges)
130 }
131
132 fn get_metadata<'a>(
133 &'a mut self,
134 options: Option<&'a ArrowReaderOptions>,
135 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
136 self.as_mut().get_metadata(options)
137 }
138}
139
140impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
141 fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
142 async move {
143 self.seek(SeekFrom::End(-(suffix as i64))).await?;
144 let mut buf = Vec::with_capacity(suffix);
145 self.take(suffix as _).read_to_end(&mut buf).await?;
146 Ok(buf.into())
147 }
148 .boxed()
149 }
150}
151
152impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
153 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
154 async move {
155 self.seek(SeekFrom::Start(range.start)).await?;
156
157 let to_read = range.end - range.start;
158 let mut buffer = Vec::with_capacity(to_read.try_into()?);
159 let read = self.take(to_read).read_to_end(&mut buffer).await?;
160 if read as u64 != to_read {
161 return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
162 }
163
164 Ok(buffer.into())
165 }
166 .boxed()
167 }
168
169 fn get_metadata<'a>(
170 &'a mut self,
171 options: Option<&'a ArrowReaderOptions>,
172 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
173 async move {
174 let metadata_reader = ParquetMetaDataReader::new()
175 .with_page_indexes(options.is_some_and(|o| o.page_index));
176
177 #[cfg(feature = "encryption")]
178 let metadata_reader = metadata_reader.with_decryption_properties(
179 options.and_then(|o| o.file_decryption_properties.as_ref()),
180 );
181
182 let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
183 Ok(Arc::new(parquet_metadata))
184 }
185 .boxed()
186 }
187}
188
189impl ArrowReaderMetadata {
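    /// Load [`ArrowReaderMetadata`] from the provided [`AsyncFileReader`], respecting the
    /// given [`ArrowReaderOptions`] (e.g. whether to also read the page index).
    ///
    /// A sketch of loading the metadata once and reusing it to build a reader without
    /// re-fetching the footer; `data.parquet` is an illustrative path:
    ///
    /// ```no_run
    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # async fn demo() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut file = tokio::fs::File::open("data.parquet").await?;
    /// let metadata = ArrowReaderMetadata::load_async(&mut file, ArrowReaderOptions::new()).await?;
    /// // The loaded metadata can be cloned and reused across readers
    /// let _stream =
    ///     ParquetRecordBatchStreamBuilder::new_with_metadata(file, metadata.clone()).build()?;
    /// # Ok(())
    /// # }
    /// ```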
190 pub async fn load_async<T: AsyncFileReader>(
194 input: &mut T,
195 options: ArrowReaderOptions,
196 ) -> Result<Self> {
197 let metadata = input.get_metadata(Some(&options)).await?;
198 Self::try_new(metadata, options)
199 }
200}
201
202#[doc(hidden)]
203pub struct AsyncReader<T>(T);
208
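/// Builds a [`ParquetRecordBatchStream`] of [`RecordBatch`]es from an [`AsyncFileReader`].
///
/// A minimal sketch of typical usage, assuming an illustrative local file `data.parquet`
/// and the `tokio` runtime:
///
/// ```no_run
/// # use futures::TryStreamExt;
/// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
/// # async fn demo() -> Result<(), Box<dyn std::error::Error>> {
/// let file = tokio::fs::File::open("data.parquet").await?;
/// let stream = ParquetRecordBatchStreamBuilder::new(file)
///     .await?
///     .with_batch_size(8192)
///     .build()?;
/// // Drive the stream to completion, collecting the decoded record batches
/// let batches: Vec<_> = stream.try_collect().await?;
/// println!("read {} batches", batches.len());
/// # Ok(())
/// # }
/// ```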
209pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
222
223impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
224 pub async fn new(input: T) -> Result<Self> {
355 Self::new_with_options(input, Default::default()).await
356 }
357
358 pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
361 let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
362 Ok(Self::new_with_metadata(input, metadata))
363 }
364
365 pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
411 Self::new_builder(AsyncReader(input), metadata)
412 }
413
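    /// Read the bloom filter, if any, for the given row group and leaf column.
    ///
    /// A sketch of probing a bloom filter, assuming an illustrative `data.parquet` that
    /// was written with bloom filters enabled for column 0:
    ///
    /// ```no_run
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # async fn demo() -> Result<(), Box<dyn std::error::Error>> {
    /// let file = tokio::fs::File::open("data.parquet").await?;
    /// let mut builder = ParquetRecordBatchStreamBuilder::new(file).await?;
    /// if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0).await? {
    ///     // `false` guarantees absence; `true` only means "possibly present"
    ///     println!("maybe contains value: {}", sbbf.check(&"some_value"));
    /// }
    /// # Ok(())
    /// # }
    /// ```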
414 pub async fn get_row_group_column_bloom_filter(
420 &mut self,
421 row_group_idx: usize,
422 column_idx: usize,
423 ) -> Result<Option<Sbbf>> {
424 let metadata = self.metadata.row_group(row_group_idx);
425 let column_metadata = metadata.column(column_idx);
426
427 let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
428 offset
429 .try_into()
430 .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
431 } else {
432 return Ok(None);
433 };
434
435 let buffer = match column_metadata.bloom_filter_length() {
436 Some(length) => self.input.0.get_bytes(offset..offset + length as u64),
437 None => self
438 .input
439 .0
440 .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64),
441 }
442 .await?;
443
444 let (header, bitset_offset) =
445 chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
446
        match header.algorithm {
            BloomFilterAlgorithm::BLOCK(_) => {
                // this match exists to future proof the singleton algorithm enum
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED(_) => {
                // this match exists to future proof the singleton compression enum
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH(_) => {
                // this match exists to future proof the singleton hash enum
            }
        }
462
463 let bitset = match column_metadata.bloom_filter_length() {
464 Some(_) => buffer.slice(
465 (TryInto::<usize>::try_into(bitset_offset).unwrap()
466 - TryInto::<usize>::try_into(offset).unwrap())..,
467 ),
468 None => {
469 let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
470 ParquetError::General("Bloom filter length is invalid".to_string())
471 })?;
472 self.input
473 .0
474 .get_bytes(bitset_offset..bitset_offset + bitset_length)
475 .await?
476 }
477 };
478 Ok(Some(Sbbf::new(&bitset)))
479 }
480
481 pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
485 let num_row_groups = self.metadata.row_groups().len();
486
487 let row_groups = match self.row_groups {
488 Some(row_groups) => {
489 if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
490 return Err(general_err!(
491 "row group {} out of bounds 0..{}",
492 col,
493 num_row_groups
494 ));
495 }
496 row_groups.into()
497 }
498 None => (0..self.metadata.row_groups().len()).collect(),
499 };
500
501 let batch_size = self
503 .batch_size
504 .min(self.metadata.file_metadata().num_rows() as usize);
505 let reader_factory = ReaderFactory {
506 input: self.input.0,
507 filter: self.filter,
508 metadata: self.metadata.clone(),
509 fields: self.fields,
510 limit: self.limit,
511 offset: self.offset,
512 };
513
514 let projected_fields = match reader_factory.fields.as_deref().map(|pf| &pf.arrow_type) {
517 Some(DataType::Struct(fields)) => {
518 fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
519 }
520 None => Fields::empty(),
521 _ => unreachable!("Must be Struct for root type"),
522 };
523 let schema = Arc::new(Schema::new(projected_fields));
524
525 Ok(ParquetRecordBatchStream {
526 metadata: self.metadata,
527 batch_size,
528 row_groups,
529 projection: self.projection,
530 selection: self.selection,
531 schema,
532 reader_factory: Some(reader_factory),
533 state: StreamState::Init,
534 })
535 }
536}
537
538type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;
539
540struct ReaderFactory<T> {
543 metadata: Arc<ParquetMetaData>,
544
545 fields: Option<Arc<ParquetField>>,
546
547 input: T,
548
549 filter: Option<RowFilter>,
550
551 limit: Option<usize>,
552
553 offset: Option<usize>,
554}
555
556impl<T> ReaderFactory<T>
557where
558 T: AsyncFileReader + Send,
559{
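    // Reads a single row group: any `RowFilter` predicates are evaluated first (fetching
    // only the columns each predicate projects), the caller's offset/limit are then applied
    // to the resulting `RowSelection`, and only the surviving rows' projected columns are
    // fetched before constructing the `ParquetRecordBatchReader`.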
560 async fn read_row_group(
564 mut self,
565 row_group_idx: usize,
566 mut selection: Option<RowSelection>,
567 projection: ProjectionMask,
568 batch_size: usize,
569 ) -> ReadResult<T> {
570 let meta = self.metadata.row_group(row_group_idx);
573 let offset_index = self
574 .metadata
575 .offset_index()
576 .filter(|index| !index.is_empty())
578 .map(|x| x[row_group_idx].as_slice());
579
580 let mut row_group = InMemoryRowGroup {
581 row_count: meta.num_rows() as usize,
583 column_chunks: vec![None; meta.columns().len()],
584 offset_index,
585 row_group_idx,
586 metadata: self.metadata.as_ref(),
587 };
588
589 if let Some(filter) = self.filter.as_mut() {
590 for predicate in filter.predicates.iter_mut() {
591 if !selects_any(selection.as_ref()) {
592 return Ok((self, None));
593 }
594
595 let predicate_projection = predicate.projection();
596 row_group
597 .fetch(&mut self.input, predicate_projection, selection.as_ref())
598 .await?;
599
600 let array_reader =
601 build_array_reader(self.fields.as_deref(), predicate_projection, &row_group)?;
602
603 selection = Some(evaluate_predicate(
604 batch_size,
605 array_reader,
606 selection,
607 predicate.as_mut(),
608 )?);
609 }
610 }
611
612 let rows_before = selection
614 .as_ref()
615 .map(|s| s.row_count())
616 .unwrap_or(row_group.row_count);
617
618 if rows_before == 0 {
619 return Ok((self, None));
620 }
621
622 selection = apply_range(selection, row_group.row_count, self.offset, self.limit);
623
624 let rows_after = selection
626 .as_ref()
627 .map(|s| s.row_count())
628 .unwrap_or(row_group.row_count);
629
630 if let Some(offset) = &mut self.offset {
632 *offset = offset.saturating_sub(rows_before - rows_after)
635 }
636
637 if rows_after == 0 {
638 return Ok((self, None));
639 }
640
641 if let Some(limit) = &mut self.limit {
642 *limit -= rows_after;
643 }
644
645 row_group
646 .fetch(&mut self.input, &projection, selection.as_ref())
647 .await?;
648
649 let reader = ParquetRecordBatchReader::new(
650 batch_size,
651 build_array_reader(self.fields.as_deref(), &projection, &row_group)?,
652 selection,
653 );
654
655 Ok((self, Some(reader)))
656 }
657}
658
enum StreamState<T> {
    /// At the start of a new row group, or the end of the parquet stream
    Init,
    /// Decoding a batch
    Decoding(ParquetRecordBatchReader),
    /// Reading data from input
    Reading(BoxFuture<'static, ReadResult<T>>),
    /// Error
    Error,
}
669
670impl<T> std::fmt::Debug for StreamState<T> {
671 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
672 match self {
673 StreamState::Init => write!(f, "StreamState::Init"),
674 StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
675 StreamState::Reading(_) => write!(f, "StreamState::Reading"),
676 StreamState::Error => write!(f, "StreamState::Error"),
677 }
678 }
679}
680
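/// An asynchronous [`Stream`] of [`RecordBatch`] returned by
/// [`ParquetRecordBatchStreamBuilder`].
///
/// A minimal consumption sketch, assuming an illustrative local file `data.parquet`:
///
/// ```no_run
/// # use futures::StreamExt;
/// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
/// # async fn demo() -> Result<(), Box<dyn std::error::Error>> {
/// let file = tokio::fs::File::open("data.parquet").await?;
/// let mut stream = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;
/// while let Some(batch) = stream.next().await {
///     println!("{} rows", batch?.num_rows());
/// }
/// # Ok(())
/// # }
/// ```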
681pub struct ParquetRecordBatchStream<T> {
699 metadata: Arc<ParquetMetaData>,
700
701 schema: SchemaRef,
702
703 row_groups: VecDeque<usize>,
704
705 projection: ProjectionMask,
706
707 batch_size: usize,
708
709 selection: Option<RowSelection>,
710
711 reader_factory: Option<ReaderFactory<T>>,
713
714 state: StreamState<T>,
715}
716
717impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
718 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
719 f.debug_struct("ParquetRecordBatchStream")
720 .field("metadata", &self.metadata)
721 .field("schema", &self.schema)
722 .field("batch_size", &self.batch_size)
723 .field("projection", &self.projection)
724 .field("state", &self.state)
725 .finish()
726 }
727}
728
729impl<T> ParquetRecordBatchStream<T> {
730 pub fn schema(&self) -> &SchemaRef {
735 &self.schema
736 }
737}
738
739impl<T> ParquetRecordBatchStream<T>
740where
741 T: AsyncFileReader + Unpin + Send + 'static,
742{
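    /// Fetch the next row group as a synchronous [`ParquetRecordBatchReader`], returning
    /// `None` once all row groups are exhausted.
    ///
    /// This cannot be mixed with polling the same instance through its [`Stream`]
    /// implementation. A sketch, assuming an illustrative local file `data.parquet`:
    ///
    /// ```no_run
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # async fn demo() -> Result<(), Box<dyn std::error::Error>> {
    /// let file = tokio::fs::File::open("data.parquet").await?;
    /// let mut stream = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;
    /// while let Some(reader) = stream.next_row_group().await? {
    ///     // `reader` decodes one row group synchronously
    ///     for batch in reader {
    ///         println!("{} rows", batch?.num_rows());
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```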
743 pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
757 loop {
758 match &mut self.state {
759 StreamState::Decoding(_) | StreamState::Reading(_) => {
760 return Err(ParquetError::General(
761 "Cannot combine the use of next_row_group with the Stream API".to_string(),
762 ))
763 }
764 StreamState::Init => {
765 let row_group_idx = match self.row_groups.pop_front() {
766 Some(idx) => idx,
767 None => return Ok(None),
768 };
769
770 let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
771
772 let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
773
774 let reader_factory = self.reader_factory.take().expect("lost reader factory");
775
776 let (reader_factory, maybe_reader) = reader_factory
777 .read_row_group(
778 row_group_idx,
779 selection,
780 self.projection.clone(),
781 self.batch_size,
782 )
783 .await
784 .inspect_err(|_| {
785 self.state = StreamState::Error;
786 })?;
787 self.reader_factory = Some(reader_factory);
788
789 if let Some(reader) = maybe_reader {
790 return Ok(Some(reader));
791 } else {
792 continue;
794 }
795 }
                StreamState::Error => return Ok(None),
            }
798 }
799 }
800}
801
802impl<T> Stream for ParquetRecordBatchStream<T>
803where
804 T: AsyncFileReader + Unpin + Send + 'static,
805{
806 type Item = Result<RecordBatch>;
807
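    // State machine: `Init` pops the next row group index and spawns a `read_row_group`
    // future (`Reading`); when that future resolves it either yields a `Decoding` reader,
    // which is drained batch by batch, or loops back to `Init` if the row group was
    // entirely filtered out. `Error` is terminal and ends the stream.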
808 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
809 loop {
810 match &mut self.state {
811 StreamState::Decoding(batch_reader) => match batch_reader.next() {
812 Some(Ok(batch)) => {
813 return Poll::Ready(Some(Ok(batch)));
814 }
815 Some(Err(e)) => {
816 self.state = StreamState::Error;
817 return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
818 }
819 None => self.state = StreamState::Init,
820 },
821 StreamState::Init => {
822 let row_group_idx = match self.row_groups.pop_front() {
823 Some(idx) => idx,
824 None => return Poll::Ready(None),
825 };
826
827 let reader = self.reader_factory.take().expect("lost reader factory");
828
829 let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
830
831 let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
832
833 let fut = reader
834 .read_row_group(
835 row_group_idx,
836 selection,
837 self.projection.clone(),
838 self.batch_size,
839 )
840 .boxed();
841
842 self.state = StreamState::Reading(fut)
843 }
844 StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
845 Ok((reader_factory, maybe_reader)) => {
846 self.reader_factory = Some(reader_factory);
847 match maybe_reader {
848 Some(reader) => self.state = StreamState::Decoding(reader),
850 None => self.state = StreamState::Init,
852 }
853 }
854 Err(e) => {
855 self.state = StreamState::Error;
856 return Poll::Ready(Some(Err(e)));
857 }
858 },
                StreamState::Error => return Poll::Ready(None),
            }
861 }
862 }
863}
864
865struct InMemoryRowGroup<'a> {
867 offset_index: Option<&'a [OffsetIndexMetaData]>,
868 column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
869 row_count: usize,
870 row_group_idx: usize,
871 metadata: &'a ParquetMetaData,
872}
873
874impl InMemoryRowGroup<'_> {
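    // When both a `RowSelection` and an offset index are available, `fetch` requests only
    // the byte ranges of the pages the selection needs (plus any leading dictionary page)
    // and stores them as `ColumnChunkData::Sparse`; otherwise each projected column chunk
    // is fetched in full as `ColumnChunkData::Dense`.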
875 async fn fetch<T: AsyncFileReader + Send>(
877 &mut self,
878 input: &mut T,
879 projection: &ProjectionMask,
880 selection: Option<&RowSelection>,
881 ) -> Result<()> {
882 let metadata = self.metadata.row_group(self.row_group_idx);
883 if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
884 let mut page_start_offsets: Vec<Vec<u64>> = vec![];
887
888 let fetch_ranges = self
889 .column_chunks
890 .iter()
891 .zip(metadata.columns())
892 .enumerate()
893 .filter(|&(idx, (chunk, _chunk_meta))| {
894 chunk.is_none() && projection.leaf_included(idx)
895 })
896 .flat_map(|(idx, (_chunk, chunk_meta))| {
897 let mut ranges: Vec<Range<u64>> = vec![];
900 let (start, _len) = chunk_meta.byte_range();
901 match offset_index[idx].page_locations.first() {
902 Some(first) if first.offset as u64 != start => {
903 ranges.push(start..first.offset as u64);
904 }
905 _ => (),
906 }
907
908 ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
909 page_start_offsets.push(ranges.iter().map(|range| range.start).collect());
910
911 ranges
912 })
913 .collect();
914
915 let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
916 let mut page_start_offsets = page_start_offsets.into_iter();
917
918 for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
919 if chunk.is_some() || !projection.leaf_included(idx) {
920 continue;
921 }
922
923 if let Some(offsets) = page_start_offsets.next() {
924 let mut chunks = Vec::with_capacity(offsets.len());
925 for _ in 0..offsets.len() {
926 chunks.push(chunk_data.next().unwrap());
927 }
928
929 *chunk = Some(Arc::new(ColumnChunkData::Sparse {
930 length: metadata.column(idx).byte_range().1 as usize,
931 data: offsets
932 .into_iter()
933 .map(|x| x as usize)
934 .zip(chunks.into_iter())
935 .collect(),
936 }))
937 }
938 }
939 } else {
940 let fetch_ranges = self
941 .column_chunks
942 .iter()
943 .enumerate()
944 .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
945 .map(|(idx, _chunk)| {
946 let column = metadata.column(idx);
947 let (start, length) = column.byte_range();
948 start..(start + length)
949 })
950 .collect();
951
952 let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
953
954 for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
955 if chunk.is_some() || !projection.leaf_included(idx) {
956 continue;
957 }
958
959 if let Some(data) = chunk_data.next() {
960 *chunk = Some(Arc::new(ColumnChunkData::Dense {
961 offset: metadata.column(idx).byte_range().0 as usize,
962 data,
963 }));
964 }
965 }
966 }
967
968 Ok(())
969 }
970}
971
972impl RowGroups for InMemoryRowGroup<'_> {
973 fn num_rows(&self) -> usize {
974 self.row_count
975 }
976
977 fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
979 match &self.column_chunks[i] {
980 None => Err(ParquetError::General(format!(
981 "Invalid column index {i}, column was not fetched"
982 ))),
983 Some(data) => {
984 let page_locations = self
985 .offset_index
986 .filter(|index| !index.is_empty())
988 .map(|index| index[i].page_locations.clone());
989 let column_chunk_metadata = self.metadata.row_group(self.row_group_idx).column(i);
990 let page_reader = SerializedPageReader::new(
991 data.clone(),
992 column_chunk_metadata,
993 self.row_count,
994 page_locations,
995 )?;
996 let page_reader = page_reader.add_crypto_context(
997 self.row_group_idx,
998 i,
999 self.metadata,
1000 column_chunk_metadata,
1001 )?;
1002
1003 let page_reader: Box<dyn PageReader> = Box::new(page_reader);
1004
1005 Ok(Box::new(ColumnChunkIterator {
1006 reader: Some(Ok(page_reader)),
1007 }))
1008 }
1009 }
1010 }
1011}
1012
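/// An in-memory column chunk.
///
/// `Sparse` holds only the pages selected by a `RowSelection`, keyed by their absolute
/// offset in the file, while `Dense` holds the entire column chunk starting at `offset`.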
1013#[derive(Clone)]
1015enum ColumnChunkData {
1016 Sparse {
1018 length: usize,
1020 data: Vec<(usize, Bytes)>,
1023 },
1024 Dense { offset: usize, data: Bytes },
1026}
1027
1028impl ColumnChunkData {
1029 fn get(&self, start: u64) -> Result<Bytes> {
1030 match &self {
1031 ColumnChunkData::Sparse { data, .. } => data
1032 .binary_search_by_key(&start, |(offset, _)| *offset as u64)
1033 .map(|idx| data[idx].1.clone())
1034 .map_err(|_| {
1035 ParquetError::General(format!(
1036 "Invalid offset in sparse column chunk data: {start}"
1037 ))
1038 }),
1039 ColumnChunkData::Dense { offset, data } => {
1040 let start = start as usize - *offset;
1041 Ok(data.slice(start..))
1042 }
1043 }
1044 }
1045}
1046
1047impl Length for ColumnChunkData {
1048 fn len(&self) -> u64 {
1049 match &self {
1050 ColumnChunkData::Sparse { length, .. } => *length as u64,
1051 ColumnChunkData::Dense { data, .. } => data.len() as u64,
1052 }
1053 }
1054}
1055
1056impl ChunkReader for ColumnChunkData {
1057 type T = bytes::buf::Reader<Bytes>;
1058
1059 fn get_read(&self, start: u64) -> Result<Self::T> {
1060 Ok(self.get(start)?.reader())
1061 }
1062
1063 fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
1064 Ok(self.get(start)?.slice(..length))
1065 }
1066}
1067
1068struct ColumnChunkIterator {
1070 reader: Option<Result<Box<dyn PageReader>>>,
1071}
1072
1073impl Iterator for ColumnChunkIterator {
1074 type Item = Result<Box<dyn PageReader>>;
1075
1076 fn next(&mut self) -> Option<Self::Item> {
1077 self.reader.take()
1078 }
1079}
1080
1081impl PageIterator for ColumnChunkIterator {}
1082
1083#[cfg(test)]
1084mod tests {
1085 use super::*;
1086 use crate::arrow::arrow_reader::{
1087 ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
1088 };
1089 use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
1090 use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
1091 use crate::arrow::ArrowWriter;
1092 use crate::file::metadata::ParquetMetaDataReader;
1093 use crate::file::properties::WriterProperties;
1094 use arrow::compute::kernels::cmp::eq;
1095 use arrow::error::Result as ArrowResult;
1096 use arrow_array::builder::{ListBuilder, StringBuilder};
1097 use arrow_array::cast::AsArray;
1098 use arrow_array::types::Int32Type;
1099 use arrow_array::{
1100 Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray,
1101 StructArray, UInt64Array,
1102 };
1103 use arrow_schema::{DataType, Field, Schema};
1104 use futures::{StreamExt, TryStreamExt};
1105 use rand::{rng, Rng};
1106 use std::collections::HashMap;
1107 use std::sync::{Arc, Mutex};
1108 use tempfile::tempfile;
1109
1110 #[derive(Clone)]
1111 struct TestReader {
1112 data: Bytes,
1113 metadata: Option<Arc<ParquetMetaData>>,
1114 requests: Arc<Mutex<Vec<Range<usize>>>>,
1115 }
1116
1117 impl AsyncFileReader for TestReader {
1118 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
1119 let range = range.clone();
1120 self.requests
1121 .lock()
1122 .unwrap()
1123 .push(range.start as usize..range.end as usize);
1124 futures::future::ready(Ok(self
1125 .data
1126 .slice(range.start as usize..range.end as usize)))
1127 .boxed()
1128 }
1129
1130 fn get_metadata<'a>(
1131 &'a mut self,
1132 options: Option<&'a ArrowReaderOptions>,
1133 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
1134 let metadata_reader = ParquetMetaDataReader::new()
1135 .with_page_indexes(options.is_some_and(|o| o.page_index));
1136 self.metadata = Some(Arc::new(
1137 metadata_reader.parse_and_finish(&self.data).unwrap(),
1138 ));
1139 futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
1140 }
1141 }
1142
1143 #[tokio::test]
1144 async fn test_async_reader() {
1145 let testdata = arrow::util::test_util::parquet_test_data();
1146 let path = format!("{testdata}/alltypes_plain.parquet");
1147 let data = Bytes::from(std::fs::read(path).unwrap());
1148
1149 let async_reader = TestReader {
1150 data: data.clone(),
1151 metadata: Default::default(),
1152 requests: Default::default(),
1153 };
1154
1155 let requests = async_reader.requests.clone();
1156 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1157 .await
1158 .unwrap();
1159
1160 let metadata = builder.metadata().clone();
1161 assert_eq!(metadata.num_row_groups(), 1);
1162
1163 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1164 let stream = builder
1165 .with_projection(mask.clone())
1166 .with_batch_size(1024)
1167 .build()
1168 .unwrap();
1169
1170 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1171
1172 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1173 .unwrap()
1174 .with_projection(mask)
1175 .with_batch_size(104)
1176 .build()
1177 .unwrap()
1178 .collect::<ArrowResult<Vec<_>>>()
1179 .unwrap();
1180
1181 assert_eq!(async_batches, sync_batches);
1182
1183 let requests = requests.lock().unwrap();
1184 let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1185 let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1186
1187 assert_eq!(
1188 &requests[..],
1189 &[
1190 offset_1 as usize..(offset_1 + length_1) as usize,
1191 offset_2 as usize..(offset_2 + length_2) as usize
1192 ]
1193 );
1194 }
1195
1196 #[tokio::test]
1197 async fn test_async_reader_with_next_row_group() {
1198 let testdata = arrow::util::test_util::parquet_test_data();
1199 let path = format!("{testdata}/alltypes_plain.parquet");
1200 let data = Bytes::from(std::fs::read(path).unwrap());
1201
1202 let async_reader = TestReader {
1203 data: data.clone(),
1204 metadata: Default::default(),
1205 requests: Default::default(),
1206 };
1207
1208 let requests = async_reader.requests.clone();
1209 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1210 .await
1211 .unwrap();
1212
1213 let metadata = builder.metadata().clone();
1214 assert_eq!(metadata.num_row_groups(), 1);
1215
1216 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1217 let mut stream = builder
1218 .with_projection(mask.clone())
1219 .with_batch_size(1024)
1220 .build()
1221 .unwrap();
1222
1223 let mut readers = vec![];
1224 while let Some(reader) = stream.next_row_group().await.unwrap() {
1225 readers.push(reader);
1226 }
1227
1228 let async_batches: Vec<_> = readers
1229 .into_iter()
1230 .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
1231 .collect();
1232
1233 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1234 .unwrap()
1235 .with_projection(mask)
1236 .with_batch_size(104)
1237 .build()
1238 .unwrap()
1239 .collect::<ArrowResult<Vec<_>>>()
1240 .unwrap();
1241
1242 assert_eq!(async_batches, sync_batches);
1243
1244 let requests = requests.lock().unwrap();
1245 let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1246 let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1247
1248 assert_eq!(
1249 &requests[..],
1250 &[
1251 offset_1 as usize..(offset_1 + length_1) as usize,
1252 offset_2 as usize..(offset_2 + length_2) as usize
1253 ]
1254 );
1255 }
1256
1257 #[tokio::test]
1258 async fn test_async_reader_with_index() {
1259 let testdata = arrow::util::test_util::parquet_test_data();
1260 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1261 let data = Bytes::from(std::fs::read(path).unwrap());
1262
1263 let async_reader = TestReader {
1264 data: data.clone(),
1265 metadata: Default::default(),
1266 requests: Default::default(),
1267 };
1268
1269 let options = ArrowReaderOptions::new().with_page_index(true);
1270 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1271 .await
1272 .unwrap();
1273
1274 let metadata_with_index = builder.metadata();
1276 assert_eq!(metadata_with_index.num_row_groups(), 1);
1277
1278 let offset_index = metadata_with_index.offset_index().unwrap();
1280 let column_index = metadata_with_index.column_index().unwrap();
1281
1282 assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
1283 assert_eq!(column_index.len(), metadata_with_index.num_row_groups());
1284
1285 let num_columns = metadata_with_index
1286 .file_metadata()
1287 .schema_descr()
1288 .num_columns();
1289
1290 offset_index
1292 .iter()
1293 .for_each(|x| assert_eq!(x.len(), num_columns));
1294 column_index
1295 .iter()
1296 .for_each(|x| assert_eq!(x.len(), num_columns));
1297
1298 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1299 let stream = builder
1300 .with_projection(mask.clone())
1301 .with_batch_size(1024)
1302 .build()
1303 .unwrap();
1304
1305 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1306
1307 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1308 .unwrap()
1309 .with_projection(mask)
1310 .with_batch_size(1024)
1311 .build()
1312 .unwrap()
1313 .collect::<ArrowResult<Vec<_>>>()
1314 .unwrap();
1315
1316 assert_eq!(async_batches, sync_batches);
1317 }
1318
1319 #[tokio::test]
1320 async fn test_async_reader_with_limit() {
1321 let testdata = arrow::util::test_util::parquet_test_data();
1322 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1323 let data = Bytes::from(std::fs::read(path).unwrap());
1324
1325 let metadata = ParquetMetaDataReader::new()
1326 .parse_and_finish(&data)
1327 .unwrap();
1328 let metadata = Arc::new(metadata);
1329
1330 assert_eq!(metadata.num_row_groups(), 1);
1331
1332 let async_reader = TestReader {
1333 data: data.clone(),
1334 metadata: Default::default(),
1335 requests: Default::default(),
1336 };
1337
1338 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1339 .await
1340 .unwrap();
1341
1342 assert_eq!(builder.metadata().num_row_groups(), 1);
1343
1344 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1345 let stream = builder
1346 .with_projection(mask.clone())
1347 .with_batch_size(1024)
1348 .with_limit(1)
1349 .build()
1350 .unwrap();
1351
1352 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1353
1354 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1355 .unwrap()
1356 .with_projection(mask)
1357 .with_batch_size(1024)
1358 .with_limit(1)
1359 .build()
1360 .unwrap()
1361 .collect::<ArrowResult<Vec<_>>>()
1362 .unwrap();
1363
1364 assert_eq!(async_batches, sync_batches);
1365 }
1366
1367 #[tokio::test]
1368 async fn test_async_reader_skip_pages() {
1369 let testdata = arrow::util::test_util::parquet_test_data();
1370 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1371 let data = Bytes::from(std::fs::read(path).unwrap());
1372
1373 let async_reader = TestReader {
1374 data: data.clone(),
1375 metadata: Default::default(),
1376 requests: Default::default(),
1377 };
1378
1379 let options = ArrowReaderOptions::new().with_page_index(true);
1380 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1381 .await
1382 .unwrap();
1383
1384 assert_eq!(builder.metadata().num_row_groups(), 1);
1385
1386 let selection = RowSelection::from(vec![
            RowSelector::skip(21),
            RowSelector::select(21),
            RowSelector::skip(41),
            RowSelector::select(41),
            RowSelector::skip(25),
            RowSelector::select(25),
            RowSelector::skip(7116),
            RowSelector::select(10),
        ]);
1396
1397 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);
1398
1399 let stream = builder
1400 .with_projection(mask.clone())
1401 .with_row_selection(selection.clone())
1402 .build()
1403 .expect("building stream");
1404
1405 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1406
1407 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1408 .unwrap()
1409 .with_projection(mask)
1410 .with_batch_size(1024)
1411 .with_row_selection(selection)
1412 .build()
1413 .unwrap()
1414 .collect::<ArrowResult<Vec<_>>>()
1415 .unwrap();
1416
1417 assert_eq!(async_batches, sync_batches);
1418 }
1419
1420 #[tokio::test]
1421 async fn test_fuzz_async_reader_selection() {
1422 let testdata = arrow::util::test_util::parquet_test_data();
1423 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1424 let data = Bytes::from(std::fs::read(path).unwrap());
1425
1426 let mut rand = rng();
1427
1428 for _ in 0..100 {
1429 let mut expected_rows = 0;
1430 let mut total_rows = 0;
1431 let mut skip = false;
1432 let mut selectors = vec![];
1433
1434 while total_rows < 7300 {
1435 let row_count: usize = rand.random_range(1..100);
1436
1437 let row_count = row_count.min(7300 - total_rows);
1438
1439 selectors.push(RowSelector { row_count, skip });
1440
1441 total_rows += row_count;
1442 if !skip {
1443 expected_rows += row_count;
1444 }
1445
1446 skip = !skip;
1447 }
1448
1449 let selection = RowSelection::from(selectors);
1450
1451 let async_reader = TestReader {
1452 data: data.clone(),
1453 metadata: Default::default(),
1454 requests: Default::default(),
1455 };
1456
1457 let options = ArrowReaderOptions::new().with_page_index(true);
1458 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1459 .await
1460 .unwrap();
1461
1462 assert_eq!(builder.metadata().num_row_groups(), 1);
1463
1464 let col_idx: usize = rand.random_range(0..13);
1465 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1466
1467 let stream = builder
1468 .with_projection(mask.clone())
1469 .with_row_selection(selection.clone())
1470 .build()
1471 .expect("building stream");
1472
1473 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1474
1475 let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1476
1477 assert_eq!(actual_rows, expected_rows);
1478 }
1479 }
1480
1481 #[tokio::test]
1482 async fn test_async_reader_zero_row_selector() {
1483 let testdata = arrow::util::test_util::parquet_test_data();
1485 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1486 let data = Bytes::from(std::fs::read(path).unwrap());
1487
1488 let mut rand = rng();
1489
1490 let mut expected_rows = 0;
1491 let mut total_rows = 0;
1492 let mut skip = false;
1493 let mut selectors = vec![];
1494
1495 selectors.push(RowSelector {
1496 row_count: 0,
1497 skip: false,
1498 });
1499
1500 while total_rows < 7300 {
1501 let row_count: usize = rand.random_range(1..100);
1502
1503 let row_count = row_count.min(7300 - total_rows);
1504
1505 selectors.push(RowSelector { row_count, skip });
1506
1507 total_rows += row_count;
1508 if !skip {
1509 expected_rows += row_count;
1510 }
1511
1512 skip = !skip;
1513 }
1514
1515 let selection = RowSelection::from(selectors);
1516
1517 let async_reader = TestReader {
1518 data: data.clone(),
1519 metadata: Default::default(),
1520 requests: Default::default(),
1521 };
1522
1523 let options = ArrowReaderOptions::new().with_page_index(true);
1524 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1525 .await
1526 .unwrap();
1527
1528 assert_eq!(builder.metadata().num_row_groups(), 1);
1529
1530 let col_idx: usize = rand.random_range(0..13);
1531 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1532
1533 let stream = builder
1534 .with_projection(mask.clone())
1535 .with_row_selection(selection.clone())
1536 .build()
1537 .expect("building stream");
1538
1539 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1540
1541 let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1542
1543 assert_eq!(actual_rows, expected_rows);
1544 }
1545
1546 #[tokio::test]
1547 async fn test_row_filter() {
1548 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1549 let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1550 let c = Int32Array::from_iter(0..6);
1551 let data = RecordBatch::try_from_iter([
1552 ("a", Arc::new(a) as ArrayRef),
1553 ("b", Arc::new(b) as ArrayRef),
1554 ("c", Arc::new(c) as ArrayRef),
1555 ])
1556 .unwrap();
1557
1558 let mut buf = Vec::with_capacity(1024);
1559 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1560 writer.write(&data).unwrap();
1561 writer.close().unwrap();
1562
1563 let data: Bytes = buf.into();
1564 let metadata = ParquetMetaDataReader::new()
1565 .parse_and_finish(&data)
1566 .unwrap();
1567 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1568
1569 let test = TestReader {
1570 data,
1571 metadata: Default::default(),
1572 requests: Default::default(),
1573 };
1574 let requests = test.requests.clone();
1575
1576 let a_scalar = StringArray::from_iter_values(["b"]);
1577 let a_filter = ArrowPredicateFn::new(
1578 ProjectionMask::leaves(&parquet_schema, vec![0]),
1579 move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1580 );
1581
1582 let b_scalar = StringArray::from_iter_values(["4"]);
1583 let b_filter = ArrowPredicateFn::new(
1584 ProjectionMask::leaves(&parquet_schema, vec![1]),
1585 move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1586 );
1587
1588 let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1589
1590 let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1591 let stream = ParquetRecordBatchStreamBuilder::new(test)
1592 .await
1593 .unwrap()
1594 .with_projection(mask.clone())
1595 .with_batch_size(1024)
1596 .with_row_filter(filter)
1597 .build()
1598 .unwrap();
1599
1600 let batches: Vec<_> = stream.try_collect().await.unwrap();
1601 assert_eq!(batches.len(), 1);
1602
1603 let batch = &batches[0];
1604 assert_eq!(batch.num_rows(), 1);
1605 assert_eq!(batch.num_columns(), 2);
1606
1607 let col = batch.column(0);
1608 let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
1609 assert_eq!(val, "b");
1610
1611 let col = batch.column(1);
1612 let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
1613 assert_eq!(val, 3);
1614
1615 assert_eq!(requests.lock().unwrap().len(), 3);
1617 }
1618
1619 #[tokio::test]
1620 async fn test_limit_multiple_row_groups() {
1621 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1622 let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1623 let c = Int32Array::from_iter(0..6);
1624 let data = RecordBatch::try_from_iter([
1625 ("a", Arc::new(a) as ArrayRef),
1626 ("b", Arc::new(b) as ArrayRef),
1627 ("c", Arc::new(c) as ArrayRef),
1628 ])
1629 .unwrap();
1630
1631 let mut buf = Vec::with_capacity(1024);
1632 let props = WriterProperties::builder()
1633 .set_max_row_group_size(3)
1634 .build();
1635 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1636 writer.write(&data).unwrap();
1637 writer.close().unwrap();
1638
1639 let data: Bytes = buf.into();
1640 let metadata = ParquetMetaDataReader::new()
1641 .parse_and_finish(&data)
1642 .unwrap();
1643
1644 assert_eq!(metadata.num_row_groups(), 2);
1645
1646 let test = TestReader {
1647 data,
1648 metadata: Default::default(),
1649 requests: Default::default(),
1650 };
1651
1652 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1653 .await
1654 .unwrap()
1655 .with_batch_size(1024)
1656 .with_limit(4)
1657 .build()
1658 .unwrap();
1659
1660 let batches: Vec<_> = stream.try_collect().await.unwrap();
1661 assert_eq!(batches.len(), 2);
1663
1664 let batch = &batches[0];
1665 assert_eq!(batch.num_rows(), 3);
1667 assert_eq!(batch.num_columns(), 3);
1668 let col2 = batch.column(2).as_primitive::<Int32Type>();
1669 assert_eq!(col2.values(), &[0, 1, 2]);
1670
1671 let batch = &batches[1];
1672 assert_eq!(batch.num_rows(), 1);
1674 assert_eq!(batch.num_columns(), 3);
1675 let col2 = batch.column(2).as_primitive::<Int32Type>();
1676 assert_eq!(col2.values(), &[3]);
1677
1678 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1679 .await
1680 .unwrap()
1681 .with_offset(2)
1682 .with_limit(3)
1683 .build()
1684 .unwrap();
1685
1686 let batches: Vec<_> = stream.try_collect().await.unwrap();
1687 assert_eq!(batches.len(), 2);
1689
1690 let batch = &batches[0];
1691 assert_eq!(batch.num_rows(), 1);
1693 assert_eq!(batch.num_columns(), 3);
1694 let col2 = batch.column(2).as_primitive::<Int32Type>();
1695 assert_eq!(col2.values(), &[2]);
1696
1697 let batch = &batches[1];
1698 assert_eq!(batch.num_rows(), 2);
1700 assert_eq!(batch.num_columns(), 3);
1701 let col2 = batch.column(2).as_primitive::<Int32Type>();
1702 assert_eq!(col2.values(), &[3, 4]);
1703
1704 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1705 .await
1706 .unwrap()
1707 .with_offset(4)
1708 .with_limit(20)
1709 .build()
1710 .unwrap();
1711
1712 let batches: Vec<_> = stream.try_collect().await.unwrap();
1713 assert_eq!(batches.len(), 1);
1715
1716 let batch = &batches[0];
1717 assert_eq!(batch.num_rows(), 2);
1719 assert_eq!(batch.num_columns(), 3);
1720 let col2 = batch.column(2).as_primitive::<Int32Type>();
1721 assert_eq!(col2.values(), &[4, 5]);
1722 }
1723
1724 #[tokio::test]
1725 async fn test_row_filter_with_index() {
1726 let testdata = arrow::util::test_util::parquet_test_data();
1727 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1728 let data = Bytes::from(std::fs::read(path).unwrap());
1729
1730 let metadata = ParquetMetaDataReader::new()
1731 .parse_and_finish(&data)
1732 .unwrap();
1733 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1734
1735 assert_eq!(metadata.num_row_groups(), 1);
1736
1737 let async_reader = TestReader {
1738 data: data.clone(),
1739 metadata: Default::default(),
1740 requests: Default::default(),
1741 };
1742
1743 let a_filter =
1744 ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema, vec![1]), |batch| {
1745 Ok(batch.column(0).as_boolean().clone())
1746 });
1747
1748 let b_scalar = Int8Array::from(vec![2]);
1749 let b_filter = ArrowPredicateFn::new(
1750 ProjectionMask::leaves(&parquet_schema, vec![2]),
1751 move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1752 );
1753
1754 let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1755
1756 let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1757
1758 let options = ArrowReaderOptions::new().with_page_index(true);
1759 let stream = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1760 .await
1761 .unwrap()
1762 .with_projection(mask.clone())
1763 .with_batch_size(1024)
1764 .with_row_filter(filter)
1765 .build()
1766 .unwrap();
1767
1768 let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1769
1770 let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1771
1772 assert_eq!(total_rows, 730);
1773 }
1774
1775 #[tokio::test]
1776 async fn test_in_memory_row_group_sparse() {
1777 let testdata = arrow::util::test_util::parquet_test_data();
1778 let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1779 let data = Bytes::from(std::fs::read(path).unwrap());
1780
1781 let metadata = ParquetMetaDataReader::new()
1782 .with_page_indexes(true)
1783 .parse_and_finish(&data)
1784 .unwrap();
1785
1786 let offset_index = metadata.offset_index().expect("reading offset index")[0].clone();
1787
1788 let mut metadata_builder = metadata.into_builder();
1789 let mut row_groups = metadata_builder.take_row_groups();
1790 row_groups.truncate(1);
1791 let row_group_meta = row_groups.pop().unwrap();
1792
1793 let metadata = metadata_builder
1794 .add_row_group(row_group_meta)
1795 .set_column_index(None)
1796 .set_offset_index(Some(vec![offset_index.clone()]))
1797 .build();
1798
1799 let metadata = Arc::new(metadata);
1800
1801 let num_rows = metadata.row_group(0).num_rows();
1802
1803 assert_eq!(metadata.num_row_groups(), 1);
1804
1805 let async_reader = TestReader {
1806 data: data.clone(),
1807 metadata: Default::default(),
1808 requests: Default::default(),
1809 };
1810
1811 let requests = async_reader.requests.clone();
1812 let (_, fields) = parquet_to_arrow_schema_and_fields(
1813 metadata.file_metadata().schema_descr(),
1814 ProjectionMask::all(),
1815 None,
1816 )
1817 .unwrap();
1818
1819 let _schema_desc = metadata.file_metadata().schema_descr();
1820
1821 let projection = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), vec![0]);
1822
1823 let reader_factory = ReaderFactory {
1824 metadata,
1825 fields: fields.map(Arc::new),
1826 input: async_reader,
1827 filter: None,
1828 limit: None,
1829 offset: None,
1830 };
1831
1832 let mut skip = true;
1833 let mut pages = offset_index[0].page_locations.iter().peekable();
1834
1835 let mut selectors = vec![];
1837 let mut expected_page_requests: Vec<Range<usize>> = vec![];
1838 while let Some(page) = pages.next() {
1839 let num_rows = if let Some(next_page) = pages.peek() {
1840 next_page.first_row_index - page.first_row_index
1841 } else {
1842 num_rows - page.first_row_index
1843 };
1844
1845 if skip {
1846 selectors.push(RowSelector::skip(num_rows as usize));
1847 } else {
1848 selectors.push(RowSelector::select(num_rows as usize));
1849 let start = page.offset as usize;
1850 let end = start + page.compressed_page_size as usize;
1851 expected_page_requests.push(start..end);
1852 }
1853 skip = !skip;
1854 }
1855
1856 let selection = RowSelection::from(selectors);
1857
1858 let (_factory, _reader) = reader_factory
1859 .read_row_group(0, Some(selection), projection.clone(), 48)
1860 .await
1861 .expect("reading row group");
1862
1863 let requests = requests.lock().unwrap();
1864
1865 assert_eq!(&requests[..], &expected_page_requests)
1866 }
1867
1868 #[tokio::test]
1869 async fn test_batch_size_overallocate() {
1870 let testdata = arrow::util::test_util::parquet_test_data();
1871 let path = format!("{testdata}/alltypes_plain.parquet");
1873 let data = Bytes::from(std::fs::read(path).unwrap());
1874
1875 let async_reader = TestReader {
1876 data: data.clone(),
1877 metadata: Default::default(),
1878 requests: Default::default(),
1879 };
1880
1881 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1882 .await
1883 .unwrap();
1884
1885 let file_rows = builder.metadata().file_metadata().num_rows() as usize;
1886
1887 let stream = builder
1888 .with_projection(ProjectionMask::all())
1889 .with_batch_size(1024)
1890 .build()
1891 .unwrap();
1892 assert_ne!(1024, file_rows);
1893 assert_eq!(stream.batch_size, file_rows);
1894 }
1895
1896 #[tokio::test]
1897 async fn test_get_row_group_column_bloom_filter_without_length() {
1898 let testdata = arrow::util::test_util::parquet_test_data();
1899 let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
1900 let data = Bytes::from(std::fs::read(path).unwrap());
1901 test_get_row_group_column_bloom_filter(data, false).await;
1902 }
1903
1904 #[tokio::test]
1905 async fn test_parquet_record_batch_stream_schema() {
1906 fn get_all_field_names(schema: &Schema) -> Vec<&String> {
1907 schema.flattened_fields().iter().map(|f| f.name()).collect()
1908 }
1909
1910 let mut metadata = HashMap::with_capacity(1);
1919 metadata.insert("key".to_string(), "value".to_string());
1920
1921 let nested_struct_array = StructArray::from(vec![
1922 (
1923 Arc::new(Field::new("d", DataType::Utf8, true)),
1924 Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
1925 ),
1926 (
1927 Arc::new(Field::new("e", DataType::Utf8, true)),
1928 Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
1929 ),
1930 ]);
1931 let struct_array = StructArray::from(vec![
1932 (
1933 Arc::new(Field::new("a", DataType::Int32, true)),
1934 Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
1935 ),
1936 (
1937 Arc::new(Field::new("b", DataType::UInt64, true)),
1938 Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
1939 ),
1940 (
1941 Arc::new(Field::new(
1942 "c",
1943 nested_struct_array.data_type().clone(),
1944 true,
1945 )),
1946 Arc::new(nested_struct_array) as ArrayRef,
1947 ),
1948 ]);
1949
1950 let schema =
1951 Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
1952 let record_batch = RecordBatch::from(struct_array)
1953 .with_schema(schema.clone())
1954 .unwrap();
1955
1956 let mut file = tempfile().unwrap();
1958 let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
1959 writer.write(&record_batch).unwrap();
1960 writer.close().unwrap();
1961
1962 let all_fields = ["a", "b", "c", "d", "e"];
1963 let projections = [
1965 (vec![], vec![]),
1966 (vec![0], vec!["a"]),
1967 (vec![0, 1], vec!["a", "b"]),
1968 (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
1969 (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
1970 ];
1971
1972 for (indices, expected_projected_names) in projections {
1974 let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
1975 assert_eq!(get_all_field_names(&builder), all_fields);
1977 assert_eq!(builder.metadata, metadata);
1978 assert_eq!(get_all_field_names(&reader), expected_projected_names);
1980 assert_eq!(reader.metadata, HashMap::default());
1981 assert_eq!(get_all_field_names(&batch), expected_projected_names);
1982 assert_eq!(batch.metadata, HashMap::default());
1983 };
1984
1985 let builder =
1986 ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
1987 let sync_builder_schema = builder.schema().clone();
1988 let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
1989 let mut reader = builder.with_projection(mask).build().unwrap();
1990 let sync_reader_schema = reader.schema();
1991 let batch = reader.next().unwrap().unwrap();
1992 let sync_batch_schema = batch.schema();
1993 assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);
1994
1995 let file = tokio::fs::File::from(file.try_clone().unwrap());
1997 let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
1998 let async_builder_schema = builder.schema().clone();
1999 let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
2000 let mut reader = builder.with_projection(mask).build().unwrap();
2001 let async_reader_schema = reader.schema().clone();
2002 let batch = reader.next().await.unwrap().unwrap();
2003 let async_batch_schema = batch.schema();
2004 assert_schemas(
2005 async_builder_schema,
2006 async_reader_schema,
2007 async_batch_schema,
2008 );
2009 }
2010 }
2011
2012 #[tokio::test]
2013 async fn test_get_row_group_column_bloom_filter_with_length() {
2014 let testdata = arrow::util::test_util::parquet_test_data();
2016 let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
2017 let data = Bytes::from(std::fs::read(path).unwrap());
2018 let async_reader = TestReader {
2019 data: data.clone(),
2020 metadata: Default::default(),
2021 requests: Default::default(),
2022 };
2023 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2024 .await
2025 .unwrap();
2026 let schema = builder.schema().clone();
2027 let stream = builder.build().unwrap();
2028 let batches = stream.try_collect::<Vec<_>>().await.unwrap();
2029
2030 let mut parquet_data = Vec::new();
2031 let props = WriterProperties::builder()
2032 .set_bloom_filter_enabled(true)
2033 .build();
2034 let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
2035 for batch in batches {
2036 writer.write(&batch).unwrap();
2037 }
2038 writer.close().unwrap();
2039
2040 test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
2042 }
2043
2044 async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
2045 let async_reader = TestReader {
2046 data: data.clone(),
2047 metadata: Default::default(),
2048 requests: Default::default(),
2049 };
2050
2051 let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2052 .await
2053 .unwrap();
2054
2055 let metadata = builder.metadata();
2056 assert_eq!(metadata.num_row_groups(), 1);
2057 let row_group = metadata.row_group(0);
2058 let column = row_group.column(0);
2059 assert_eq!(column.bloom_filter_length().is_some(), with_length);
2060
2061 let sbbf = builder
2062 .get_row_group_column_bloom_filter(0, 0)
2063 .await
2064 .unwrap()
2065 .unwrap();
2066 assert!(sbbf.check(&"Hello"));
2067 assert!(!sbbf.check(&"Hello_Not_Exists"));
2068 }
2069
2070 #[tokio::test]
2071 async fn test_nested_skip() {
2072 let schema = Arc::new(Schema::new(vec![
2073 Field::new("col_1", DataType::UInt64, false),
2074 Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
2075 ]));
2076
2077 let props = WriterProperties::builder()
2079 .set_data_page_row_count_limit(256)
2080 .set_write_batch_size(256)
2081 .set_max_row_group_size(1024);
2082
2083 let mut file = tempfile().unwrap();
2085 let mut writer =
2086 ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();
2087
2088 let mut builder = ListBuilder::new(StringBuilder::new());
2089 for id in 0..1024 {
2090 match id % 3 {
2091 0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
2092 1 => builder.append_value([Some(format!("id_{id}"))]),
2093 _ => builder.append_null(),
2094 }
2095 }
2096 let refs = vec![
2097 Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
2098 Arc::new(builder.finish()) as ArrayRef,
2099 ];
2100
2101 let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
2102 writer.write(&batch).unwrap();
2103 writer.close().unwrap();
2104
2105 let selections = [
2106 RowSelection::from(vec![
2107 RowSelector::skip(313),
2108 RowSelector::select(1),
2109 RowSelector::skip(709),
2110 RowSelector::select(1),
2111 ]),
2112 RowSelection::from(vec![
2113 RowSelector::skip(255),
2114 RowSelector::select(1),
2115 RowSelector::skip(767),
2116 RowSelector::select(1),
2117 ]),
2118 RowSelection::from(vec![
2119 RowSelector::select(255),
2120 RowSelector::skip(1),
2121 RowSelector::select(767),
2122 RowSelector::skip(1),
2123 ]),
2124 RowSelection::from(vec![
2125 RowSelector::skip(254),
2126 RowSelector::select(1),
2127 RowSelector::select(1),
2128 RowSelector::skip(767),
2129 RowSelector::select(1),
2130 ]),
2131 ];
2132
2133 for selection in selections {
2134 let expected = selection.row_count();
2135 let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
2137 tokio::fs::File::from_std(file.try_clone().unwrap()),
2138 ArrowReaderOptions::new().with_page_index(true),
2139 )
2140 .await
2141 .unwrap();
2142
2143 reader = reader.with_row_selection(selection);
2144
2145 let mut stream = reader.build().unwrap();
2146
2147 let mut total_rows = 0;
2148 while let Some(rb) = stream.next().await {
2149 let rb = rb.unwrap();
2150 total_rows += rb.num_rows();
2151 }
2152 assert_eq!(total_rows, expected);
2153 }
2154 }
2155
2156 #[tokio::test]
2157 async fn test_row_filter_nested() {
2158 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
2159 let b = StructArray::from(vec![
2160 (
2161 Arc::new(Field::new("aa", DataType::Utf8, true)),
2162 Arc::new(StringArray::from(vec!["a", "b", "b", "b", "c", "c"])) as ArrayRef,
2163 ),
2164 (
2165 Arc::new(Field::new("bb", DataType::Utf8, true)),
2166 Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5", "6"])) as ArrayRef,
2167 ),
2168 ]);
2169 let c = Int32Array::from_iter(0..6);
2170 let data = RecordBatch::try_from_iter([
2171 ("a", Arc::new(a) as ArrayRef),
2172 ("b", Arc::new(b) as ArrayRef),
2173 ("c", Arc::new(c) as ArrayRef),
2174 ])
2175 .unwrap();
2176
2177 let mut buf = Vec::with_capacity(1024);
2178 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
2179 writer.write(&data).unwrap();
2180 writer.close().unwrap();
2181
2182 let data: Bytes = buf.into();
2183 let metadata = ParquetMetaDataReader::new()
2184 .parse_and_finish(&data)
2185 .unwrap();
2186 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
2187
2188 let test = TestReader {
2189 data,
2190 metadata: Default::default(),
2191 requests: Default::default(),
2192 };
2193 let requests = test.requests.clone();
2194
2195 let a_scalar = StringArray::from_iter_values(["b"]);
2196 let a_filter = ArrowPredicateFn::new(
2197 ProjectionMask::leaves(&parquet_schema, vec![0]),
2198 move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
2199 );
2200
2201 let b_scalar = StringArray::from_iter_values(["4"]);
2202 let b_filter = ArrowPredicateFn::new(
2203 ProjectionMask::leaves(&parquet_schema, vec![2]),
2204 move |batch| {
2205 let struct_array = batch
2207 .column(0)
2208 .as_any()
2209 .downcast_ref::<StructArray>()
2210 .unwrap();
2211 eq(struct_array.column(0), &Scalar::new(&b_scalar))
2212 },
2213 );
2214
2215 let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
2216
2217 let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 3]);
2218 let stream = ParquetRecordBatchStreamBuilder::new(test)
2219 .await
2220 .unwrap()
2221 .with_projection(mask.clone())
2222 .with_batch_size(1024)
2223 .with_row_filter(filter)
2224 .build()
2225 .unwrap();
2226
2227 let batches: Vec<_> = stream.try_collect().await.unwrap();
2228 assert_eq!(batches.len(), 1);
2229
2230 let batch = &batches[0];
2231 assert_eq!(batch.num_rows(), 1);
2232 assert_eq!(batch.num_columns(), 2);
2233
2234 let col = batch.column(0);
2235 let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
2236 assert_eq!(val, "b");
2237
2238 let col = batch.column(1);
2239 let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
2240 assert_eq!(val, 3);
2241
2242 assert_eq!(requests.lock().unwrap().len(), 3);
2244 }
2245
2246 #[tokio::test]
2247 async fn empty_offset_index_doesnt_panic_in_read_row_group() {
2248 use tokio::fs::File;
2249 let testdata = arrow::util::test_util::parquet_test_data();
2250 let path = format!("{testdata}/alltypes_plain.parquet");
2251 let mut file = File::open(&path).await.unwrap();
2252 let file_size = file.metadata().await.unwrap().len();
2253 let mut metadata = ParquetMetaDataReader::new()
2254 .with_page_indexes(true)
2255 .load_and_finish(&mut file, file_size)
2256 .await
2257 .unwrap();
2258
2259 metadata.set_offset_index(Some(vec![]));
2260 let options = ArrowReaderOptions::new().with_page_index(true);
2261 let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2262 let reader =
2263 ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2264 .build()
2265 .unwrap();
2266
2267 let result = reader.try_collect::<Vec<_>>().await.unwrap();
2268 assert_eq!(result.len(), 1);
2269 }
2270
2271 #[tokio::test]
2272 async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
2273 use tokio::fs::File;
2274 let testdata = arrow::util::test_util::parquet_test_data();
2275 let path = format!("{testdata}/alltypes_tiny_pages.parquet");
2276 let mut file = File::open(&path).await.unwrap();
2277 let file_size = file.metadata().await.unwrap().len();
2278 let metadata = ParquetMetaDataReader::new()
2279 .with_page_indexes(true)
2280 .load_and_finish(&mut file, file_size)
2281 .await
2282 .unwrap();
2283
2284 let options = ArrowReaderOptions::new().with_page_index(true);
2285 let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2286 let reader =
2287 ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2288 .build()
2289 .unwrap();
2290
2291 let result = reader.try_collect::<Vec<_>>().await.unwrap();
2292 assert_eq!(result.len(), 8);
2293 }
2294
2295 #[tokio::test]
2296 async fn empty_offset_index_doesnt_panic_in_column_chunks() {
2297 use tempfile::TempDir;
2298 use tokio::fs::File;
2299 fn write_metadata_to_local_file(
2300 metadata: ParquetMetaData,
2301 file: impl AsRef<std::path::Path>,
2302 ) {
2303 use crate::file::metadata::ParquetMetaDataWriter;
2304 use std::fs::File;
2305 let file = File::create(file).unwrap();
2306 ParquetMetaDataWriter::new(file, &metadata)
2307 .finish()
2308 .unwrap()
2309 }
2310
2311 fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
2312 use std::fs::File;
2313 let file = File::open(file).unwrap();
2314 ParquetMetaDataReader::new()
2315 .with_page_indexes(true)
2316 .parse_and_finish(&file)
2317 .unwrap()
2318 }
2319
2320 let testdata = arrow::util::test_util::parquet_test_data();
2321 let path = format!("{testdata}/alltypes_plain.parquet");
2322 let mut file = File::open(&path).await.unwrap();
2323 let file_size = file.metadata().await.unwrap().len();
2324 let metadata = ParquetMetaDataReader::new()
2325 .with_page_indexes(true)
2326 .load_and_finish(&mut file, file_size)
2327 .await
2328 .unwrap();
2329
2330 let tempdir = TempDir::new().unwrap();
2331 let metadata_path = tempdir.path().join("thrift_metadata.dat");
2332 write_metadata_to_local_file(metadata, &metadata_path);
2333 let metadata = read_metadata_from_local_file(&metadata_path);
2334
2335 let options = ArrowReaderOptions::new().with_page_index(true);
2336 let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2337 let reader =
2338 ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2339 .build()
2340 .unwrap();
2341
2342 let result = reader.try_collect::<Vec<_>>().await.unwrap();
2344 assert_eq!(result.len(), 1);
2345 }
2346}