use std::fmt::Formatter;
use std::io::SeekFrom;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
use futures::stream::Stream;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

use arrow_array::RecordBatch;
use arrow_schema::{Schema, SchemaRef};

use crate::arrow::arrow_reader::{
    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
};

use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
use crate::bloom_filter::{
    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};

mod metadata;
pub use metadata::*;

#[cfg(feature = "object_store")]
mod store;

use crate::DecodeResult;
use crate::arrow::push_decoder::{NoInput, ParquetPushDecoder, ParquetPushDecoderBuilder};
#[cfg(feature = "object_store")]
pub use store::*;

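/// Interface for asynchronously reading byte ranges and metadata from a parquet
/// file, used as the input to a [`ParquetRecordBatchStreamBuilder`].
///
/// A blanket implementation is provided below for any type implementing
/// [`AsyncRead`] + [`AsyncSeek`] + `Unpin` + `Send` (for example `tokio::fs::File`),
/// and the `object_store` feature provides an implementation backed by object_store.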
pub trait AsyncFileReader: Send {
    /// Retrieve the bytes in `range` from the underlying source.
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;

    /// Retrieve multiple byte ranges. The default implementation fetches them sequentially.
    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
        async move {
            let mut result = Vec::with_capacity(ranges.len());

            for range in ranges.into_iter() {
                let data = self.get_bytes(range).await?;
                result.push(data);
            }

            Ok(result)
        }
        .boxed()
    }

    /// Return the [`ParquetMetaData`] for this file, applying any supplied
    /// [`ArrowReaderOptions`] (for example, page index policies).
    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
}

impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
        self.as_mut().get_bytes(range)
    }

    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
        self.as_mut().get_byte_ranges(ranges)
    }

    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
        self.as_mut().get_metadata(options)
    }
}

impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
        async move {
            self.seek(SeekFrom::End(-(suffix as i64))).await?;
            let mut buf = Vec::with_capacity(suffix);
            self.take(suffix as _).read_to_end(&mut buf).await?;
            Ok(buf.into())
        }
        .boxed()
    }
}

impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
        async move {
            self.seek(SeekFrom::Start(range.start)).await?;

            let to_read = range.end - range.start;
            let mut buffer = Vec::with_capacity(to_read.try_into()?);
            let read = self.take(to_read).read_to_end(&mut buffer).await?;
            if read as u64 != to_read {
                return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
            }

            Ok(buffer.into())
        }
        .boxed()
    }

    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
        async move {
            let metadata_opts = options.map(|o| o.metadata_options().clone());
            let mut metadata_reader =
                ParquetMetaDataReader::new().with_metadata_options(metadata_opts);

            if let Some(opts) = options {
                metadata_reader = metadata_reader
                    .with_column_index_policy(opts.column_index_policy())
                    .with_offset_index_policy(opts.offset_index_policy());
            }

            #[cfg(feature = "encryption")]
            let metadata_reader = metadata_reader.with_decryption_properties(
                options.and_then(|o| o.file_decryption_properties.as_ref().map(Arc::clone)),
            );

            let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
            Ok(Arc::new(parquet_metadata))
        }
        .boxed()
    }
}

impl ArrowReaderMetadata {
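    /// Load [`ArrowReaderMetadata`] from the provided [`AsyncFileReader`], using
    /// [`AsyncFileReader::get_metadata`] with the supplied [`ArrowReaderOptions`].
    ///
    /// The resulting metadata can be passed to
    /// [`ParquetRecordBatchStreamBuilder::new_with_metadata`] to avoid re-reading
    /// the footer when the same file is opened multiple times.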
    pub async fn load_async<T: AsyncFileReader>(
        input: &mut T,
        options: ArrowReaderOptions,
    ) -> Result<Self> {
        let metadata = input.get_metadata(Some(&options)).await?;
        Self::try_new(metadata, options)
    }
}

#[doc(hidden)]
/// Newtype distinguishing asynchronous input within the shared [`ArrowReaderBuilder`]
pub struct AsyncReader<T>(T);

/// A builder for constructing a [`ParquetRecordBatchStream`] that reads parquet
/// data from an [`AsyncFileReader`]
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;

impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
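    /// Create a new [`ParquetRecordBatchStreamBuilder`] that reads from the provided
    /// [`AsyncFileReader`] using the default [`ArrowReaderOptions`].
    ///
    /// A minimal usage sketch is shown below; the file path and batch size are
    /// illustrative only (a `tokio::fs::File` implements [`AsyncRead`] and
    /// [`AsyncSeek`] and therefore [`AsyncFileReader`]):
    ///
    /// ```no_run
    /// # use futures::TryStreamExt;
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # async fn example() -> parquet::errors::Result<()> {
    /// let file = tokio::fs::File::open("data.parquet").await?;
    /// let stream = ParquetRecordBatchStreamBuilder::new(file)
    ///     .await?
    ///     .with_batch_size(8192)
    ///     .build()?;
    /// let _batches: Vec<_> = stream.try_collect().await?;
    /// # Ok(())
    /// # }
    /// ```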
    pub async fn new(input: T) -> Result<Self> {
        Self::new_with_options(input, Default::default()).await
    }

    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided
    /// [`ArrowReaderOptions`], loading the metadata from `input` asynchronously.
    pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
        Ok(Self::new_with_metadata(input, metadata))
    }

    /// Create a new [`ParquetRecordBatchStreamBuilder`] from previously loaded
    /// [`ArrowReaderMetadata`], allowing the metadata to be reused across readers.
    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(AsyncReader(input), metadata)
    }

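    /// Read the bloom filter (split block bloom filter, SBBF) for the given row
    /// group and column, if the column chunk metadata records one.
    ///
    /// Returns `Ok(None)` when no bloom filter offset is present for the column.
    ///
    /// A hedged usage sketch (the file path, indices and probed value are
    /// illustrative only):
    ///
    /// ```no_run
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # async fn example() -> parquet::errors::Result<()> {
    /// let file = tokio::fs::File::open("data.parquet").await?;
    /// let mut builder = ParquetRecordBatchStreamBuilder::new(file).await?;
    /// if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0).await? {
    ///     // `check` may return a false positive, but never a false negative
    ///     let _maybe_present = sbbf.check(&42_i32);
    /// }
    /// # Ok(())
    /// # }
    /// ```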
    pub async fn get_row_group_column_bloom_filter(
        &mut self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> Result<Option<Sbbf>> {
        let metadata = self.metadata.row_group(row_group_idx);
        let column_metadata = metadata.column(column_idx);

        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => self.input.0.get_bytes(offset..offset + length as u64),
            None => self
                .input
                .0
                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64),
        }
        .await?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        match header.algorithm {
            BloomFilterAlgorithm::BLOCK => {
                // currently the only variant; the match future-proofs against new algorithms
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED => {
                // currently the only variant; the match future-proofs against new codecs
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH => {
                // currently the only variant; the match future-proofs against new hashes
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            Some(_) => buffer.slice(
                (TryInto::<usize>::try_into(bitset_offset).unwrap()
                    - TryInto::<usize>::try_into(offset).unwrap())..,
            ),
            None => {
                let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                self.input
                    .0
                    .get_bytes(bitset_offset..bitset_offset + bitset_length)
                    .await?
            }
        };
        Ok(Some(Sbbf::new(&bitset)))
    }

    /// Build a new [`ParquetRecordBatchStream`], consuming this builder.
    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
        let Self {
            input,
            metadata,
            schema,
            fields,
            batch_size,
            row_groups,
            projection,
            filter,
            selection,
            row_selection_policy: selection_strategy,
            limit,
            offset,
            metrics,
            max_predicate_cache_size,
        } = self;

        // Compute the output schema by applying the projection to the file schema
        let projection_len = projection.mask.as_ref().map_or(usize::MAX, |m| m.len());
        let projected_fields = schema
            .fields
            .filter_leaves(|idx, _| idx < projection_len && projection.leaf_included(idx));
        let projected_schema = Arc::new(Schema::new(projected_fields));

        // The push decoder performs all decoding; this stream only supplies it with bytes
        let decoder = ParquetPushDecoderBuilder {
            input: NoInput,
            metadata,
            schema,
            fields,
            projection,
            filter,
            selection,
            row_selection_policy: selection_strategy,
            batch_size,
            row_groups,
            limit,
            offset,
            metrics,
            max_predicate_cache_size,
        }
        .build()?;

        let request_state = RequestState::None { input: input.0 };

        Ok(ParquetRecordBatchStream {
            schema: projected_schema,
            decoder,
            request_state,
        })
    }
}

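/// Tracks the state of any I/O needed by the [`ParquetPushDecoder`]: idle
/// (holding the reader), waiting on an outstanding request for specific byte
/// ranges, or finished.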
enum RequestState<T> {
    /// No outstanding request; the reader is available for the next fetch
    None {
        input: T,
    },
    /// A request for `ranges` is in flight; the future yields the reader back
    /// along with the fetched bytes
    Outstanding {
        ranges: Vec<Range<u64>>,
        future: BoxFuture<'static, Result<(T, Vec<Bytes>)>>,
    },
    /// The stream has finished, either successfully or due to an error
    Done,
}

impl<T> RequestState<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
    /// Start fetching `ranges` from `input`, returning the `Outstanding` state
    fn begin_request(mut input: T, ranges: Vec<Range<u64>>) -> Self {
        let ranges_captured = ranges.clone();

        let future = async move {
            let data = input.get_byte_ranges(ranges_captured).await?;
            Ok((input, data))
        }
        .boxed();
        RequestState::Outstanding { ranges, future }
    }
}

impl<T> std::fmt::Debug for RequestState<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            RequestState::None { input: _ } => f
                .debug_struct("RequestState::None")
                .field("input", &"...")
                .finish(),
            RequestState::Outstanding { ranges, .. } => f
                .debug_struct("RequestState::Outstanding")
                .field("ranges", &ranges)
                .finish(),
            RequestState::Done => {
                write!(f, "RequestState::Done")
            }
        }
    }
}

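/// An asynchronous [`Stream`] of [`RecordBatch`] decoded from a parquet file.
///
/// Internally this drives a [`ParquetPushDecoder`], fetching the byte ranges it
/// requests from the wrapped [`AsyncFileReader`]. Created by
/// [`ParquetRecordBatchStreamBuilder::build`].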
pub struct ParquetRecordBatchStream<T> {
    /// Output schema after applying the projection
    schema: SchemaRef,
    /// State of the reader and any outstanding I/O request
    request_state: RequestState<T>,
    /// Push decoder that performs the actual decoding
    decoder: ParquetPushDecoder,
}

impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ParquetRecordBatchStream")
            .field("request_state", &self.request_state)
            .finish()
    }
}

impl<T> ParquetRecordBatchStream<T> {
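    /// Returns the projected [`SchemaRef`] for this stream.
    ///
    /// Note that the schema metadata of the original file is not carried over.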
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}

impl<T> ParquetRecordBatchStream<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
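    /// Fetch the next row group from the stream, returning a synchronous
    /// [`ParquetRecordBatchReader`] over its rows, or `None` when the file is
    /// exhausted. The data for the row group has already been fetched when the
    /// reader is returned.
    ///
    /// A hedged sketch of iterating row group by row group (the file path is
    /// illustrative only):
    ///
    /// ```no_run
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # async fn example() -> parquet::errors::Result<()> {
    /// let file = tokio::fs::File::open("data.parquet").await?;
    /// let mut stream = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;
    /// while let Some(reader) = stream.next_row_group().await? {
    ///     for batch in reader {
    ///         let batch = batch?;
    ///         println!("read {} rows", batch.num_rows());
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```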
    pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
        loop {
            // Take the current state, leaving `Done` in place while we work on it
            let request_state = std::mem::replace(&mut self.request_state, RequestState::Done);
            match request_state {
                RequestState::None { input } => {
                    match self.decoder.try_next_reader()? {
                        DecodeResult::NeedsData(ranges) => {
                            // The decoder needs more data: issue the request and keep looping
                            self.request_state = RequestState::begin_request(input, ranges);
                            continue;
                        }
                        DecodeResult::Data(reader) => {
                            self.request_state = RequestState::None { input };
                            return Ok(Some(reader));
                        }
                        DecodeResult::Finished => return Ok(None),
                    }
                }
                RequestState::Outstanding { ranges, future } => {
                    // Wait for the outstanding request and feed the bytes to the decoder
                    let (input, data) = future.await?;
                    self.decoder.push_ranges(ranges, data)?;
                    self.request_state = RequestState::None { input };
                    continue;
                }
                RequestState::Done => {
                    self.request_state = RequestState::Done;
                    return Ok(None);
                }
            }
        }
    }
}

impl<T> Stream for ParquetRecordBatchStream<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
    type Item = Result<RecordBatch>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.poll_next_inner(cx) {
            Ok(res) => {
                // Convert Poll<Option<RecordBatch>> into Poll<Option<Result<RecordBatch>>>
                res.map(|res| Ok(res).transpose())
            }
            Err(e) => {
                // An error terminates the stream: subsequent polls return None
                self.request_state = RequestState::Done;
                Poll::Ready(Some(Err(e)))
            }
        }
    }
}

impl<T> ParquetRecordBatchStream<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
    /// Inner state machine driven by `poll_next`: returns `Err` on a decode or
    /// I/O error, otherwise a [`Poll`] of the next decoded [`RecordBatch`].
    fn poll_next_inner(&mut self, cx: &mut Context<'_>) -> Result<Poll<Option<RecordBatch>>> {
        loop {
            // Take the current state, leaving `Done` in place while we work on it
            let request_state = std::mem::replace(&mut self.request_state, RequestState::Done);
            match request_state {
                RequestState::None { input } => {
                    match self.decoder.try_decode()? {
                        DecodeResult::NeedsData(ranges) => {
                            // The decoder needs more data: issue the request and keep polling
                            self.request_state = RequestState::begin_request(input, ranges);
                            continue;
                        }
                        DecodeResult::Data(batch) => {
                            self.request_state = RequestState::None { input };
                            return Ok(Poll::Ready(Some(batch)));
                        }
                        DecodeResult::Finished => {
                            self.request_state = RequestState::Done;
                            return Ok(Poll::Ready(None));
                        }
                    }
                }
                RequestState::Outstanding { ranges, mut future } => match future.poll_unpin(cx) {
                    Poll::Ready(result) => {
                        // Feed the fetched bytes to the decoder and try again
                        let (input, data) = result?;
                        self.decoder.push_ranges(ranges, data)?;
                        self.request_state = RequestState::None { input };
                        continue;
                    }
                    Poll::Pending => {
                        self.request_state = RequestState::Outstanding { ranges, future };
                        return Ok(Poll::Pending);
                    }
                },
                RequestState::Done => {
                    self.request_state = RequestState::Done;
                    return Ok(Poll::Ready(None));
                }
            }
        }
    }
}

770#[cfg(test)]
771mod tests {
772 use super::*;
773 use crate::arrow::arrow_reader::tests::test_row_numbers_with_multiple_row_groups_helper;
774 use crate::arrow::arrow_reader::{
775 ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
776 };
777 use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
778 use crate::arrow::schema::virtual_type::RowNumber;
779 use crate::arrow::{ArrowWriter, AsyncArrowWriter, ProjectionMask};
780 use crate::file::metadata::PageIndexPolicy;
781 use crate::file::metadata::ParquetMetaDataReader;
782 use crate::file::properties::WriterProperties;
783 use arrow::compute::kernels::cmp::eq;
784 use arrow::error::Result as ArrowResult;
785 use arrow_array::builder::{Float32Builder, ListBuilder, StringBuilder};
786 use arrow_array::cast::AsArray;
787 use arrow_array::types::Int32Type;
788 use arrow_array::{
789 Array, ArrayRef, BooleanArray, Int32Array, RecordBatchReader, Scalar, StringArray,
790 StructArray, UInt64Array,
791 };
792 use arrow_schema::{DataType, Field, Schema};
793 use futures::{StreamExt, TryStreamExt};
794 use rand::{Rng, rng};
795 use std::collections::HashMap;
796 use std::sync::{Arc, Mutex};
797 use tempfile::tempfile;
798
799 #[derive(Clone)]
800 struct TestReader {
801 data: Bytes,
802 metadata: Option<Arc<ParquetMetaData>>,
803 requests: Arc<Mutex<Vec<Range<usize>>>>,
804 }
805
806 impl TestReader {
807 fn new(data: Bytes) -> Self {
808 Self {
809 data,
810 metadata: Default::default(),
811 requests: Default::default(),
812 }
813 }
814 }
815
816 impl AsyncFileReader for TestReader {
817 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
818 let range = range.clone();
819 self.requests
820 .lock()
821 .unwrap()
822 .push(range.start as usize..range.end as usize);
823 futures::future::ready(Ok(self
824 .data
825 .slice(range.start as usize..range.end as usize)))
826 .boxed()
827 }
828
829 fn get_metadata<'a>(
830 &'a mut self,
831 options: Option<&'a ArrowReaderOptions>,
832 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
833 let mut metadata_reader = ParquetMetaDataReader::new();
834 if let Some(opts) = options {
835 metadata_reader = metadata_reader
836 .with_column_index_policy(opts.column_index_policy())
837 .with_offset_index_policy(opts.offset_index_policy());
838 }
839 self.metadata = Some(Arc::new(
840 metadata_reader.parse_and_finish(&self.data).unwrap(),
841 ));
842 futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
843 }
844 }
845
846 #[tokio::test]
847 async fn test_async_reader() {
848 let testdata = arrow::util::test_util::parquet_test_data();
849 let path = format!("{testdata}/alltypes_plain.parquet");
850 let data = Bytes::from(std::fs::read(path).unwrap());
851
852 let async_reader = TestReader::new(data.clone());
853
854 let requests = async_reader.requests.clone();
855 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
856 .await
857 .unwrap();
858
859 let metadata = builder.metadata().clone();
860 assert_eq!(metadata.num_row_groups(), 1);
861
862 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
863 let stream = builder
864 .with_projection(mask.clone())
865 .with_batch_size(1024)
866 .build()
867 .unwrap();
868
869 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
870
871 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
872 .unwrap()
873 .with_projection(mask)
874 .with_batch_size(104)
875 .build()
876 .unwrap()
877 .collect::<ArrowResult<Vec<_>>>()
878 .unwrap();
879
880 assert_eq!(async_batches, sync_batches);
881
882 let requests = requests.lock().unwrap();
883 let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
884 let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
885
886 assert_eq!(
887 &requests[..],
888 &[
889 offset_1 as usize..(offset_1 + length_1) as usize,
890 offset_2 as usize..(offset_2 + length_2) as usize
891 ]
892 );
893 }
894
895 #[tokio::test]
896 async fn test_async_reader_with_next_row_group() {
897 let testdata = arrow::util::test_util::parquet_test_data();
898 let path = format!("{testdata}/alltypes_plain.parquet");
899 let data = Bytes::from(std::fs::read(path).unwrap());
900
901 let async_reader = TestReader::new(data.clone());
902
903 let requests = async_reader.requests.clone();
904 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
905 .await
906 .unwrap();
907
908 let metadata = builder.metadata().clone();
909 assert_eq!(metadata.num_row_groups(), 1);
910
911 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
912 let mut stream = builder
913 .with_projection(mask.clone())
914 .with_batch_size(1024)
915 .build()
916 .unwrap();
917
918 let mut readers = vec![];
919 while let Some(reader) = stream.next_row_group().await.unwrap() {
920 readers.push(reader);
921 }
922
923 let async_batches: Vec<_> = readers
924 .into_iter()
925 .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
926 .collect();
927
928 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
929 .unwrap()
930 .with_projection(mask)
931 .with_batch_size(104)
932 .build()
933 .unwrap()
934 .collect::<ArrowResult<Vec<_>>>()
935 .unwrap();
936
937 assert_eq!(async_batches, sync_batches);
938
939 let requests = requests.lock().unwrap();
940 let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
941 let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
942
943 assert_eq!(
944 &requests[..],
945 &[
946 offset_1 as usize..(offset_1 + length_1) as usize,
947 offset_2 as usize..(offset_2 + length_2) as usize
948 ]
949 );
950 }
951
952 #[tokio::test]
953 async fn test_async_reader_with_index() {
954 let testdata = arrow::util::test_util::parquet_test_data();
955 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
956 let data = Bytes::from(std::fs::read(path).unwrap());
957
958 let async_reader = TestReader::new(data.clone());
959
960 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
961 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
962 .await
963 .unwrap();
964
965 let metadata_with_index = builder.metadata();
967 assert_eq!(metadata_with_index.num_row_groups(), 1);
968
969 let offset_index = metadata_with_index.offset_index().unwrap();
971 let column_index = metadata_with_index.column_index().unwrap();
972
973 assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
974 assert_eq!(column_index.len(), metadata_with_index.num_row_groups());
975
976 let num_columns = metadata_with_index
977 .file_metadata()
978 .schema_descr()
979 .num_columns();
980
981 offset_index
983 .iter()
984 .for_each(|x| assert_eq!(x.len(), num_columns));
985 column_index
986 .iter()
987 .for_each(|x| assert_eq!(x.len(), num_columns));
988
989 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
990 let stream = builder
991 .with_projection(mask.clone())
992 .with_batch_size(1024)
993 .build()
994 .unwrap();
995
996 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
997
998 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
999 .unwrap()
1000 .with_projection(mask)
1001 .with_batch_size(1024)
1002 .build()
1003 .unwrap()
1004 .collect::<ArrowResult<Vec<_>>>()
1005 .unwrap();
1006
1007 assert_eq!(async_batches, sync_batches);
1008 }
1009
1010 #[tokio::test]
1011 async fn test_async_reader_with_limit() {
1012 let testdata = arrow::util::test_util::parquet_test_data();
1013 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1014 let data = Bytes::from(std::fs::read(path).unwrap());
1015
1016 let metadata = ParquetMetaDataReader::new()
1017 .parse_and_finish(&data)
1018 .unwrap();
1019 let metadata = Arc::new(metadata);
1020
1021 assert_eq!(metadata.num_row_groups(), 1);
1022
1023 let async_reader = TestReader::new(data.clone());
1024
1025 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1026 .await
1027 .unwrap();
1028
1029 assert_eq!(builder.metadata().num_row_groups(), 1);
1030
1031 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1032 let stream = builder
1033 .with_projection(mask.clone())
1034 .with_batch_size(1024)
1035 .with_limit(1)
1036 .build()
1037 .unwrap();
1038
1039 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1040
1041 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1042 .unwrap()
1043 .with_projection(mask)
1044 .with_batch_size(1024)
1045 .with_limit(1)
1046 .build()
1047 .unwrap()
1048 .collect::<ArrowResult<Vec<_>>>()
1049 .unwrap();
1050
1051 assert_eq!(async_batches, sync_batches);
1052 }
1053
1054 #[tokio::test]
1055 async fn test_async_reader_skip_pages() {
1056 let testdata = arrow::util::test_util::parquet_test_data();
1057 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1058 let data = Bytes::from(std::fs::read(path).unwrap());
1059
1060 let async_reader = TestReader::new(data.clone());
1061
1062 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1063 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1064 .await
1065 .unwrap();
1066
1067 assert_eq!(builder.metadata().num_row_groups(), 1);
1068
1069 let selection = RowSelection::from(vec![
            RowSelector::skip(21),
            RowSelector::select(21),
            RowSelector::skip(41),
            RowSelector::select(41),
            RowSelector::skip(25),
            RowSelector::select(25),
            RowSelector::skip(7116),
            RowSelector::select(10),
        ]);
1079
1080 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);
1081
1082 let stream = builder
1083 .with_projection(mask.clone())
1084 .with_row_selection(selection.clone())
1085 .build()
1086 .expect("building stream");
1087
1088 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1089
1090 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1091 .unwrap()
1092 .with_projection(mask)
1093 .with_batch_size(1024)
1094 .with_row_selection(selection)
1095 .build()
1096 .unwrap()
1097 .collect::<ArrowResult<Vec<_>>>()
1098 .unwrap();
1099
1100 assert_eq!(async_batches, sync_batches);
1101 }
1102
1103 #[tokio::test]
1104 async fn test_fuzz_async_reader_selection() {
1105 let testdata = arrow::util::test_util::parquet_test_data();
1106 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1107 let data = Bytes::from(std::fs::read(path).unwrap());
1108
1109 let mut rand = rng();
1110
1111 for _ in 0..100 {
1112 let mut expected_rows = 0;
1113 let mut total_rows = 0;
1114 let mut skip = false;
1115 let mut selectors = vec![];
1116
1117 while total_rows < 7300 {
1118 let row_count: usize = rand.random_range(1..100);
1119
1120 let row_count = row_count.min(7300 - total_rows);
1121
1122 selectors.push(RowSelector { row_count, skip });
1123
1124 total_rows += row_count;
1125 if !skip {
1126 expected_rows += row_count;
1127 }
1128
1129 skip = !skip;
1130 }
1131
1132 let selection = RowSelection::from(selectors);
1133
1134 let async_reader = TestReader::new(data.clone());
1135
1136 let options =
1137 ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1138 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1139 .await
1140 .unwrap();
1141
1142 assert_eq!(builder.metadata().num_row_groups(), 1);
1143
1144 let col_idx: usize = rand.random_range(0..13);
1145 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1146
1147 let stream = builder
1148 .with_projection(mask.clone())
1149 .with_row_selection(selection.clone())
1150 .build()
1151 .expect("building stream");
1152
1153 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1154
1155 let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1156
1157 assert_eq!(actual_rows, expected_rows);
1158 }
1159 }
1160
1161 #[tokio::test]
1162 async fn test_async_reader_zero_row_selector() {
1163 let testdata = arrow::util::test_util::parquet_test_data();
1165 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1166 let data = Bytes::from(std::fs::read(path).unwrap());
1167
1168 let mut rand = rng();
1169
1170 let mut expected_rows = 0;
1171 let mut total_rows = 0;
1172 let mut skip = false;
1173 let mut selectors = vec![];
1174
1175 selectors.push(RowSelector {
1176 row_count: 0,
1177 skip: false,
1178 });
1179
1180 while total_rows < 7300 {
1181 let row_count: usize = rand.random_range(1..100);
1182
1183 let row_count = row_count.min(7300 - total_rows);
1184
1185 selectors.push(RowSelector { row_count, skip });
1186
1187 total_rows += row_count;
1188 if !skip {
1189 expected_rows += row_count;
1190 }
1191
1192 skip = !skip;
1193 }
1194
1195 let selection = RowSelection::from(selectors);
1196
1197 let async_reader = TestReader::new(data.clone());
1198
1199 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1200 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1201 .await
1202 .unwrap();
1203
1204 assert_eq!(builder.metadata().num_row_groups(), 1);
1205
1206 let col_idx: usize = rand.random_range(0..13);
1207 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1208
1209 let stream = builder
1210 .with_projection(mask.clone())
1211 .with_row_selection(selection.clone())
1212 .build()
1213 .expect("building stream");
1214
1215 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1216
1217 let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1218
1219 assert_eq!(actual_rows, expected_rows);
1220 }
1221
1222 #[tokio::test]
1223 async fn test_limit_multiple_row_groups() {
1224 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1225 let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1226 let c = Int32Array::from_iter(0..6);
1227 let data = RecordBatch::try_from_iter([
1228 ("a", Arc::new(a) as ArrayRef),
1229 ("b", Arc::new(b) as ArrayRef),
1230 ("c", Arc::new(c) as ArrayRef),
1231 ])
1232 .unwrap();
1233
1234 let mut buf = Vec::with_capacity(1024);
1235 let props = WriterProperties::builder()
1236 .set_max_row_group_size(3)
1237 .build();
1238 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1239 writer.write(&data).unwrap();
1240 writer.close().unwrap();
1241
1242 let data: Bytes = buf.into();
1243 let metadata = ParquetMetaDataReader::new()
1244 .parse_and_finish(&data)
1245 .unwrap();
1246
1247 assert_eq!(metadata.num_row_groups(), 2);
1248
1249 let test = TestReader::new(data);
1250
1251 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1252 .await
1253 .unwrap()
1254 .with_batch_size(1024)
1255 .with_limit(4)
1256 .build()
1257 .unwrap();
1258
1259 let batches: Vec<_> = stream.try_collect().await.unwrap();
1260 assert_eq!(batches.len(), 2);
1262
1263 let batch = &batches[0];
1264 assert_eq!(batch.num_rows(), 3);
1266 assert_eq!(batch.num_columns(), 3);
1267 let col2 = batch.column(2).as_primitive::<Int32Type>();
1268 assert_eq!(col2.values(), &[0, 1, 2]);
1269
1270 let batch = &batches[1];
1271 assert_eq!(batch.num_rows(), 1);
1273 assert_eq!(batch.num_columns(), 3);
1274 let col2 = batch.column(2).as_primitive::<Int32Type>();
1275 assert_eq!(col2.values(), &[3]);
1276
1277 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1278 .await
1279 .unwrap()
1280 .with_offset(2)
1281 .with_limit(3)
1282 .build()
1283 .unwrap();
1284
1285 let batches: Vec<_> = stream.try_collect().await.unwrap();
1286 assert_eq!(batches.len(), 2);
1288
1289 let batch = &batches[0];
1290 assert_eq!(batch.num_rows(), 1);
1292 assert_eq!(batch.num_columns(), 3);
1293 let col2 = batch.column(2).as_primitive::<Int32Type>();
1294 assert_eq!(col2.values(), &[2]);
1295
1296 let batch = &batches[1];
1297 assert_eq!(batch.num_rows(), 2);
1299 assert_eq!(batch.num_columns(), 3);
1300 let col2 = batch.column(2).as_primitive::<Int32Type>();
1301 assert_eq!(col2.values(), &[3, 4]);
1302
1303 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1304 .await
1305 .unwrap()
1306 .with_offset(4)
1307 .with_limit(20)
1308 .build()
1309 .unwrap();
1310
1311 let batches: Vec<_> = stream.try_collect().await.unwrap();
1312 assert_eq!(batches.len(), 1);
1314
1315 let batch = &batches[0];
1316 assert_eq!(batch.num_rows(), 2);
1318 assert_eq!(batch.num_columns(), 3);
1319 let col2 = batch.column(2).as_primitive::<Int32Type>();
1320 assert_eq!(col2.values(), &[4, 5]);
1321 }
1322
1323 #[tokio::test]
1324 async fn test_batch_size_overallocate() {
1325 let testdata = arrow::util::test_util::parquet_test_data();
1326 let path = format!("{testdata}/alltypes_plain.parquet");
1328 let data = Bytes::from(std::fs::read(path).unwrap());
1329
1330 let async_reader = TestReader::new(data.clone());
1331
1332 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1333 .await
1334 .unwrap();
1335
1336 let file_rows = builder.metadata().file_metadata().num_rows() as usize;
1337
1338 let builder = builder
1339 .with_projection(ProjectionMask::all())
1340 .with_batch_size(1024);
1341
1342 assert_ne!(1024, file_rows);
1345 assert_eq!(builder.batch_size, file_rows);
1346
1347 let _stream = builder.build().unwrap();
1348 }
1349
1350 #[tokio::test]
1351 async fn test_parquet_record_batch_stream_schema() {
1352 fn get_all_field_names(schema: &Schema) -> Vec<&String> {
1353 schema.flattened_fields().iter().map(|f| f.name()).collect()
1354 }
1355
1356 let mut metadata = HashMap::with_capacity(1);
1365 metadata.insert("key".to_string(), "value".to_string());
1366
1367 let nested_struct_array = StructArray::from(vec![
1368 (
1369 Arc::new(Field::new("d", DataType::Utf8, true)),
1370 Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
1371 ),
1372 (
1373 Arc::new(Field::new("e", DataType::Utf8, true)),
1374 Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
1375 ),
1376 ]);
1377 let struct_array = StructArray::from(vec![
1378 (
1379 Arc::new(Field::new("a", DataType::Int32, true)),
1380 Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
1381 ),
1382 (
1383 Arc::new(Field::new("b", DataType::UInt64, true)),
1384 Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
1385 ),
1386 (
1387 Arc::new(Field::new(
1388 "c",
1389 nested_struct_array.data_type().clone(),
1390 true,
1391 )),
1392 Arc::new(nested_struct_array) as ArrayRef,
1393 ),
1394 ]);
1395
1396 let schema =
1397 Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
1398 let record_batch = RecordBatch::from(struct_array)
1399 .with_schema(schema.clone())
1400 .unwrap();
1401
1402 let mut file = tempfile().unwrap();
1404 let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
1405 writer.write(&record_batch).unwrap();
1406 writer.close().unwrap();
1407
1408 let all_fields = ["a", "b", "c", "d", "e"];
1409 let projections = [
1411 (vec![], vec![]),
1412 (vec![0], vec!["a"]),
1413 (vec![0, 1], vec!["a", "b"]),
1414 (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
1415 (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
1416 ];
1417
1418 for (indices, expected_projected_names) in projections {
1420 let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
1421 assert_eq!(get_all_field_names(&builder), all_fields);
1423 assert_eq!(builder.metadata, metadata);
1424 assert_eq!(get_all_field_names(&reader), expected_projected_names);
1426 assert_eq!(reader.metadata, HashMap::default());
1427 assert_eq!(get_all_field_names(&batch), expected_projected_names);
1428 assert_eq!(batch.metadata, HashMap::default());
1429 };
1430
1431 let builder =
1432 ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
1433 let sync_builder_schema = builder.schema().clone();
1434 let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
1435 let mut reader = builder.with_projection(mask).build().unwrap();
1436 let sync_reader_schema = reader.schema();
1437 let batch = reader.next().unwrap().unwrap();
1438 let sync_batch_schema = batch.schema();
1439 assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);
1440
1441 let file = tokio::fs::File::from(file.try_clone().unwrap());
1443 let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
1444 let async_builder_schema = builder.schema().clone();
1445 let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
1446 let mut reader = builder.with_projection(mask).build().unwrap();
1447 let async_reader_schema = reader.schema().clone();
1448 let batch = reader.next().await.unwrap().unwrap();
1449 let async_batch_schema = batch.schema();
1450 assert_schemas(
1451 async_builder_schema,
1452 async_reader_schema,
1453 async_batch_schema,
1454 );
1455 }
1456 }
1457
1458 #[tokio::test]
1459 async fn test_nested_skip() {
1460 let schema = Arc::new(Schema::new(vec![
1461 Field::new("col_1", DataType::UInt64, false),
1462 Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
1463 ]));
1464
1465 let props = WriterProperties::builder()
1467 .set_data_page_row_count_limit(256)
1468 .set_write_batch_size(256)
1469 .set_max_row_group_size(1024);
1470
1471 let mut file = tempfile().unwrap();
1473 let mut writer =
1474 ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();
1475
1476 let mut builder = ListBuilder::new(StringBuilder::new());
1477 for id in 0..1024 {
1478 match id % 3 {
1479 0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
1480 1 => builder.append_value([Some(format!("id_{id}"))]),
1481 _ => builder.append_null(),
1482 }
1483 }
1484 let refs = vec![
1485 Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
1486 Arc::new(builder.finish()) as ArrayRef,
1487 ];
1488
1489 let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
1490 writer.write(&batch).unwrap();
1491 writer.close().unwrap();
1492
1493 let selections = [
1494 RowSelection::from(vec![
1495 RowSelector::skip(313),
1496 RowSelector::select(1),
1497 RowSelector::skip(709),
1498 RowSelector::select(1),
1499 ]),
1500 RowSelection::from(vec![
1501 RowSelector::skip(255),
1502 RowSelector::select(1),
1503 RowSelector::skip(767),
1504 RowSelector::select(1),
1505 ]),
1506 RowSelection::from(vec![
1507 RowSelector::select(255),
1508 RowSelector::skip(1),
1509 RowSelector::select(767),
1510 RowSelector::skip(1),
1511 ]),
1512 RowSelection::from(vec![
1513 RowSelector::skip(254),
1514 RowSelector::select(1),
1515 RowSelector::select(1),
1516 RowSelector::skip(767),
1517 RowSelector::select(1),
1518 ]),
1519 ];
1520
1521 for selection in selections {
1522 let expected = selection.row_count();
1523 let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
1525 tokio::fs::File::from_std(file.try_clone().unwrap()),
1526 ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
1527 )
1528 .await
1529 .unwrap();
1530
1531 reader = reader.with_row_selection(selection);
1532
1533 let mut stream = reader.build().unwrap();
1534
1535 let mut total_rows = 0;
1536 while let Some(rb) = stream.next().await {
1537 let rb = rb.unwrap();
1538 total_rows += rb.num_rows();
1539 }
1540 assert_eq!(total_rows, expected);
1541 }
1542 }
1543
1544 #[tokio::test]
1545 #[allow(deprecated)]
1546 async fn empty_offset_index_doesnt_panic_in_read_row_group() {
1547 use tokio::fs::File;
1548 let testdata = arrow::util::test_util::parquet_test_data();
1549 let path = format!("{testdata}/alltypes_plain.parquet");
1550 let mut file = File::open(&path).await.unwrap();
1551 let file_size = file.metadata().await.unwrap().len();
1552 let mut metadata = ParquetMetaDataReader::new()
1553 .with_page_indexes(true)
1554 .load_and_finish(&mut file, file_size)
1555 .await
1556 .unwrap();
1557
1558 metadata.set_offset_index(Some(vec![]));
1559 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1560 let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
1561 let reader =
1562 ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
1563 .build()
1564 .unwrap();
1565
1566 let result = reader.try_collect::<Vec<_>>().await.unwrap();
1567 assert_eq!(result.len(), 1);
1568 }
1569
1570 #[tokio::test]
1571 #[allow(deprecated)]
1572 async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
1573 use tokio::fs::File;
1574 let testdata = arrow::util::test_util::parquet_test_data();
1575 let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1576 let mut file = File::open(&path).await.unwrap();
1577 let file_size = file.metadata().await.unwrap().len();
1578 let metadata = ParquetMetaDataReader::new()
1579 .with_page_indexes(true)
1580 .load_and_finish(&mut file, file_size)
1581 .await
1582 .unwrap();
1583
1584 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1585 let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
1586 let reader =
1587 ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
1588 .build()
1589 .unwrap();
1590
1591 let result = reader.try_collect::<Vec<_>>().await.unwrap();
1592 assert_eq!(result.len(), 8);
1593 }
1594
1595 #[tokio::test]
1596 #[allow(deprecated)]
1597 async fn empty_offset_index_doesnt_panic_in_column_chunks() {
1598 use tempfile::TempDir;
1599 use tokio::fs::File;
1600 fn write_metadata_to_local_file(
1601 metadata: ParquetMetaData,
1602 file: impl AsRef<std::path::Path>,
1603 ) {
1604 use crate::file::metadata::ParquetMetaDataWriter;
1605 use std::fs::File;
1606 let file = File::create(file).unwrap();
1607 ParquetMetaDataWriter::new(file, &metadata)
1608 .finish()
1609 .unwrap()
1610 }
1611
1612 fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
1613 use std::fs::File;
1614 let file = File::open(file).unwrap();
1615 ParquetMetaDataReader::new()
1616 .with_page_indexes(true)
1617 .parse_and_finish(&file)
1618 .unwrap()
1619 }
1620
1621 let testdata = arrow::util::test_util::parquet_test_data();
1622 let path = format!("{testdata}/alltypes_plain.parquet");
1623 let mut file = File::open(&path).await.unwrap();
1624 let file_size = file.metadata().await.unwrap().len();
1625 let metadata = ParquetMetaDataReader::new()
1626 .with_page_indexes(true)
1627 .load_and_finish(&mut file, file_size)
1628 .await
1629 .unwrap();
1630
1631 let tempdir = TempDir::new().unwrap();
1632 let metadata_path = tempdir.path().join("thrift_metadata.dat");
1633 write_metadata_to_local_file(metadata, &metadata_path);
1634 let metadata = read_metadata_from_local_file(&metadata_path);
1635
1636 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1637 let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
1638 let reader =
1639 ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
1640 .build()
1641 .unwrap();
1642
1643 let result = reader.try_collect::<Vec<_>>().await.unwrap();
1645 assert_eq!(result.len(), 1);
1646 }
1647
1648 #[tokio::test]
1649 async fn test_cached_array_reader_sparse_offset_error() {
1650 use futures::TryStreamExt;
1651
1652 use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
1653 use arrow_array::{BooleanArray, RecordBatch};
1654
1655 let testdata = arrow::util::test_util::parquet_test_data();
1656 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1657 let data = Bytes::from(std::fs::read(path).unwrap());
1658
1659 let async_reader = TestReader::new(data);
1660
1661 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1663 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1664 .await
1665 .unwrap();
1666
1667 let selection = RowSelection::from(vec![RowSelector::skip(22), RowSelector::select(3)]);
1671
1672 let parquet_schema = builder.parquet_schema();
1676 let proj = ProjectionMask::leaves(parquet_schema, vec![0]);
1677 let always_true = ArrowPredicateFn::new(proj.clone(), |batch: RecordBatch| {
1678 Ok(BooleanArray::from(vec![true; batch.num_rows()]))
1679 });
1680 let filter = RowFilter::new(vec![Box::new(always_true)]);
1681
1682 let stream = builder
1685 .with_batch_size(8)
1686 .with_projection(proj)
1687 .with_row_selection(selection)
1688 .with_row_filter(filter)
1689 .build()
1690 .unwrap();
1691
1692 let _result: Vec<_> = stream.try_collect().await.unwrap();
1695 }
1696
1697 #[tokio::test]
1698 async fn test_predicate_cache_disabled() {
1699 let k = Int32Array::from_iter_values(0..10);
1700 let data = RecordBatch::try_from_iter([("k", Arc::new(k) as ArrayRef)]).unwrap();
1701
1702 let mut buf = Vec::new();
1703 let props = WriterProperties::builder()
1705 .set_data_page_row_count_limit(1)
1706 .set_write_batch_size(1)
1707 .set_max_row_group_size(10)
1708 .set_write_page_header_statistics(true)
1709 .build();
1710 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1711 writer.write(&data).unwrap();
1712 writer.close().unwrap();
1713
1714 let data = Bytes::from(buf);
1715 let metadata = ParquetMetaDataReader::new()
1716 .with_page_index_policy(PageIndexPolicy::Required)
1717 .parse_and_finish(&data)
1718 .unwrap();
1719 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1720
1721 let build_filter = || {
1723 let scalar = Int32Array::from_iter_values([5]);
1724 let predicate = ArrowPredicateFn::new(
1725 ProjectionMask::leaves(&parquet_schema, vec![0]),
1726 move |batch| eq(batch.column(0), &Scalar::new(&scalar)),
1727 );
1728 RowFilter::new(vec![Box::new(predicate)])
1729 };
1730
1731 let selection = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(1)]);
1733
1734 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1735 let reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
1736
1737 let reader_with_cache = TestReader::new(data.clone());
1739 let requests_with_cache = reader_with_cache.requests.clone();
1740 let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
1741 reader_with_cache,
1742 reader_metadata.clone(),
1743 )
1744 .with_batch_size(1000)
1745 .with_row_selection(selection.clone())
1746 .with_row_filter(build_filter())
1747 .build()
1748 .unwrap();
1749 let batches_with_cache: Vec<_> = stream.try_collect().await.unwrap();
1750
1751 let reader_without_cache = TestReader::new(data);
1753 let requests_without_cache = reader_without_cache.requests.clone();
1754 let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
1755 reader_without_cache,
1756 reader_metadata,
1757 )
1758 .with_batch_size(1000)
1759 .with_row_selection(selection)
1760 .with_row_filter(build_filter())
        .with_max_predicate_cache_size(0) // disable the predicate cache
        .build()
1763 .unwrap();
1764 let batches_without_cache: Vec<_> = stream.try_collect().await.unwrap();
1765
1766 assert_eq!(batches_with_cache, batches_without_cache);
1767
1768 let requests_with_cache = requests_with_cache.lock().unwrap();
1769 let requests_without_cache = requests_without_cache.lock().unwrap();
1770
1771 assert_eq!(requests_with_cache.len(), 11);
1773 assert_eq!(requests_without_cache.len(), 2);
1774
1775 assert_eq!(
1777 requests_with_cache.iter().map(|r| r.len()).sum::<usize>(),
1778 433
1779 );
1780 assert_eq!(
1781 requests_without_cache
1782 .iter()
1783 .map(|r| r.len())
1784 .sum::<usize>(),
1785 92
1786 );
1787 }
1788
1789 #[test]
1790 fn test_row_numbers_with_multiple_row_groups() {
1791 test_row_numbers_with_multiple_row_groups_helper(
1792 false,
1793 |path, selection, _row_filter, batch_size| {
1794 let runtime = tokio::runtime::Builder::new_current_thread()
1795 .enable_all()
1796 .build()
1797 .expect("Could not create runtime");
1798 runtime.block_on(async move {
1799 let file = tokio::fs::File::open(path).await.unwrap();
1800 let row_number_field = Arc::new(
1801 Field::new("row_number", DataType::Int64, false)
1802 .with_extension_type(RowNumber),
1803 );
1804 let options = ArrowReaderOptions::new()
1805 .with_virtual_columns(vec![row_number_field])
1806 .unwrap();
1807 let reader = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
1808 .await
1809 .unwrap()
1810 .with_row_selection(selection)
1811 .with_batch_size(batch_size)
1812 .build()
1813 .expect("Could not create reader");
1814 reader.try_collect::<Vec<_>>().await.unwrap()
1815 })
1816 },
1817 );
1818 }
1819
1820 #[test]
1821 fn test_row_numbers_with_multiple_row_groups_and_filter() {
1822 test_row_numbers_with_multiple_row_groups_helper(
1823 true,
1824 |path, selection, row_filter, batch_size| {
1825 let runtime = tokio::runtime::Builder::new_current_thread()
1826 .enable_all()
1827 .build()
1828 .expect("Could not create runtime");
1829 runtime.block_on(async move {
1830 let file = tokio::fs::File::open(path).await.unwrap();
1831 let row_number_field = Arc::new(
1832 Field::new("row_number", DataType::Int64, false)
1833 .with_extension_type(RowNumber),
1834 );
1835 let options = ArrowReaderOptions::new()
1836 .with_virtual_columns(vec![row_number_field])
1837 .unwrap();
1838 let reader = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
1839 .await
1840 .unwrap()
1841 .with_row_selection(selection)
1842 .with_row_filter(row_filter.expect("No row filter"))
1843 .with_batch_size(batch_size)
1844 .build()
1845 .expect("Could not create reader");
1846 reader.try_collect::<Vec<_>>().await.unwrap()
1847 })
1848 },
1849 );
1850 }
1851
1852 #[tokio::test]
1853 async fn test_nested_lists() -> Result<()> {
1854 let list_inner_field = Arc::new(Field::new("item", DataType::Float32, true));
1856 let table_schema = Arc::new(Schema::new(vec![
1857 Field::new("id", DataType::Int32, false),
1858 Field::new("vector", DataType::List(list_inner_field.clone()), true),
1859 ]));
1860
1861 let mut list_builder =
1862 ListBuilder::new(Float32Builder::new()).with_field(list_inner_field.clone());
1863 list_builder.values().append_slice(&[10.0, 10.0, 10.0]);
1864 list_builder.append(true);
1865 list_builder.values().append_slice(&[20.0, 20.0, 20.0]);
1866 list_builder.append(true);
1867 list_builder.values().append_slice(&[30.0, 30.0, 30.0]);
1868 list_builder.append(true);
1869 list_builder.values().append_slice(&[40.0, 40.0, 40.0]);
1870 list_builder.append(true);
1871 let list_array = list_builder.finish();
1872
1873 let data = vec![RecordBatch::try_new(
1874 table_schema.clone(),
1875 vec![
1876 Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
1877 Arc::new(list_array),
1878 ],
1879 )?];
1880
1881 let mut buffer = Vec::new();
1882 let mut writer = AsyncArrowWriter::try_new(&mut buffer, table_schema, None)?;
1883
1884 for batch in data {
1885 writer.write(&batch).await?;
1886 }
1887
1888 writer.close().await?;
1889
1890 let reader = TestReader::new(Bytes::from(buffer));
1891 let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
1892
1893 let predicate = ArrowPredicateFn::new(ProjectionMask::all(), |batch| {
1894 Ok(BooleanArray::from(vec![true; batch.num_rows()]))
1895 });
1896
1897 let projection_mask = ProjectionMask::all();
1898
1899 let mut stream = builder
1900 .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
1901 .with_projection(projection_mask)
1902 .build()?;
1903
1904 while let Some(batch) = stream.next().await {
            let _ = batch.unwrap();
        }
1907
1908 Ok(())
1909 }
1910}