1use std::fmt::Formatter;
25use std::io::SeekFrom;
26use std::ops::Range;
27use std::pin::Pin;
28use std::sync::Arc;
29use std::task::{Context, Poll};
30
31use bytes::Bytes;
32use futures::future::{BoxFuture, FutureExt};
33use futures::stream::Stream;
34use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
35
36use arrow_array::RecordBatch;
37use arrow_schema::{Schema, SchemaRef};
38
39use crate::arrow::arrow_reader::{
40 ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
41};
42
43use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
44use crate::bloom_filter::{
45 SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
46};
47use crate::errors::{ParquetError, Result};
48use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
49
50mod metadata;
51pub use metadata::*;
52
53#[cfg(feature = "object_store")]
54mod store;
55
56use crate::DecodeResult;
57use crate::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder, PushDecoderInput};
58#[cfg(feature = "object_store")]
59pub use store::*;
60
61pub trait AsyncFileReader: Send {
75 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
77
78 fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
80 async move {
81 let mut result = Vec::with_capacity(ranges.len());
82
83 for range in ranges.into_iter() {
84 let data = self.get_bytes(range).await?;
85 result.push(data);
86 }
87
88 Ok(result)
89 }
90 .boxed()
91 }
92
93 fn get_metadata<'a>(
110 &'a mut self,
111 options: Option<&'a ArrowReaderOptions>,
112 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
113}
114
115impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
117 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
118 self.as_mut().get_bytes(range)
119 }
120
121 fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
122 self.as_mut().get_byte_ranges(ranges)
123 }
124
125 fn get_metadata<'a>(
126 &'a mut self,
127 options: Option<&'a ArrowReaderOptions>,
128 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
129 self.as_mut().get_metadata(options)
130 }
131}
132
133impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
134 fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
135 async move {
136 self.seek(SeekFrom::End(-(suffix as i64))).await?;
137 let mut buf = Vec::with_capacity(suffix);
138 self.take(suffix as _).read_to_end(&mut buf).await?;
139 Ok(buf.into())
140 }
141 .boxed()
142 }
143}
144
145impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
146 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
147 async move {
148 self.seek(SeekFrom::Start(range.start)).await?;
149
150 let to_read = range.end - range.start;
151 let mut buffer = Vec::with_capacity(to_read.try_into()?);
152 let read = self.take(to_read).read_to_end(&mut buffer).await?;
153 if read as u64 != to_read {
154 return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
155 }
156
157 Ok(buffer.into())
158 }
159 .boxed()
160 }
161
162 fn get_metadata<'a>(
163 &'a mut self,
164 options: Option<&'a ArrowReaderOptions>,
165 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
166 async move {
167 let metadata_opts = options.map(|o| o.metadata_options().clone());
168 let mut metadata_reader =
169 ParquetMetaDataReader::new().with_metadata_options(metadata_opts);
170
171 if let Some(opts) = options {
172 metadata_reader = metadata_reader
173 .with_column_index_policy(opts.column_index_policy())
174 .with_offset_index_policy(opts.offset_index_policy());
175 }
176
177 #[cfg(feature = "encryption")]
178 let metadata_reader = metadata_reader.with_decryption_properties(
179 options.and_then(|o| o.file_decryption_properties.as_ref().map(Arc::clone)),
180 );
181
182 let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
183 Ok(Arc::new(parquet_metadata))
184 }
185 .boxed()
186 }
187}
188
189impl ArrowReaderMetadata {
190 pub async fn load_async<T: AsyncFileReader>(
194 input: &mut T,
195 options: ArrowReaderOptions,
196 ) -> Result<Self> {
197 let metadata = input.get_metadata(Some(&options)).await?;
198 Self::try_new(metadata, options)
199 }
200}
201
/// Newtype wrapper around an asynchronous input `T`.
///
/// It is the input type of [`ParquetRecordBatchStreamBuilder`] and is hidden
/// from the generated documentation.
#[doc(hidden)]
pub struct AsyncReader<T>(T);
208
/// Builder for a [`ParquetRecordBatchStream`] that reads from an
/// [`AsyncFileReader`] of type `T`.
///
/// This is an [`ArrowReaderBuilder`] whose input is wrapped in [`AsyncReader`].
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
229
230impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
231 pub async fn new(input: T) -> Result<Self> {
447 Self::new_with_options(input, Default::default()).await
448 }
449
450 pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
453 let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
454 Ok(Self::new_with_metadata(input, metadata))
455 }
456
457 pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
503 Self::new_builder(AsyncReader(input), metadata)
504 }
505
506 pub async fn get_row_group_column_bloom_filter(
512 &mut self,
513 row_group_idx: usize,
514 column_idx: usize,
515 ) -> Result<Option<Sbbf>> {
516 let metadata = self.metadata.row_group(row_group_idx);
517 let column_metadata = metadata.column(column_idx);
518
519 let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
520 offset
521 .try_into()
522 .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
523 } else {
524 return Ok(None);
525 };
526
527 let buffer = match column_metadata.bloom_filter_length() {
528 Some(length) => self.input.0.get_bytes(offset..offset + length as u64),
529 None => self
530 .input
531 .0
532 .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64),
533 }
534 .await?;
535
536 let (header, bitset_offset) =
537 chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
538
539 match header.algorithm {
540 BloomFilterAlgorithm::BLOCK => {
541 }
543 }
544 match header.compression {
545 BloomFilterCompression::UNCOMPRESSED => {
546 }
548 }
549 match header.hash {
550 BloomFilterHash::XXHASH => {
551 }
553 }
554
555 let bitset = match column_metadata.bloom_filter_length() {
556 Some(_) => buffer.slice(
557 (TryInto::<usize>::try_into(bitset_offset).unwrap()
558 - TryInto::<usize>::try_into(offset).unwrap())..,
559 ),
560 None => {
561 let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
562 ParquetError::General("Bloom filter length is invalid".to_string())
563 })?;
564 self.input
565 .0
566 .get_bytes(bitset_offset..bitset_offset + bitset_length)
567 .await?
568 }
569 };
570 Ok(Some(Sbbf::new(&bitset)))
571 }
572
573 pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
577 let Self {
578 input,
579 metadata,
580 schema,
581 fields,
582 batch_size,
583 row_groups,
584 projection,
585 filter,
586 selection,
587 row_selection_policy: selection_strategy,
588 limit,
589 offset,
590 metrics,
591 max_predicate_cache_size,
592 } = self;
593
594 let projection_len = projection.mask.as_ref().map_or(usize::MAX, |m| m.len());
597 let projected_fields = schema
598 .fields
599 .filter_leaves(|idx, _| idx < projection_len && projection.leaf_included(idx));
600 let projected_schema = Arc::new(Schema::new(projected_fields));
601
602 let decoder = ParquetPushDecoderBuilder {
603 input: PushDecoderInput::default(),
604 metadata,
605 schema,
606 fields,
607 projection,
608 filter,
609 selection,
610 row_selection_policy: selection_strategy,
611 batch_size,
612 row_groups,
613 limit,
614 offset,
615 metrics,
616 max_predicate_cache_size,
617 }
618 .build()?;
619
620 let request_state = RequestState::None { input: input.0 };
621
622 Ok(ParquetRecordBatchStream {
623 schema: projected_schema,
624 decoder,
625 request_state,
626 })
627 }
628}
629
/// I/O state of a [`ParquetRecordBatchStream`].
///
/// The reader `T` is owned either by the state itself or by the in-flight
/// future, which hands it back when it completes.
enum RequestState<T> {
    /// No request is in flight.
    None {
        /// The reader, available for the next request.
        input: T,
    },
    /// A request for `ranges` is in flight.
    Outstanding {
        /// The byte ranges that were requested.
        ranges: Vec<Range<u64>>,
        /// Resolves to the reader together with the fetched bytes.
        future: BoxFuture<'static, Result<(T, Vec<Bytes>)>>,
    },
    /// Terminal state: the stream finished or failed.
    Done,
}
650
651impl<T> RequestState<T>
652where
653 T: AsyncFileReader + Unpin + Send + 'static,
654{
655 fn begin_request(mut input: T, ranges: Vec<Range<u64>>) -> Self {
657 let ranges_captured = ranges.clone();
658
659 let future = async move {
664 let data = input.get_byte_ranges(ranges_captured).await?;
665 Ok((input, data))
666 }
667 .boxed();
668 RequestState::Outstanding { ranges, future }
669 }
670}
671
672impl<T> std::fmt::Debug for RequestState<T> {
673 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
674 match self {
675 RequestState::None { input: _ } => f
676 .debug_struct("RequestState::None")
677 .field("input", &"...")
678 .finish(),
679 RequestState::Outstanding { ranges, .. } => f
680 .debug_struct("RequestState::Outstanding")
681 .field("ranges", &ranges)
682 .finish(),
683 RequestState::Done => {
684 write!(f, "RequestState::Done")
685 }
686 }
687 }
688}
689
/// An asynchronous stream of [`RecordBatch`] read from an [`AsyncFileReader`].
///
/// Created by `ParquetRecordBatchStreamBuilder::build`.
pub struct ParquetRecordBatchStream<T> {
    /// Schema of the projected columns.
    schema: SchemaRef,
    /// Tracks whether a byte-range request is idle, in flight, or finished.
    request_state: RequestState<T>,
    /// Decoder that is fed fetched byte ranges and produces the output.
    decoder: ParquetPushDecoder,
}
717
718impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
719 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
720 f.debug_struct("ParquetRecordBatchStream")
721 .field("request_state", &self.request_state)
722 .finish()
723 }
724}
725
impl<T> ParquetRecordBatchStream<T> {
    /// Returns the schema stored in this stream, i.e. the projected schema
    /// computed when the stream was built.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}
735
736impl<T> ParquetRecordBatchStream<T>
737where
738 T: AsyncFileReader + Unpin + Send + 'static,
739{
740 pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
754 loop {
755 let request_state = std::mem::replace(&mut self.request_state, RequestState::Done);
758 match request_state {
759 RequestState::None { input } => {
761 match self.decoder.try_next_reader()? {
762 DecodeResult::NeedsData(ranges) => {
763 self.request_state = RequestState::begin_request(input, ranges);
764 continue; }
766 DecodeResult::Data(reader) => {
767 self.request_state = RequestState::None { input };
768 return Ok(Some(reader));
769 }
770 DecodeResult::Finished => return Ok(None),
771 }
772 }
773 RequestState::Outstanding { ranges, future } => {
774 let (input, data) = future.await?;
775 self.decoder.push_ranges(ranges, data)?;
777 self.request_state = RequestState::None { input };
778 continue; }
780 RequestState::Done => {
781 self.request_state = RequestState::Done;
782 return Ok(None);
783 }
784 }
785 }
786 }
787}
788
789impl<T> Stream for ParquetRecordBatchStream<T>
790where
791 T: AsyncFileReader + Unpin + Send + 'static,
792{
793 type Item = Result<RecordBatch>;
794 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
795 match self.poll_next_inner(cx) {
796 Ok(res) => {
797 res.map(|res| Ok(res).transpose())
800 }
801 Err(e) => {
802 self.request_state = RequestState::Done;
803 Poll::Ready(Some(Err(e)))
804 }
805 }
806 }
807}
808
809impl<T> ParquetRecordBatchStream<T>
810where
811 T: AsyncFileReader + Unpin + Send + 'static,
812{
813 fn poll_next_inner(&mut self, cx: &mut Context<'_>) -> Result<Poll<Option<RecordBatch>>> {
818 loop {
819 let request_state = std::mem::replace(&mut self.request_state, RequestState::Done);
820 match request_state {
821 RequestState::None { input } => {
822 match self.decoder.try_decode()? {
824 DecodeResult::NeedsData(ranges) => {
825 self.request_state = RequestState::begin_request(input, ranges);
826 continue; }
828 DecodeResult::Data(batch) => {
829 self.request_state = RequestState::None { input };
830 return Ok(Poll::Ready(Some(batch)));
831 }
832 DecodeResult::Finished => {
833 self.request_state = RequestState::Done;
834 return Ok(Poll::Ready(None));
835 }
836 }
837 }
838 RequestState::Outstanding { ranges, mut future } => match future.poll_unpin(cx) {
839 Poll::Ready(result) => {
841 let (input, data) = result?;
842 self.decoder.push_ranges(ranges, data)?;
844 self.request_state = RequestState::None { input };
845 continue; }
847 Poll::Pending => {
848 self.request_state = RequestState::Outstanding { ranges, future };
849 return Ok(Poll::Pending);
850 }
851 },
852 RequestState::Done => {
853 self.request_state = RequestState::Done;
855 return Ok(Poll::Ready(None));
856 }
857 }
858 }
859 }
860}
861
862#[cfg(test)]
863mod tests {
864 use super::*;
865 use crate::arrow::arrow_reader::tests::test_row_numbers_with_multiple_row_groups_helper;
866 use crate::arrow::arrow_reader::{
867 ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
868 };
869 use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
870 use crate::arrow::schema::virtual_type::RowNumber;
871 use crate::arrow::{ArrowWriter, AsyncArrowWriter, ProjectionMask};
872 use crate::file::metadata::PageIndexPolicy;
873 use crate::file::metadata::ParquetMetaDataReader;
874 use crate::file::properties::WriterProperties;
875 use arrow::compute::kernels::cmp::eq;
876 use arrow::error::Result as ArrowResult;
877 use arrow_array::builder::{Float32Builder, ListBuilder, StringBuilder};
878 use arrow_array::cast::AsArray;
879 use arrow_array::types::Int32Type;
880 use arrow_array::{
881 Array, ArrayRef, BooleanArray, Int32Array, RecordBatchReader, Scalar, StringArray,
882 StructArray, UInt64Array,
883 };
884 use arrow_schema::{DataType, Field, Schema};
885 use futures::{StreamExt, TryStreamExt};
886 use rand::{Rng, rng};
887 use std::collections::HashMap;
888 use std::sync::{Arc, Mutex};
889 use tempfile::tempfile;
890
    /// In-memory [`AsyncFileReader`] used by the tests in this module.
    #[derive(Clone)]
    struct TestReader {
        /// Complete contents of the Parquet file.
        data: Bytes,
        /// Metadata stored by the most recent `get_metadata` call.
        metadata: Option<Arc<ParquetMetaData>>,
        /// Every range passed to `get_bytes`, in call order; shared by clones.
        requests: Arc<Mutex<Vec<Range<usize>>>>,
    }
897
898 impl TestReader {
899 fn new(data: Bytes) -> Self {
900 Self {
901 data,
902 metadata: Default::default(),
903 requests: Default::default(),
904 }
905 }
906 }
907
908 impl AsyncFileReader for TestReader {
909 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
910 let range = range.clone();
911 self.requests
912 .lock()
913 .unwrap()
914 .push(range.start as usize..range.end as usize);
915 futures::future::ready(Ok(self
916 .data
917 .slice(range.start as usize..range.end as usize)))
918 .boxed()
919 }
920
921 fn get_metadata<'a>(
922 &'a mut self,
923 options: Option<&'a ArrowReaderOptions>,
924 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
925 let mut metadata_reader = ParquetMetaDataReader::new();
926 if let Some(opts) = options {
927 metadata_reader = metadata_reader
928 .with_column_index_policy(opts.column_index_policy())
929 .with_offset_index_policy(opts.offset_index_policy());
930 }
931 self.metadata = Some(Arc::new(
932 metadata_reader.parse_and_finish(&self.data).unwrap(),
933 ));
934 futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
935 }
936 }
937
938 #[tokio::test]
939 async fn test_async_reader() {
940 let testdata = arrow::util::test_util::parquet_test_data();
941 let path = format!("{testdata}/alltypes_plain.parquet");
942 let data = Bytes::from(std::fs::read(path).unwrap());
943
944 let async_reader = TestReader::new(data.clone());
945
946 let requests = async_reader.requests.clone();
947 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
948 .await
949 .unwrap();
950
951 let metadata = builder.metadata().clone();
952 assert_eq!(metadata.num_row_groups(), 1);
953
954 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
955 let stream = builder
956 .with_projection(mask.clone())
957 .with_batch_size(1024)
958 .build()
959 .unwrap();
960
961 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
962
963 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
964 .unwrap()
965 .with_projection(mask)
966 .with_batch_size(104)
967 .build()
968 .unwrap()
969 .collect::<ArrowResult<Vec<_>>>()
970 .unwrap();
971
972 assert_eq!(async_batches, sync_batches);
973
974 let requests = requests.lock().unwrap();
975 let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
976 let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
977
978 assert_eq!(
979 &requests[..],
980 &[
981 offset_1 as usize..(offset_1 + length_1) as usize,
982 offset_2 as usize..(offset_2 + length_2) as usize
983 ]
984 );
985 }
986
987 #[tokio::test]
988 async fn test_async_reader_with_next_row_group() {
989 let testdata = arrow::util::test_util::parquet_test_data();
990 let path = format!("{testdata}/alltypes_plain.parquet");
991 let data = Bytes::from(std::fs::read(path).unwrap());
992
993 let async_reader = TestReader::new(data.clone());
994
995 let requests = async_reader.requests.clone();
996 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
997 .await
998 .unwrap();
999
1000 let metadata = builder.metadata().clone();
1001 assert_eq!(metadata.num_row_groups(), 1);
1002
1003 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1004 let mut stream = builder
1005 .with_projection(mask.clone())
1006 .with_batch_size(1024)
1007 .build()
1008 .unwrap();
1009
1010 let mut readers = vec![];
1011 while let Some(reader) = stream.next_row_group().await.unwrap() {
1012 readers.push(reader);
1013 }
1014
1015 let async_batches: Vec<_> = readers
1016 .into_iter()
1017 .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
1018 .collect();
1019
1020 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1021 .unwrap()
1022 .with_projection(mask)
1023 .with_batch_size(104)
1024 .build()
1025 .unwrap()
1026 .collect::<ArrowResult<Vec<_>>>()
1027 .unwrap();
1028
1029 assert_eq!(async_batches, sync_batches);
1030
1031 let requests = requests.lock().unwrap();
1032 let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1033 let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1034
1035 assert_eq!(
1036 &requests[..],
1037 &[
1038 offset_1 as usize..(offset_1 + length_1) as usize,
1039 offset_2 as usize..(offset_2 + length_2) as usize
1040 ]
1041 );
1042 }
1043
1044 #[tokio::test]
1045 async fn test_async_reader_with_index() {
1046 let testdata = arrow::util::test_util::parquet_test_data();
1047 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1048 let data = Bytes::from(std::fs::read(path).unwrap());
1049
1050 let async_reader = TestReader::new(data.clone());
1051
1052 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1053 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1054 .await
1055 .unwrap();
1056
1057 let metadata_with_index = builder.metadata();
1059 assert_eq!(metadata_with_index.num_row_groups(), 1);
1060
1061 let offset_index = metadata_with_index.offset_index().unwrap();
1063 let column_index = metadata_with_index.column_index().unwrap();
1064
1065 assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
1066 assert_eq!(column_index.len(), metadata_with_index.num_row_groups());
1067
1068 let num_columns = metadata_with_index
1069 .file_metadata()
1070 .schema_descr()
1071 .num_columns();
1072
1073 offset_index
1075 .iter()
1076 .for_each(|x| assert_eq!(x.len(), num_columns));
1077 column_index
1078 .iter()
1079 .for_each(|x| assert_eq!(x.len(), num_columns));
1080
1081 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1082 let stream = builder
1083 .with_projection(mask.clone())
1084 .with_batch_size(1024)
1085 .build()
1086 .unwrap();
1087
1088 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1089
1090 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1091 .unwrap()
1092 .with_projection(mask)
1093 .with_batch_size(1024)
1094 .build()
1095 .unwrap()
1096 .collect::<ArrowResult<Vec<_>>>()
1097 .unwrap();
1098
1099 assert_eq!(async_batches, sync_batches);
1100 }
1101
1102 #[tokio::test]
1103 async fn test_async_reader_with_limit() {
1104 let testdata = arrow::util::test_util::parquet_test_data();
1105 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1106 let data = Bytes::from(std::fs::read(path).unwrap());
1107
1108 let metadata = ParquetMetaDataReader::new()
1109 .parse_and_finish(&data)
1110 .unwrap();
1111 let metadata = Arc::new(metadata);
1112
1113 assert_eq!(metadata.num_row_groups(), 1);
1114
1115 let async_reader = TestReader::new(data.clone());
1116
1117 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1118 .await
1119 .unwrap();
1120
1121 assert_eq!(builder.metadata().num_row_groups(), 1);
1122
1123 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1124 let stream = builder
1125 .with_projection(mask.clone())
1126 .with_batch_size(1024)
1127 .with_limit(1)
1128 .build()
1129 .unwrap();
1130
1131 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1132
1133 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1134 .unwrap()
1135 .with_projection(mask)
1136 .with_batch_size(1024)
1137 .with_limit(1)
1138 .build()
1139 .unwrap()
1140 .collect::<ArrowResult<Vec<_>>>()
1141 .unwrap();
1142
1143 assert_eq!(async_batches, sync_batches);
1144 }
1145
1146 #[tokio::test]
1147 async fn test_async_reader_skip_pages() {
1148 let testdata = arrow::util::test_util::parquet_test_data();
1149 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1150 let data = Bytes::from(std::fs::read(path).unwrap());
1151
1152 let async_reader = TestReader::new(data.clone());
1153
1154 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1155 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1156 .await
1157 .unwrap();
1158
1159 assert_eq!(builder.metadata().num_row_groups(), 1);
1160
1161 let selection = RowSelection::from(vec![
1162 RowSelector::skip(21), RowSelector::select(21), RowSelector::skip(41), RowSelector::select(41), RowSelector::skip(25), RowSelector::select(25), RowSelector::skip(7116), RowSelector::select(10), ]);
1171
1172 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);
1173
1174 let stream = builder
1175 .with_projection(mask.clone())
1176 .with_row_selection(selection.clone())
1177 .build()
1178 .expect("building stream");
1179
1180 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1181
1182 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1183 .unwrap()
1184 .with_projection(mask)
1185 .with_batch_size(1024)
1186 .with_row_selection(selection)
1187 .build()
1188 .unwrap()
1189 .collect::<ArrowResult<Vec<_>>>()
1190 .unwrap();
1191
1192 assert_eq!(async_batches, sync_batches);
1193 }
1194
1195 #[tokio::test]
1196 async fn test_fuzz_async_reader_selection() {
1197 let testdata = arrow::util::test_util::parquet_test_data();
1198 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1199 let data = Bytes::from(std::fs::read(path).unwrap());
1200
1201 let mut rand = rng();
1202
1203 for _ in 0..100 {
1204 let mut expected_rows = 0;
1205 let mut total_rows = 0;
1206 let mut skip = false;
1207 let mut selectors = vec![];
1208
1209 while total_rows < 7300 {
1210 let row_count: usize = rand.random_range(1..100);
1211
1212 let row_count = row_count.min(7300 - total_rows);
1213
1214 selectors.push(RowSelector { row_count, skip });
1215
1216 total_rows += row_count;
1217 if !skip {
1218 expected_rows += row_count;
1219 }
1220
1221 skip = !skip;
1222 }
1223
1224 let selection = RowSelection::from(selectors);
1225
1226 let async_reader = TestReader::new(data.clone());
1227
1228 let options =
1229 ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1230 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1231 .await
1232 .unwrap();
1233
1234 assert_eq!(builder.metadata().num_row_groups(), 1);
1235
1236 let col_idx: usize = rand.random_range(0..13);
1237 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1238
1239 let stream = builder
1240 .with_projection(mask.clone())
1241 .with_row_selection(selection.clone())
1242 .build()
1243 .expect("building stream");
1244
1245 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1246
1247 let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1248
1249 assert_eq!(actual_rows, expected_rows);
1250 }
1251 }
1252
1253 #[tokio::test]
1254 async fn test_async_reader_zero_row_selector() {
1255 let testdata = arrow::util::test_util::parquet_test_data();
1257 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1258 let data = Bytes::from(std::fs::read(path).unwrap());
1259
1260 let mut rand = rng();
1261
1262 let mut expected_rows = 0;
1263 let mut total_rows = 0;
1264 let mut skip = false;
1265 let mut selectors = vec![];
1266
1267 selectors.push(RowSelector {
1268 row_count: 0,
1269 skip: false,
1270 });
1271
1272 while total_rows < 7300 {
1273 let row_count: usize = rand.random_range(1..100);
1274
1275 let row_count = row_count.min(7300 - total_rows);
1276
1277 selectors.push(RowSelector { row_count, skip });
1278
1279 total_rows += row_count;
1280 if !skip {
1281 expected_rows += row_count;
1282 }
1283
1284 skip = !skip;
1285 }
1286
1287 let selection = RowSelection::from(selectors);
1288
1289 let async_reader = TestReader::new(data.clone());
1290
1291 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1292 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1293 .await
1294 .unwrap();
1295
1296 assert_eq!(builder.metadata().num_row_groups(), 1);
1297
1298 let col_idx: usize = rand.random_range(0..13);
1299 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1300
1301 let stream = builder
1302 .with_projection(mask.clone())
1303 .with_row_selection(selection.clone())
1304 .build()
1305 .expect("building stream");
1306
1307 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1308
1309 let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1310
1311 assert_eq!(actual_rows, expected_rows);
1312 }
1313
1314 #[tokio::test]
1315 async fn test_limit_multiple_row_groups() {
1316 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1317 let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1318 let c = Int32Array::from_iter(0..6);
1319 let data = RecordBatch::try_from_iter([
1320 ("a", Arc::new(a) as ArrayRef),
1321 ("b", Arc::new(b) as ArrayRef),
1322 ("c", Arc::new(c) as ArrayRef),
1323 ])
1324 .unwrap();
1325
1326 let mut buf = Vec::with_capacity(1024);
1327 let props = WriterProperties::builder()
1328 .set_max_row_group_row_count(Some(3))
1329 .build();
1330 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1331 writer.write(&data).unwrap();
1332 writer.close().unwrap();
1333
1334 let data: Bytes = buf.into();
1335 let metadata = ParquetMetaDataReader::new()
1336 .parse_and_finish(&data)
1337 .unwrap();
1338
1339 assert_eq!(metadata.num_row_groups(), 2);
1340
1341 let test = TestReader::new(data);
1342
1343 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1344 .await
1345 .unwrap()
1346 .with_batch_size(1024)
1347 .with_limit(4)
1348 .build()
1349 .unwrap();
1350
1351 let batches: Vec<_> = stream.try_collect().await.unwrap();
1352 assert_eq!(batches.len(), 2);
1354
1355 let batch = &batches[0];
1356 assert_eq!(batch.num_rows(), 3);
1358 assert_eq!(batch.num_columns(), 3);
1359 let col2 = batch.column(2).as_primitive::<Int32Type>();
1360 assert_eq!(col2.values(), &[0, 1, 2]);
1361
1362 let batch = &batches[1];
1363 assert_eq!(batch.num_rows(), 1);
1365 assert_eq!(batch.num_columns(), 3);
1366 let col2 = batch.column(2).as_primitive::<Int32Type>();
1367 assert_eq!(col2.values(), &[3]);
1368
1369 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1370 .await
1371 .unwrap()
1372 .with_offset(2)
1373 .with_limit(3)
1374 .build()
1375 .unwrap();
1376
1377 let batches: Vec<_> = stream.try_collect().await.unwrap();
1378 assert_eq!(batches.len(), 2);
1380
1381 let batch = &batches[0];
1382 assert_eq!(batch.num_rows(), 1);
1384 assert_eq!(batch.num_columns(), 3);
1385 let col2 = batch.column(2).as_primitive::<Int32Type>();
1386 assert_eq!(col2.values(), &[2]);
1387
1388 let batch = &batches[1];
1389 assert_eq!(batch.num_rows(), 2);
1391 assert_eq!(batch.num_columns(), 3);
1392 let col2 = batch.column(2).as_primitive::<Int32Type>();
1393 assert_eq!(col2.values(), &[3, 4]);
1394
1395 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1396 .await
1397 .unwrap()
1398 .with_offset(4)
1399 .with_limit(20)
1400 .build()
1401 .unwrap();
1402
1403 let batches: Vec<_> = stream.try_collect().await.unwrap();
1404 assert_eq!(batches.len(), 1);
1406
1407 let batch = &batches[0];
1408 assert_eq!(batch.num_rows(), 2);
1410 assert_eq!(batch.num_columns(), 3);
1411 let col2 = batch.column(2).as_primitive::<Int32Type>();
1412 assert_eq!(col2.values(), &[4, 5]);
1413 }
1414
1415 #[tokio::test]
1416 async fn test_batch_size_overallocate() {
1417 let testdata = arrow::util::test_util::parquet_test_data();
1418 let path = format!("{testdata}/alltypes_plain.parquet");
1420 let data = Bytes::from(std::fs::read(path).unwrap());
1421
1422 let async_reader = TestReader::new(data.clone());
1423
1424 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1425 .await
1426 .unwrap();
1427
1428 let file_rows = builder.metadata().file_metadata().num_rows() as usize;
1429
1430 let builder = builder
1431 .with_projection(ProjectionMask::all())
1432 .with_batch_size(1024);
1433
1434 assert_ne!(1024, file_rows);
1437 assert_eq!(builder.batch_size, file_rows);
1438
1439 let _stream = builder.build().unwrap();
1440 }
1441
    /// Checks, for several leaf projections, that the sync and async readers
    /// agree on three schemas: the builder schema (all fields plus the file's
    /// key/value metadata), the reader schema and the batch schema (projected
    /// fields only, with empty metadata).
    #[tokio::test]
    async fn test_parquet_record_batch_stream_schema() {
        // Names of all fields, including nested ones, as returned by
        // `Schema::flattened_fields`.
        fn get_all_field_names(schema: &Schema) -> Vec<&String> {
            schema.flattened_fields().iter().map(|f| f.name()).collect()
        }

        // Schema-level metadata attached to the written file.
        let mut metadata = HashMap::with_capacity(1);
        metadata.insert("key".to_string(), "value".to_string());

        // Struct column "c" with two string children "d" and "e".
        let nested_struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("d", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("e", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
            ),
        ]);
        // Top-level columns "a", "b" and the nested struct "c".
        let struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("a", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("b", DataType::UInt64, true)),
                Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c",
                    nested_struct_array.data_type().clone(),
                    true,
                )),
                Arc::new(nested_struct_array) as ArrayRef,
            ),
        ]);

        let schema =
            Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
        let record_batch = RecordBatch::from(struct_array)
            .with_schema(schema.clone())
            .unwrap();

        // Write the batch to a temporary file that both readers will open.
        let mut file = tempfile().unwrap();
        let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
        writer.write(&record_batch).unwrap();
        writer.close().unwrap();

        let all_fields = ["a", "b", "c", "d", "e"];
        // (leaf indices to project, flattened field names expected in the
        // projected schema)
        let projections = [
            (vec![], vec![]),
            (vec![0], vec!["a"]),
            (vec![0, 1], vec!["a", "b"]),
            (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
            (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
        ];

        for (indices, expected_projected_names) in projections {
            // Shared assertions for the sync and the async code path.
            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
                // The builder schema is unprojected and keeps the metadata.
                assert_eq!(get_all_field_names(&builder), all_fields);
                assert_eq!(builder.metadata, metadata);
                // Reader and batch schemas are projected and carry no metadata.
                assert_eq!(get_all_field_names(&reader), expected_projected_names);
                assert_eq!(reader.metadata, HashMap::default());
                assert_eq!(get_all_field_names(&batch), expected_projected_names);
                assert_eq!(batch.metadata, HashMap::default());
            };

            // Synchronous reader.
            let builder =
                ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
            let sync_builder_schema = builder.schema().clone();
            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
            let mut reader = builder.with_projection(mask).build().unwrap();
            let sync_reader_schema = reader.schema();
            let batch = reader.next().unwrap().unwrap();
            let sync_batch_schema = batch.schema();
            assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);

            // Asynchronous reader over the same file.
            let file = tokio::fs::File::from(file.try_clone().unwrap());
            let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
            let async_builder_schema = builder.schema().clone();
            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
            let mut reader = builder.with_projection(mask).build().unwrap();
            let async_reader_schema = reader.schema().clone();
            let batch = reader.next().await.unwrap().unwrap();
            let async_batch_schema = batch.schema();
            assert_schemas(
                async_builder_schema,
                async_reader_schema,
                async_batch_schema,
            );
        }
    }
1549
1550 #[tokio::test]
1551 async fn test_nested_skip() {
1552 let schema = Arc::new(Schema::new(vec![
1553 Field::new("col_1", DataType::UInt64, false),
1554 Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
1555 ]));
1556
1557 let props = WriterProperties::builder()
1559 .set_data_page_row_count_limit(256)
1560 .set_write_batch_size(256)
1561 .set_max_row_group_row_count(Some(1024));
1562
1563 let mut file = tempfile().unwrap();
1565 let mut writer =
1566 ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();
1567
1568 let mut builder = ListBuilder::new(StringBuilder::new());
1569 for id in 0..1024 {
1570 match id % 3 {
1571 0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
1572 1 => builder.append_value([Some(format!("id_{id}"))]),
1573 _ => builder.append_null(),
1574 }
1575 }
1576 let refs = vec![
1577 Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
1578 Arc::new(builder.finish()) as ArrayRef,
1579 ];
1580
1581 let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
1582 writer.write(&batch).unwrap();
1583 writer.close().unwrap();
1584
1585 let selections = [
1586 RowSelection::from(vec![
1587 RowSelector::skip(313),
1588 RowSelector::select(1),
1589 RowSelector::skip(709),
1590 RowSelector::select(1),
1591 ]),
1592 RowSelection::from(vec![
1593 RowSelector::skip(255),
1594 RowSelector::select(1),
1595 RowSelector::skip(767),
1596 RowSelector::select(1),
1597 ]),
1598 RowSelection::from(vec![
1599 RowSelector::select(255),
1600 RowSelector::skip(1),
1601 RowSelector::select(767),
1602 RowSelector::skip(1),
1603 ]),
1604 RowSelection::from(vec![
1605 RowSelector::skip(254),
1606 RowSelector::select(1),
1607 RowSelector::select(1),
1608 RowSelector::skip(767),
1609 RowSelector::select(1),
1610 ]),
1611 ];
1612
1613 for selection in selections {
1614 let expected = selection.row_count();
1615 let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
1617 tokio::fs::File::from_std(file.try_clone().unwrap()),
1618 ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
1619 )
1620 .await
1621 .unwrap();
1622
1623 reader = reader.with_row_selection(selection);
1624
1625 let mut stream = reader.build().unwrap();
1626
1627 let mut total_rows = 0;
1628 while let Some(rb) = stream.next().await {
1629 let rb = rb.unwrap();
1630 total_rows += rb.num_rows();
1631 }
1632 assert_eq!(total_rows, expected);
1633 }
1634 }
1635
1636 #[tokio::test]
1637 async fn empty_offset_index_doesnt_panic_in_read_row_group() {
1638 use tokio::fs::File;
1639 let testdata = arrow::util::test_util::parquet_test_data();
1640 let path = format!("{testdata}/alltypes_plain.parquet");
1641 let mut file = File::open(&path).await.unwrap();
1642 let file_size = file.metadata().await.unwrap().len();
1643 let mut metadata = ParquetMetaDataReader::new()
1644 .with_page_index_policy(PageIndexPolicy::Required)
1645 .load_and_finish(&mut file, file_size)
1646 .await
1647 .unwrap();
1648
1649 metadata.set_offset_index(Some(vec![]));
1650 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1651 let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
1652 let reader =
1653 ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
1654 .build()
1655 .unwrap();
1656
1657 let result = reader.try_collect::<Vec<_>>().await.unwrap();
1658 assert_eq!(result.len(), 1);
1659 }
1660
1661 #[tokio::test]
1662 async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
1663 use tokio::fs::File;
1664 let testdata = arrow::util::test_util::parquet_test_data();
1665 let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1666 let mut file = File::open(&path).await.unwrap();
1667 let file_size = file.metadata().await.unwrap().len();
1668 let metadata = ParquetMetaDataReader::new()
1669 .with_page_index_policy(PageIndexPolicy::Required)
1670 .load_and_finish(&mut file, file_size)
1671 .await
1672 .unwrap();
1673
1674 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1675 let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
1676 let reader =
1677 ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
1678 .build()
1679 .unwrap();
1680
1681 let result = reader.try_collect::<Vec<_>>().await.unwrap();
1682 assert_eq!(result.len(), 8);
1683 }
1684
1685 #[tokio::test]
1686 async fn empty_offset_index_doesnt_panic_in_column_chunks() {
1687 use tempfile::TempDir;
1688 use tokio::fs::File;
1689 fn write_metadata_to_local_file(
1690 metadata: ParquetMetaData,
1691 file: impl AsRef<std::path::Path>,
1692 ) {
1693 use crate::file::metadata::ParquetMetaDataWriter;
1694 use std::fs::File;
1695 let file = File::create(file).unwrap();
1696 ParquetMetaDataWriter::new(file, &metadata)
1697 .finish()
1698 .unwrap()
1699 }
1700
1701 fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
1702 use std::fs::File;
1703 let file = File::open(file).unwrap();
1704 ParquetMetaDataReader::new()
1705 .with_page_index_policy(PageIndexPolicy::Required)
1706 .parse_and_finish(&file)
1707 .unwrap()
1708 }
1709
1710 let testdata = arrow::util::test_util::parquet_test_data();
1711 let path = format!("{testdata}/alltypes_plain.parquet");
1712 let mut file = File::open(&path).await.unwrap();
1713 let file_size = file.metadata().await.unwrap().len();
1714 let metadata = ParquetMetaDataReader::new()
1715 .with_page_index_policy(PageIndexPolicy::Required)
1716 .load_and_finish(&mut file, file_size)
1717 .await
1718 .unwrap();
1719
1720 let tempdir = TempDir::new().unwrap();
1721 let metadata_path = tempdir.path().join("thrift_metadata.dat");
1722 write_metadata_to_local_file(metadata, &metadata_path);
1723 let metadata = read_metadata_from_local_file(&metadata_path);
1724
1725 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1726 let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
1727 let reader =
1728 ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
1729 .build()
1730 .unwrap();
1731
1732 let result = reader.try_collect::<Vec<_>>().await.unwrap();
1734 assert_eq!(result.len(), 1);
1735 }
1736
1737 #[tokio::test]
1738 async fn test_cached_array_reader_sparse_offset_error() {
1739 use futures::TryStreamExt;
1740
1741 use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
1742 use arrow_array::{BooleanArray, RecordBatch};
1743
1744 let testdata = arrow::util::test_util::parquet_test_data();
1745 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1746 let data = Bytes::from(std::fs::read(path).unwrap());
1747
1748 let async_reader = TestReader::new(data);
1749
1750 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1752 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1753 .await
1754 .unwrap();
1755
1756 let selection = RowSelection::from(vec![RowSelector::skip(22), RowSelector::select(3)]);
1760
1761 let parquet_schema = builder.parquet_schema();
1765 let proj = ProjectionMask::leaves(parquet_schema, vec![0]);
1766 let always_true = ArrowPredicateFn::new(proj.clone(), |batch: RecordBatch| {
1767 Ok(BooleanArray::from(vec![true; batch.num_rows()]))
1768 });
1769 let filter = RowFilter::new(vec![Box::new(always_true)]);
1770
1771 let stream = builder
1774 .with_batch_size(8)
1775 .with_projection(proj)
1776 .with_row_selection(selection)
1777 .with_row_filter(filter)
1778 .build()
1779 .unwrap();
1780
1781 let _result: Vec<_> = stream.try_collect().await.unwrap();
1784 }
1785
1786 #[tokio::test]
1787 async fn test_predicate_cache_disabled() {
1788 let k = Int32Array::from_iter_values(0..10);
1789 let data = RecordBatch::try_from_iter([("k", Arc::new(k) as ArrayRef)]).unwrap();
1790
1791 let mut buf = Vec::new();
1792 let props = WriterProperties::builder()
1794 .set_data_page_row_count_limit(1)
1795 .set_write_batch_size(1)
1796 .set_max_row_group_row_count(Some(10))
1797 .set_write_page_header_statistics(true)
1798 .build();
1799 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1800 writer.write(&data).unwrap();
1801 writer.close().unwrap();
1802
1803 let data = Bytes::from(buf);
1804 let metadata = ParquetMetaDataReader::new()
1805 .with_page_index_policy(PageIndexPolicy::Required)
1806 .parse_and_finish(&data)
1807 .unwrap();
1808 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1809
1810 let build_filter = || {
1812 let scalar = Int32Array::from_iter_values([5]);
1813 let predicate = ArrowPredicateFn::new(
1814 ProjectionMask::leaves(&parquet_schema, vec![0]),
1815 move |batch| eq(batch.column(0), &Scalar::new(&scalar)),
1816 );
1817 RowFilter::new(vec![Box::new(predicate)])
1818 };
1819
1820 let selection = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(1)]);
1822
1823 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1824 let reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
1825
1826 let reader_with_cache = TestReader::new(data.clone());
1828 let requests_with_cache = reader_with_cache.requests.clone();
1829 let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
1830 reader_with_cache,
1831 reader_metadata.clone(),
1832 )
1833 .with_batch_size(1000)
1834 .with_row_selection(selection.clone())
1835 .with_row_filter(build_filter())
1836 .build()
1837 .unwrap();
1838 let batches_with_cache: Vec<_> = stream.try_collect().await.unwrap();
1839
1840 let reader_without_cache = TestReader::new(data);
1842 let requests_without_cache = reader_without_cache.requests.clone();
1843 let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
1844 reader_without_cache,
1845 reader_metadata,
1846 )
1847 .with_batch_size(1000)
1848 .with_row_selection(selection)
1849 .with_row_filter(build_filter())
1850 .with_max_predicate_cache_size(0) .build()
1852 .unwrap();
1853 let batches_without_cache: Vec<_> = stream.try_collect().await.unwrap();
1854
1855 assert_eq!(batches_with_cache, batches_without_cache);
1856
1857 let requests_with_cache = requests_with_cache.lock().unwrap();
1858 let requests_without_cache = requests_without_cache.lock().unwrap();
1859
1860 assert_eq!(requests_with_cache.len(), 11);
1862 assert_eq!(requests_without_cache.len(), 2);
1863
1864 assert_eq!(
1866 requests_with_cache.iter().map(|r| r.len()).sum::<usize>(),
1867 433
1868 );
1869 assert_eq!(
1870 requests_without_cache
1871 .iter()
1872 .map(|r| r.len())
1873 .sum::<usize>(),
1874 92
1875 );
1876 }
1877
/// Runs the shared multi-row-group row-number helper (first argument `false`) against
/// the async reader. The callback opens the file on a current-thread tokio runtime,
/// requests a virtual `row_number` column, applies the given selection and batch size,
/// and returns all batches; the row filter argument is ignored.
#[test]
fn test_row_numbers_with_multiple_row_groups() {
    test_row_numbers_with_multiple_row_groups_helper(
        false,
        |path, selection, _row_filter, batch_size| {
            let runtime = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .expect("Could not create runtime");
            runtime.block_on(async move {
                let input = tokio::fs::File::open(path).await.unwrap();

                // Virtual column carrying the `RowNumber` extension type.
                let row_number_field =
                    Field::new("row_number", DataType::Int64, false).with_extension_type(RowNumber);
                let options = ArrowReaderOptions::new()
                    .with_virtual_columns(vec![Arc::new(row_number_field)])
                    .unwrap();

                let builder = ParquetRecordBatchStreamBuilder::new_with_options(input, options)
                    .await
                    .unwrap();
                let stream = builder
                    .with_row_selection(selection)
                    .with_batch_size(batch_size)
                    .build()
                    .expect("Could not create reader");
                stream.try_collect::<Vec<_>>().await.unwrap()
            })
        },
    );
}
1908
/// Runs the shared multi-row-group row-number helper (first argument `true`) against
/// the async reader. The callback opens the file on a current-thread tokio runtime,
/// requests a virtual `row_number` column, applies the given selection, row filter and
/// batch size, and returns all batches. Panics with "No row filter" if the helper
/// passes no filter.
#[test]
fn test_row_numbers_with_multiple_row_groups_and_filter() {
    test_row_numbers_with_multiple_row_groups_helper(
        true,
        |path, selection, row_filter, batch_size| {
            let runtime = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .expect("Could not create runtime");
            runtime.block_on(async move {
                let input = tokio::fs::File::open(path).await.unwrap();

                // Virtual column carrying the `RowNumber` extension type.
                let row_number_field =
                    Field::new("row_number", DataType::Int64, false).with_extension_type(RowNumber);
                let options = ArrowReaderOptions::new()
                    .with_virtual_columns(vec![Arc::new(row_number_field)])
                    .unwrap();

                let builder = ParquetRecordBatchStreamBuilder::new_with_options(input, options)
                    .await
                    .unwrap();
                let stream = builder
                    .with_row_selection(selection)
                    .with_row_filter(row_filter.expect("No row filter"))
                    .with_batch_size(batch_size)
                    .build()
                    .expect("Could not create reader");
                stream.try_collect::<Vec<_>>().await.unwrap()
            })
        },
    );
}
1940
1941 #[tokio::test]
1942 async fn test_nested_lists() -> Result<()> {
1943 let list_inner_field = Arc::new(Field::new("item", DataType::Float32, true));
1945 let table_schema = Arc::new(Schema::new(vec![
1946 Field::new("id", DataType::Int32, false),
1947 Field::new("vector", DataType::List(list_inner_field.clone()), true),
1948 ]));
1949
1950 let mut list_builder =
1951 ListBuilder::new(Float32Builder::new()).with_field(list_inner_field.clone());
1952 list_builder.values().append_slice(&[10.0, 10.0, 10.0]);
1953 list_builder.append(true);
1954 list_builder.values().append_slice(&[20.0, 20.0, 20.0]);
1955 list_builder.append(true);
1956 list_builder.values().append_slice(&[30.0, 30.0, 30.0]);
1957 list_builder.append(true);
1958 list_builder.values().append_slice(&[40.0, 40.0, 40.0]);
1959 list_builder.append(true);
1960 let list_array = list_builder.finish();
1961
1962 let data = vec![RecordBatch::try_new(
1963 table_schema.clone(),
1964 vec![
1965 Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
1966 Arc::new(list_array),
1967 ],
1968 )?];
1969
1970 let mut buffer = Vec::new();
1971 let mut writer = AsyncArrowWriter::try_new(&mut buffer, table_schema, None)?;
1972
1973 for batch in data {
1974 writer.write(&batch).await?;
1975 }
1976
1977 writer.close().await?;
1978
1979 let reader = TestReader::new(Bytes::from(buffer));
1980 let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
1981
1982 let predicate = ArrowPredicateFn::new(ProjectionMask::all(), |batch| {
1983 Ok(BooleanArray::from(vec![true; batch.num_rows()]))
1984 });
1985
1986 let projection_mask = ProjectionMask::all();
1987
1988 let mut stream = builder
1989 .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
1990 .with_projection(projection_mask)
1991 .build()?;
1992
1993 while let Some(batch) = stream.next().await {
1994 let _ = batch.unwrap(); }
1996
1997 Ok(())
1998 }
1999}