use std::collections::VecDeque;
use std::fmt::Formatter;
use std::io::SeekFrom;
use std::ops::Range;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};

use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
use futures::stream::Stream;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

use arrow_array::RecordBatch;
use arrow_schema::{DataType, Fields, Schema, SchemaRef};

use crate::arrow::arrow_reader::{
    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
    RowFilter, RowSelection,
};

use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
use crate::bloom_filter::{
    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};

mod metadata;
pub use metadata::*;

#[cfg(feature = "object_store")]
mod store;

use crate::arrow::ProjectionMask;
use crate::arrow::array_reader::{ArrayReaderBuilder, CacheOptionsBuilder, RowGroupCache};
use crate::arrow::arrow_reader::ReadPlanBuilder;
use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use crate::arrow::in_memory_row_group::{FetchRanges, InMemoryRowGroup};
use crate::arrow::schema::ParquetField;
#[cfg(feature = "object_store")]
pub use store::*;

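/// The asynchronous I/O interface used by [`ParquetRecordBatchStream`] to read parquet
/// files: ranged byte reads plus access to the file metadata.
///
/// A blanket implementation is provided below for any `AsyncRead + AsyncSeek + Unpin +
/// Send` type (for example `tokio::fs::File`), and the `object_store` feature adds an
/// implementation for object store backed readers via the `store` module.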
68pub trait AsyncFileReader: Send {
82    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
84
85    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
87        async move {
88            let mut result = Vec::with_capacity(ranges.len());
89
90            for range in ranges.into_iter() {
91                let data = self.get_bytes(range).await?;
92                result.push(data);
93            }
94
95            Ok(result)
96        }
97        .boxed()
98    }
99
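    /// Fetches the [`ParquetMetaData`] for this file.
    ///
    /// `options`, when provided, are the [`ArrowReaderOptions`] the reader was created
    /// with; implementations may consult them, for example to decide whether to also
    /// load the page index or to pick up decryption settings.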
100    fn get_metadata<'a>(
117        &'a mut self,
118        options: Option<&'a ArrowReaderOptions>,
119    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
120}
121
122impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
124    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
125        self.as_mut().get_bytes(range)
126    }
127
128    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
129        self.as_mut().get_byte_ranges(ranges)
130    }
131
132    fn get_metadata<'a>(
133        &'a mut self,
134        options: Option<&'a ArrowReaderOptions>,
135    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
136        self.as_mut().get_metadata(options)
137    }
138}
139
140impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
141    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
142        async move {
143            self.seek(SeekFrom::End(-(suffix as i64))).await?;
144            let mut buf = Vec::with_capacity(suffix);
145            self.take(suffix as _).read_to_end(&mut buf).await?;
146            Ok(buf.into())
147        }
148        .boxed()
149    }
150}
151
152impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
153    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
154        async move {
155            self.seek(SeekFrom::Start(range.start)).await?;
156
157            let to_read = range.end - range.start;
158            let mut buffer = Vec::with_capacity(to_read.try_into()?);
159            let read = self.take(to_read).read_to_end(&mut buffer).await?;
160            if read as u64 != to_read {
161                return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
162            }
163
164            Ok(buffer.into())
165        }
166        .boxed()
167    }
168
169    fn get_metadata<'a>(
170        &'a mut self,
171        options: Option<&'a ArrowReaderOptions>,
172    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
173        async move {
174            let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy(
175                PageIndexPolicy::from(options.is_some_and(|o| o.page_index())),
176            );
177
178            #[cfg(feature = "encryption")]
179            let metadata_reader = metadata_reader.with_decryption_properties(
180                options.and_then(|o| o.file_decryption_properties.as_ref().map(Arc::clone)),
181            );
182
183            let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
184            Ok(Arc::new(parquet_metadata))
185        }
186        .boxed()
187    }
188}
189
190impl ArrowReaderMetadata {
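    /// Loads [`ArrowReaderMetadata`] from an [`AsyncFileReader`].
    ///
    /// This fetches and parses the parquet footer via [`AsyncFileReader::get_metadata`]
    /// and validates it against `options`. The result can be reused across readers via
    /// [`ParquetRecordBatchStreamBuilder::new_with_metadata`], avoiding repeated footer
    /// reads for the same file.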
191    pub async fn load_async<T: AsyncFileReader>(
195        input: &mut T,
196        options: ArrowReaderOptions,
197    ) -> Result<Self> {
198        let metadata = input.get_metadata(Some(&options)).await?;
199        Self::try_new(metadata, options)
200    }
201}
202
203#[doc(hidden)]
204pub struct AsyncReader<T>(T);
209
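/// A builder for constructing a [`ParquetRecordBatchStream`] that reads parquet data
/// asynchronously from an [`AsyncFileReader`].
///
/// A minimal sketch of typical usage (assuming this module is part of the `parquet`
/// crate with its async/Tokio support enabled, and that `path` names an existing
/// parquet file):
///
/// ```no_run
/// # use futures::TryStreamExt;
/// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
/// # async fn example(path: &str) -> parquet::errors::Result<()> {
/// // `tokio::fs::File` implements `AsyncFileReader` via the blanket impl below
/// let file = tokio::fs::File::open(path).await?;
/// let stream = ParquetRecordBatchStreamBuilder::new(file)
///     .await?
///     .with_batch_size(1024)
///     .build()?;
/// // Collect all record batches from the stream
/// let batches: Vec<_> = stream.try_collect().await?;
/// # Ok(())
/// # }
/// ```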
210pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
223
224impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
225    pub async fn new(input: T) -> Result<Self> {
356        Self::new_with_options(input, Default::default()).await
357    }
358
359    pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
362        let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
363        Ok(Self::new_with_metadata(input, metadata))
364    }
365
366    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
412        Self::new_builder(AsyncReader(input), metadata)
413    }
414
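    /// Reads the bloom filter (split block bloom filter, SBBF) for the given column
    /// chunk of the given row group, if the column chunk metadata records one.
    ///
    /// Returns `Ok(None)` when no bloom filter offset is recorded. If the metadata also
    /// records the filter length, the header and bitset are fetched in a single ranged
    /// read; otherwise the header is read first using a size estimate and the bitset is
    /// fetched with a second read.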
415    pub async fn get_row_group_column_bloom_filter(
421        &mut self,
422        row_group_idx: usize,
423        column_idx: usize,
424    ) -> Result<Option<Sbbf>> {
425        let metadata = self.metadata.row_group(row_group_idx);
426        let column_metadata = metadata.column(column_idx);
427
428        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
429            offset
430                .try_into()
431                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
432        } else {
433            return Ok(None);
434        };
435
436        let buffer = match column_metadata.bloom_filter_length() {
437            Some(length) => self.input.0.get_bytes(offset..offset + length as u64),
438            None => self
439                .input
440                .0
441                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64),
442        }
443        .await?;
444
445        let (header, bitset_offset) =
446            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
447
        match header.algorithm {
            BloomFilterAlgorithm::BLOCK => {
                // only this algorithm is currently defined; matching future proofs the enum
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED => {
                // only uncompressed bloom filters are currently defined
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH => {
                // only xxHash is currently defined
            }
        }
463
464        let bitset = match column_metadata.bloom_filter_length() {
465            Some(_) => buffer.slice(
466                (TryInto::<usize>::try_into(bitset_offset).unwrap()
467                    - TryInto::<usize>::try_into(offset).unwrap())..,
468            ),
469            None => {
470                let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
471                    ParquetError::General("Bloom filter length is invalid".to_string())
472                })?;
473                self.input
474                    .0
475                    .get_bytes(bitset_offset..bitset_offset + bitset_length)
476                    .await?
477            }
478        };
479        Ok(Some(Sbbf::new(&bitset)))
480    }
481
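    /// Builds a [`ParquetRecordBatchStream`].
    ///
    /// This validates any explicitly requested row group indices, clamps the batch size
    /// to the total number of rows in the file, and derives the projected Arrow schema;
    /// no I/O is performed until the resulting stream is polled.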
482    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
486        let num_row_groups = self.metadata.row_groups().len();
487
488        let row_groups = match self.row_groups {
489            Some(row_groups) => {
490                if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
491                    return Err(general_err!(
492                        "row group {} out of bounds 0..{}",
493                        col,
494                        num_row_groups
495                    ));
496                }
497                row_groups.into()
498            }
499            None => (0..self.metadata.row_groups().len()).collect(),
500        };
501
502        let batch_size = self
504            .batch_size
505            .min(self.metadata.file_metadata().num_rows() as usize);
506        let reader_factory = ReaderFactory {
507            input: self.input.0,
508            filter: self.filter,
509            metadata: self.metadata.clone(),
510            fields: self.fields,
511            limit: self.limit,
512            offset: self.offset,
513            metrics: self.metrics,
514            max_predicate_cache_size: self.max_predicate_cache_size,
515        };
516
517        let projected_fields = match reader_factory.fields.as_deref().map(|pf| &pf.arrow_type) {
520            Some(DataType::Struct(fields)) => {
521                fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
522            }
523            None => Fields::empty(),
524            _ => unreachable!("Must be Struct for root type"),
525        };
526        let schema = Arc::new(Schema::new(projected_fields));
527
528        Ok(ParquetRecordBatchStream {
529            metadata: self.metadata,
530            batch_size,
531            row_groups,
532            projection: self.projection,
533            selection: self.selection,
534            schema,
535            reader_factory: Some(reader_factory),
536            state: StreamState::Init,
537        })
538    }
539}
540
541type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;
546
547struct ReaderFactory<T> {
550    metadata: Arc<ParquetMetaData>,
551
552    fields: Option<Arc<ParquetField>>,
554
555    input: T,
556
557    filter: Option<RowFilter>,
559
560    limit: Option<usize>,
562
563    offset: Option<usize>,
565
566    metrics: ArrowReaderMetrics,
568
569    max_predicate_cache_size: usize,
573}
574
575impl<T> ReaderFactory<T>
576where
577    T: AsyncFileReader + Send,
578{
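    /// Reads the row group at `row_group_idx`, returning itself (so it can be reused
    /// for the next row group) together with a [`ParquetRecordBatchReader`], or `None`
    /// if every row was filtered out.
    ///
    /// Roughly: each predicate of the [`RowFilter`] is evaluated in turn, fetching only
    /// the columns that predicate needs and refining the row selection; the remaining
    /// `offset`/`limit` are applied; finally the projected columns for the surviving
    /// rows are fetched and a reader is built, reusing any column data cached while the
    /// predicates were evaluated.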
579    async fn read_row_group(
585        mut self,
586        row_group_idx: usize,
587        selection: Option<RowSelection>,
588        projection: ProjectionMask,
589        batch_size: usize,
590    ) -> ReadResult<T> {
591        let meta = self.metadata.row_group(row_group_idx);
594        let offset_index = self
595            .metadata
596            .offset_index()
597            .filter(|index| !index.is_empty())
599            .map(|x| x[row_group_idx].as_slice());
600
601        let cache_projection = match self.compute_cache_projection(&projection) {
603            Some(projection) => projection,
604            None => ProjectionMask::none(meta.columns().len()),
605        };
606        let row_group_cache = Arc::new(Mutex::new(RowGroupCache::new(
607            batch_size,
608            self.max_predicate_cache_size,
609        )));
610
611        let mut row_group = InMemoryRowGroup {
612            row_count: meta.num_rows() as usize,
614            column_chunks: vec![None; meta.columns().len()],
615            offset_index,
616            row_group_idx,
617            metadata: self.metadata.as_ref(),
618        };
619
620        let cache_options_builder = CacheOptionsBuilder::new(&cache_projection, &row_group_cache);
621
622        let filter = self.filter.as_mut();
623        let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);
624
625        if let Some(filter) = filter {
627            let cache_options = cache_options_builder.clone().producer();
628
629            for predicate in filter.predicates.iter_mut() {
                if !plan_builder.selects_any() {
                    // the predicates evaluated so far rule out every row in this row group
                    return Ok((self, None));
                }
633
634                let selection = plan_builder.selection();
636                let cache_mask = Some(&cache_projection);
638                row_group
639                    .fetch(
640                        &mut self.input,
641                        predicate.projection(),
642                        selection,
643                        batch_size,
644                        cache_mask,
645                    )
646                    .await?;
647
648                let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
649                    .with_cache_options(Some(&cache_options))
650                    .build_array_reader(self.fields.as_deref(), predicate.projection())?;
651
652                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
653            }
654        }
655
656        let rows_before = plan_builder
658            .num_rows_selected()
659            .unwrap_or(row_group.row_count);
660
        if rows_before == 0 {
            // the selection rules out every row in this row group
            return Ok((self, None));
        }
664
665        let plan_builder = plan_builder
667            .limited(row_group.row_count)
668            .with_offset(self.offset)
669            .with_limit(self.limit)
670            .build_limited();
671
672        let rows_after = plan_builder
673            .num_rows_selected()
674            .unwrap_or(row_group.row_count);
675
        // The reduction from `rows_before` to `rows_after` is due to the offset and/or
        // the limit; since the limit is applied after the offset, subtracting the full
        // reduction (saturating at zero) yields the offset remaining for later row groups.
        if let Some(offset) = &mut self.offset {
            *offset = offset.saturating_sub(rows_before - rows_after);
        }
682
        if rows_after == 0 {
            // applying offset and limit leaves no rows to read from this row group
            return Ok((self, None));
        }
686
687        if let Some(limit) = &mut self.limit {
688            *limit -= rows_after;
689        }
690        row_group
692            .fetch(
694                &mut self.input,
695                &projection,
696                plan_builder.selection(),
697                batch_size,
698                None,
699            )
700            .await?;
701
702        let plan = plan_builder.build();
703
704        let cache_options = cache_options_builder.consumer();
705        let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
706            .with_cache_options(Some(&cache_options))
707            .build_array_reader(self.fields.as_deref(), &projection)?;
708
709        let reader = ParquetRecordBatchReader::new(array_reader, plan);
710
711        Ok((self, Some(reader)))
712    }
713
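    /// Computes which columns to cache while evaluating predicates: the union of all
    /// predicate projections, intersected with the output projection, with nested
    /// columns excluded. Returns `None` if the predicate cache is disabled
    /// (`max_predicate_cache_size == 0`) or no column qualifies.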
714    fn compute_cache_projection(&self, projection: &ProjectionMask) -> Option<ProjectionMask> {
716        if self.max_predicate_cache_size == 0 {
718            return None;
719        }
720
721        let filters = self.filter.as_ref()?;
722        let mut cache_projection = filters.predicates.first()?.projection().clone();
723        for predicate in filters.predicates.iter() {
724            cache_projection.union(predicate.projection());
725        }
726        cache_projection.intersect(projection);
727        self.exclude_nested_columns_from_cache(&cache_projection)
728    }
729
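    /// Restricts `mask` to leaves whose root column has exactly one leaf, i.e. drops
    /// nested (multi-leaf) columns from the cache mask so that only flat columns are
    /// cached. Returns `None` if no leaf remains.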
730    fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
732        let schema = self.metadata.file_metadata().schema_descr();
733        let num_leaves = schema.num_columns();
734
735        let num_roots = schema.root_schema().get_fields().len();
737        let mut root_leaf_counts = vec![0usize; num_roots];
738        for leaf_idx in 0..num_leaves {
739            let root_idx = schema.get_column_root_idx(leaf_idx);
740            root_leaf_counts[root_idx] += 1;
741        }
742
743        let mut included_leaves = Vec::new();
745        for leaf_idx in 0..num_leaves {
746            if mask.leaf_included(leaf_idx) {
747                let root_idx = schema.get_column_root_idx(leaf_idx);
748                if root_leaf_counts[root_idx] == 1 {
749                    included_leaves.push(leaf_idx);
750                }
751            }
752        }
753
754        if included_leaves.is_empty() {
755            None
756        } else {
757            Some(ProjectionMask::leaves(schema, included_leaves))
758        }
759    }
760}
761
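/// The internal state machine of [`ParquetRecordBatchStream`]: `Init` means the next
/// row group can be started, `Reading` holds an in-flight row group fetch, `Decoding`
/// yields batches from the current row group, and `Error` is the terminal state after
/// a failure.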
762enum StreamState<T> {
763    Init,
765    Decoding(ParquetRecordBatchReader),
767    Reading(BoxFuture<'static, ReadResult<T>>),
769    Error,
771}
772
773impl<T> std::fmt::Debug for StreamState<T> {
774    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
775        match self {
776            StreamState::Init => write!(f, "StreamState::Init"),
777            StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
778            StreamState::Reading(_) => write!(f, "StreamState::Reading"),
779            StreamState::Error => write!(f, "StreamState::Error"),
780        }
781    }
782}
783
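/// An asynchronous [`Stream`] of [`RecordBatch`] returned by
/// [`ParquetRecordBatchStreamBuilder::build`].
///
/// Row groups are fetched and decoded one at a time as the stream is polled.
/// Alternatively, `next_row_group` returns a synchronous [`ParquetRecordBatchReader`]
/// for each row group, which can be useful when decoding should happen off the async
/// runtime; the two APIs should not be mixed on the same stream.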
784pub struct ParquetRecordBatchStream<T> {
802    metadata: Arc<ParquetMetaData>,
803
804    schema: SchemaRef,
805
806    row_groups: VecDeque<usize>,
807
808    projection: ProjectionMask,
809
810    batch_size: usize,
811
812    selection: Option<RowSelection>,
813
814    reader_factory: Option<ReaderFactory<T>>,
816
817    state: StreamState<T>,
818}
819
820impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
821    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
822        f.debug_struct("ParquetRecordBatchStream")
823            .field("metadata", &self.metadata)
824            .field("schema", &self.schema)
825            .field("batch_size", &self.batch_size)
826            .field("projection", &self.projection)
827            .field("state", &self.state)
828            .finish()
829    }
830}
831
832impl<T> ParquetRecordBatchStream<T> {
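    /// Returns the projected Arrow [`SchemaRef`] describing the [`RecordBatch`]es
    /// yielded by this stream (not the full schema of the underlying file).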
833    pub fn schema(&self) -> &SchemaRef {
838        &self.schema
839    }
840}
841
842impl<T> ParquetRecordBatchStream<T>
843where
844    T: AsyncFileReader + Unpin + Send + 'static,
845{
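    /// Fetches the next row group and returns a [`ParquetRecordBatchReader`] for it,
    /// or `Ok(None)` once all requested row groups have been read.
    ///
    /// Returns an error if the stream is currently being driven through its [`Stream`]
    /// implementation; use one API or the other, not both.
    ///
    /// A minimal sketch (assuming this module is part of the `parquet` crate and a
    /// Tokio runtime is available):
    ///
    /// ```no_run
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # async fn example(path: &str) -> parquet::errors::Result<()> {
    /// let file = tokio::fs::File::open(path).await?;
    /// let mut stream = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;
    /// // Decode each row group with a synchronous reader
    /// while let Some(reader) = stream.next_row_group().await? {
    ///     for batch in reader {
    ///         println!("read {} rows", batch.unwrap().num_rows());
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```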
846    pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
860        loop {
861            match &mut self.state {
862                StreamState::Decoding(_) | StreamState::Reading(_) => {
863                    return Err(ParquetError::General(
864                        "Cannot combine the use of next_row_group with the Stream API".to_string(),
865                    ));
866                }
867                StreamState::Init => {
868                    let row_group_idx = match self.row_groups.pop_front() {
869                        Some(idx) => idx,
870                        None => return Ok(None),
871                    };
872
873                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
874
875                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
876
877                    let reader_factory = self.reader_factory.take().expect("lost reader factory");
878
879                    let (reader_factory, maybe_reader) = reader_factory
880                        .read_row_group(
881                            row_group_idx,
882                            selection,
883                            self.projection.clone(),
884                            self.batch_size,
885                        )
886                        .await
887                        .inspect_err(|_| {
888                            self.state = StreamState::Error;
889                        })?;
890                    self.reader_factory = Some(reader_factory);
891
892                    if let Some(reader) = maybe_reader {
893                        return Ok(Some(reader));
894                    } else {
895                        continue;
897                    }
898                }
                StreamState::Error => return Ok(None),
            }
901        }
902    }
903}
904
905impl<T> Stream for ParquetRecordBatchStream<T>
906where
907    T: AsyncFileReader + Unpin + Send + 'static,
908{
909    type Item = Result<RecordBatch>;
910
911    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
912        loop {
913            match &mut self.state {
914                StreamState::Decoding(batch_reader) => match batch_reader.next() {
915                    Some(Ok(batch)) => {
916                        return Poll::Ready(Some(Ok(batch)));
917                    }
918                    Some(Err(e)) => {
919                        self.state = StreamState::Error;
920                        return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
921                    }
922                    None => self.state = StreamState::Init,
923                },
924                StreamState::Init => {
925                    let row_group_idx = match self.row_groups.pop_front() {
926                        Some(idx) => idx,
927                        None => return Poll::Ready(None),
928                    };
929
930                    let reader = self.reader_factory.take().expect("lost reader factory");
931
932                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
933
934                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
935
936                    let fut = reader
937                        .read_row_group(
938                            row_group_idx,
939                            selection,
940                            self.projection.clone(),
941                            self.batch_size,
942                        )
943                        .boxed();
944
945                    self.state = StreamState::Reading(fut)
946                }
947                StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
948                    Ok((reader_factory, maybe_reader)) => {
949                        self.reader_factory = Some(reader_factory);
950                        match maybe_reader {
951                            Some(reader) => self.state = StreamState::Decoding(reader),
953                            None => self.state = StreamState::Init,
955                        }
956                    }
957                    Err(e) => {
958                        self.state = StreamState::Error;
959                        return Poll::Ready(Some(Err(e)));
960                    }
961                },
                StreamState::Error => return Poll::Ready(None),
            }
964        }
965    }
966}
967
968impl InMemoryRowGroup<'_> {
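    /// Fetches the byte ranges required to decode `projection` (optionally narrowed by
    /// `selection` and `cache_mask`) in one vectored call to
    /// [`AsyncFileReader::get_byte_ranges`], then stores the returned bytes as this row
    /// group's column chunks.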
972    pub(crate) async fn fetch<T: AsyncFileReader + Send>(
978        &mut self,
979        input: &mut T,
980        projection: &ProjectionMask,
981        selection: Option<&RowSelection>,
982        batch_size: usize,
983        cache_mask: Option<&ProjectionMask>,
984    ) -> Result<()> {
985        let FetchRanges {
987            ranges,
988            page_start_offsets,
989        } = self.fetch_ranges(projection, selection, batch_size, cache_mask);
990        let chunk_data = input.get_byte_ranges(ranges).await?.into_iter();
992        self.fill_column_chunks(projection, page_start_offsets, chunk_data);
994        Ok(())
995    }
996}
997#[cfg(test)]
998mod tests {
999    use super::*;
1000    use crate::arrow::ArrowWriter;
1001    use crate::arrow::arrow_reader::{
1002        ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
1003    };
1004    use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
1005    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
1006    use crate::file::metadata::ParquetMetaDataReader;
1007    use crate::file::properties::WriterProperties;
1008    use arrow::compute::kernels::cmp::eq;
1009    use arrow::error::Result as ArrowResult;
1010    use arrow_array::builder::{ListBuilder, StringBuilder};
1011    use arrow_array::cast::AsArray;
1012    use arrow_array::types::Int32Type;
1013    use arrow_array::{
1014        Array, ArrayRef, Int8Array, Int32Array, RecordBatchReader, Scalar, StringArray,
1015        StructArray, UInt64Array,
1016    };
1017    use arrow_schema::{DataType, Field, Schema};
1018    use futures::{StreamExt, TryStreamExt};
1019    use rand::{Rng, rng};
1020    use std::collections::HashMap;
1021    use std::sync::{Arc, Mutex};
1022    use tempfile::tempfile;
1023
1024    #[derive(Clone)]
1025    struct TestReader {
1026        data: Bytes,
1027        metadata: Option<Arc<ParquetMetaData>>,
1028        requests: Arc<Mutex<Vec<Range<usize>>>>,
1029    }
1030
1031    impl TestReader {
1032        fn new(data: Bytes) -> Self {
1033            Self {
1034                data,
1035                metadata: Default::default(),
1036                requests: Default::default(),
1037            }
1038        }
1039    }
1040
1041    impl AsyncFileReader for TestReader {
1042        fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
1043            let range = range.clone();
1044            self.requests
1045                .lock()
1046                .unwrap()
1047                .push(range.start as usize..range.end as usize);
1048            futures::future::ready(Ok(self
1049                .data
1050                .slice(range.start as usize..range.end as usize)))
1051            .boxed()
1052        }
1053
1054        fn get_metadata<'a>(
1055            &'a mut self,
1056            options: Option<&'a ArrowReaderOptions>,
1057        ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
1058            let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy(
1059                PageIndexPolicy::from(options.is_some_and(|o| o.page_index())),
1060            );
1061            self.metadata = Some(Arc::new(
1062                metadata_reader.parse_and_finish(&self.data).unwrap(),
1063            ));
1064            futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
1065        }
1066    }
1067
1068    #[tokio::test]
1069    async fn test_async_reader() {
1070        let testdata = arrow::util::test_util::parquet_test_data();
1071        let path = format!("{testdata}/alltypes_plain.parquet");
1072        let data = Bytes::from(std::fs::read(path).unwrap());
1073
1074        let async_reader = TestReader::new(data.clone());
1075
1076        let requests = async_reader.requests.clone();
1077        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1078            .await
1079            .unwrap();
1080
1081        let metadata = builder.metadata().clone();
1082        assert_eq!(metadata.num_row_groups(), 1);
1083
1084        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1085        let stream = builder
1086            .with_projection(mask.clone())
1087            .with_batch_size(1024)
1088            .build()
1089            .unwrap();
1090
1091        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1092
1093        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1094            .unwrap()
1095            .with_projection(mask)
1096            .with_batch_size(104)
1097            .build()
1098            .unwrap()
1099            .collect::<ArrowResult<Vec<_>>>()
1100            .unwrap();
1101
1102        assert_eq!(async_batches, sync_batches);
1103
1104        let requests = requests.lock().unwrap();
1105        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1106        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1107
1108        assert_eq!(
1109            &requests[..],
1110            &[
1111                offset_1 as usize..(offset_1 + length_1) as usize,
1112                offset_2 as usize..(offset_2 + length_2) as usize
1113            ]
1114        );
1115    }
1116
1117    #[tokio::test]
1118    async fn test_async_reader_with_next_row_group() {
1119        let testdata = arrow::util::test_util::parquet_test_data();
1120        let path = format!("{testdata}/alltypes_plain.parquet");
1121        let data = Bytes::from(std::fs::read(path).unwrap());
1122
1123        let async_reader = TestReader::new(data.clone());
1124
1125        let requests = async_reader.requests.clone();
1126        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1127            .await
1128            .unwrap();
1129
1130        let metadata = builder.metadata().clone();
1131        assert_eq!(metadata.num_row_groups(), 1);
1132
1133        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1134        let mut stream = builder
1135            .with_projection(mask.clone())
1136            .with_batch_size(1024)
1137            .build()
1138            .unwrap();
1139
1140        let mut readers = vec![];
1141        while let Some(reader) = stream.next_row_group().await.unwrap() {
1142            readers.push(reader);
1143        }
1144
1145        let async_batches: Vec<_> = readers
1146            .into_iter()
1147            .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
1148            .collect();
1149
1150        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1151            .unwrap()
1152            .with_projection(mask)
1153            .with_batch_size(104)
1154            .build()
1155            .unwrap()
1156            .collect::<ArrowResult<Vec<_>>>()
1157            .unwrap();
1158
1159        assert_eq!(async_batches, sync_batches);
1160
1161        let requests = requests.lock().unwrap();
1162        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1163        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1164
1165        assert_eq!(
1166            &requests[..],
1167            &[
1168                offset_1 as usize..(offset_1 + length_1) as usize,
1169                offset_2 as usize..(offset_2 + length_2) as usize
1170            ]
1171        );
1172    }
1173
1174    #[tokio::test]
1175    async fn test_async_reader_with_index() {
1176        let testdata = arrow::util::test_util::parquet_test_data();
1177        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1178        let data = Bytes::from(std::fs::read(path).unwrap());
1179
1180        let async_reader = TestReader::new(data.clone());
1181
1182        let options = ArrowReaderOptions::new().with_page_index(true);
1183        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1184            .await
1185            .unwrap();
1186
1187        let metadata_with_index = builder.metadata();
1189        assert_eq!(metadata_with_index.num_row_groups(), 1);
1190
1191        let offset_index = metadata_with_index.offset_index().unwrap();
1193        let column_index = metadata_with_index.column_index().unwrap();
1194
1195        assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
1196        assert_eq!(column_index.len(), metadata_with_index.num_row_groups());
1197
1198        let num_columns = metadata_with_index
1199            .file_metadata()
1200            .schema_descr()
1201            .num_columns();
1202
1203        offset_index
1205            .iter()
1206            .for_each(|x| assert_eq!(x.len(), num_columns));
1207        column_index
1208            .iter()
1209            .for_each(|x| assert_eq!(x.len(), num_columns));
1210
1211        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1212        let stream = builder
1213            .with_projection(mask.clone())
1214            .with_batch_size(1024)
1215            .build()
1216            .unwrap();
1217
1218        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1219
1220        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1221            .unwrap()
1222            .with_projection(mask)
1223            .with_batch_size(1024)
1224            .build()
1225            .unwrap()
1226            .collect::<ArrowResult<Vec<_>>>()
1227            .unwrap();
1228
1229        assert_eq!(async_batches, sync_batches);
1230    }
1231
1232    #[tokio::test]
1233    async fn test_async_reader_with_limit() {
1234        let testdata = arrow::util::test_util::parquet_test_data();
1235        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1236        let data = Bytes::from(std::fs::read(path).unwrap());
1237
1238        let metadata = ParquetMetaDataReader::new()
1239            .parse_and_finish(&data)
1240            .unwrap();
1241        let metadata = Arc::new(metadata);
1242
1243        assert_eq!(metadata.num_row_groups(), 1);
1244
1245        let async_reader = TestReader::new(data.clone());
1246
1247        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1248            .await
1249            .unwrap();
1250
1251        assert_eq!(builder.metadata().num_row_groups(), 1);
1252
1253        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1254        let stream = builder
1255            .with_projection(mask.clone())
1256            .with_batch_size(1024)
1257            .with_limit(1)
1258            .build()
1259            .unwrap();
1260
1261        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1262
1263        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1264            .unwrap()
1265            .with_projection(mask)
1266            .with_batch_size(1024)
1267            .with_limit(1)
1268            .build()
1269            .unwrap()
1270            .collect::<ArrowResult<Vec<_>>>()
1271            .unwrap();
1272
1273        assert_eq!(async_batches, sync_batches);
1274    }
1275
1276    #[tokio::test]
1277    async fn test_async_reader_skip_pages() {
1278        let testdata = arrow::util::test_util::parquet_test_data();
1279        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1280        let data = Bytes::from(std::fs::read(path).unwrap());
1281
1282        let async_reader = TestReader::new(data.clone());
1283
1284        let options = ArrowReaderOptions::new().with_page_index(true);
1285        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1286            .await
1287            .unwrap();
1288
1289        assert_eq!(builder.metadata().num_row_groups(), 1);
1290
1291        let selection = RowSelection::from(vec![
            RowSelector::skip(21),
            RowSelector::select(21),
            RowSelector::skip(41),
            RowSelector::select(41),
            RowSelector::skip(25),
            RowSelector::select(25),
            RowSelector::skip(7116),
            RowSelector::select(10),
        ]);
1301
1302        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);
1303
1304        let stream = builder
1305            .with_projection(mask.clone())
1306            .with_row_selection(selection.clone())
1307            .build()
1308            .expect("building stream");
1309
1310        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1311
1312        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1313            .unwrap()
1314            .with_projection(mask)
1315            .with_batch_size(1024)
1316            .with_row_selection(selection)
1317            .build()
1318            .unwrap()
1319            .collect::<ArrowResult<Vec<_>>>()
1320            .unwrap();
1321
1322        assert_eq!(async_batches, sync_batches);
1323    }
1324
1325    #[tokio::test]
1326    async fn test_fuzz_async_reader_selection() {
1327        let testdata = arrow::util::test_util::parquet_test_data();
1328        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1329        let data = Bytes::from(std::fs::read(path).unwrap());
1330
1331        let mut rand = rng();
1332
1333        for _ in 0..100 {
1334            let mut expected_rows = 0;
1335            let mut total_rows = 0;
1336            let mut skip = false;
1337            let mut selectors = vec![];
1338
1339            while total_rows < 7300 {
1340                let row_count: usize = rand.random_range(1..100);
1341
1342                let row_count = row_count.min(7300 - total_rows);
1343
1344                selectors.push(RowSelector { row_count, skip });
1345
1346                total_rows += row_count;
1347                if !skip {
1348                    expected_rows += row_count;
1349                }
1350
1351                skip = !skip;
1352            }
1353
1354            let selection = RowSelection::from(selectors);
1355
1356            let async_reader = TestReader::new(data.clone());
1357
1358            let options = ArrowReaderOptions::new().with_page_index(true);
1359            let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1360                .await
1361                .unwrap();
1362
1363            assert_eq!(builder.metadata().num_row_groups(), 1);
1364
1365            let col_idx: usize = rand.random_range(0..13);
1366            let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1367
1368            let stream = builder
1369                .with_projection(mask.clone())
1370                .with_row_selection(selection.clone())
1371                .build()
1372                .expect("building stream");
1373
1374            let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1375
1376            let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1377
1378            assert_eq!(actual_rows, expected_rows);
1379        }
1380    }
1381
1382    #[tokio::test]
1383    async fn test_async_reader_zero_row_selector() {
1384        let testdata = arrow::util::test_util::parquet_test_data();
1386        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1387        let data = Bytes::from(std::fs::read(path).unwrap());
1388
1389        let mut rand = rng();
1390
1391        let mut expected_rows = 0;
1392        let mut total_rows = 0;
1393        let mut skip = false;
1394        let mut selectors = vec![];
1395
1396        selectors.push(RowSelector {
1397            row_count: 0,
1398            skip: false,
1399        });
1400
1401        while total_rows < 7300 {
1402            let row_count: usize = rand.random_range(1..100);
1403
1404            let row_count = row_count.min(7300 - total_rows);
1405
1406            selectors.push(RowSelector { row_count, skip });
1407
1408            total_rows += row_count;
1409            if !skip {
1410                expected_rows += row_count;
1411            }
1412
1413            skip = !skip;
1414        }
1415
1416        let selection = RowSelection::from(selectors);
1417
1418        let async_reader = TestReader::new(data.clone());
1419
1420        let options = ArrowReaderOptions::new().with_page_index(true);
1421        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1422            .await
1423            .unwrap();
1424
1425        assert_eq!(builder.metadata().num_row_groups(), 1);
1426
1427        let col_idx: usize = rand.random_range(0..13);
1428        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1429
1430        let stream = builder
1431            .with_projection(mask.clone())
1432            .with_row_selection(selection.clone())
1433            .build()
1434            .expect("building stream");
1435
1436        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1437
1438        let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1439
1440        assert_eq!(actual_rows, expected_rows);
1441    }
1442
1443    #[tokio::test]
1444    async fn test_row_filter() {
1445        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1446        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1447        let data = RecordBatch::try_from_iter([
1448            ("a", Arc::new(a) as ArrayRef),
1449            ("b", Arc::new(b) as ArrayRef),
1450        ])
1451        .unwrap();
1452
1453        let mut buf = Vec::with_capacity(1024);
1454        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1455        writer.write(&data).unwrap();
1456        writer.close().unwrap();
1457
1458        let data: Bytes = buf.into();
1459        let metadata = ParquetMetaDataReader::new()
1460            .parse_and_finish(&data)
1461            .unwrap();
1462        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1463
1464        let test = TestReader::new(data);
1465        let requests = test.requests.clone();
1466
1467        let a_scalar = StringArray::from_iter_values(["b"]);
1468        let a_filter = ArrowPredicateFn::new(
1469            ProjectionMask::leaves(&parquet_schema, vec![0]),
1470            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1471        );
1472
1473        let filter = RowFilter::new(vec![Box::new(a_filter)]);
1474
1475        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 1]);
1476        let stream = ParquetRecordBatchStreamBuilder::new(test)
1477            .await
1478            .unwrap()
1479            .with_projection(mask.clone())
1480            .with_batch_size(1024)
1481            .with_row_filter(filter)
1482            .build()
1483            .unwrap();
1484
1485        let batches: Vec<_> = stream.try_collect().await.unwrap();
1486        assert_eq!(batches.len(), 1);
1487
1488        let batch = &batches[0];
1489        assert_eq!(batch.num_columns(), 2);
1490
1491        assert_eq!(
1493            batch.column(0).as_ref(),
1494            &StringArray::from_iter_values(["b", "b", "b"])
1495        );
1496        assert_eq!(
1497            batch.column(1).as_ref(),
1498            &StringArray::from_iter_values(["2", "3", "4"])
1499        );
1500
1501        assert_eq!(requests.lock().unwrap().len(), 2);
1505    }
1506
1507    #[tokio::test]
1508    async fn test_two_row_filters() {
1509        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1510        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1511        let c = Int32Array::from_iter(0..6);
1512        let data = RecordBatch::try_from_iter([
1513            ("a", Arc::new(a) as ArrayRef),
1514            ("b", Arc::new(b) as ArrayRef),
1515            ("c", Arc::new(c) as ArrayRef),
1516        ])
1517        .unwrap();
1518
1519        let mut buf = Vec::with_capacity(1024);
1520        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1521        writer.write(&data).unwrap();
1522        writer.close().unwrap();
1523
1524        let data: Bytes = buf.into();
1525        let metadata = ParquetMetaDataReader::new()
1526            .parse_and_finish(&data)
1527            .unwrap();
1528        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1529
1530        let test = TestReader::new(data);
1531        let requests = test.requests.clone();
1532
1533        let a_scalar = StringArray::from_iter_values(["b"]);
1534        let a_filter = ArrowPredicateFn::new(
1535            ProjectionMask::leaves(&parquet_schema, vec![0]),
1536            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1537        );
1538
1539        let b_scalar = StringArray::from_iter_values(["4"]);
1540        let b_filter = ArrowPredicateFn::new(
1541            ProjectionMask::leaves(&parquet_schema, vec![1]),
1542            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1543        );
1544
1545        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1546
1547        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1548        let stream = ParquetRecordBatchStreamBuilder::new(test)
1549            .await
1550            .unwrap()
1551            .with_projection(mask.clone())
1552            .with_batch_size(1024)
1553            .with_row_filter(filter)
1554            .build()
1555            .unwrap();
1556
1557        let batches: Vec<_> = stream.try_collect().await.unwrap();
1558        assert_eq!(batches.len(), 1);
1559
1560        let batch = &batches[0];
1561        assert_eq!(batch.num_rows(), 1);
1562        assert_eq!(batch.num_columns(), 2);
1563
1564        let col = batch.column(0);
1565        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
1566        assert_eq!(val, "b");
1567
1568        let col = batch.column(1);
1569        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
1570        assert_eq!(val, 3);
1571
1572        assert_eq!(requests.lock().unwrap().len(), 3);
1577    }
1578
1579    #[tokio::test]
1580    async fn test_limit_multiple_row_groups() {
1581        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1582        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1583        let c = Int32Array::from_iter(0..6);
1584        let data = RecordBatch::try_from_iter([
1585            ("a", Arc::new(a) as ArrayRef),
1586            ("b", Arc::new(b) as ArrayRef),
1587            ("c", Arc::new(c) as ArrayRef),
1588        ])
1589        .unwrap();
1590
1591        let mut buf = Vec::with_capacity(1024);
1592        let props = WriterProperties::builder()
1593            .set_max_row_group_size(3)
1594            .build();
1595        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1596        writer.write(&data).unwrap();
1597        writer.close().unwrap();
1598
1599        let data: Bytes = buf.into();
1600        let metadata = ParquetMetaDataReader::new()
1601            .parse_and_finish(&data)
1602            .unwrap();
1603
1604        assert_eq!(metadata.num_row_groups(), 2);
1605
1606        let test = TestReader::new(data);
1607
1608        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1609            .await
1610            .unwrap()
1611            .with_batch_size(1024)
1612            .with_limit(4)
1613            .build()
1614            .unwrap();
1615
1616        let batches: Vec<_> = stream.try_collect().await.unwrap();
1617        assert_eq!(batches.len(), 2);
1619
1620        let batch = &batches[0];
1621        assert_eq!(batch.num_rows(), 3);
1623        assert_eq!(batch.num_columns(), 3);
1624        let col2 = batch.column(2).as_primitive::<Int32Type>();
1625        assert_eq!(col2.values(), &[0, 1, 2]);
1626
1627        let batch = &batches[1];
1628        assert_eq!(batch.num_rows(), 1);
1630        assert_eq!(batch.num_columns(), 3);
1631        let col2 = batch.column(2).as_primitive::<Int32Type>();
1632        assert_eq!(col2.values(), &[3]);
1633
1634        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1635            .await
1636            .unwrap()
1637            .with_offset(2)
1638            .with_limit(3)
1639            .build()
1640            .unwrap();
1641
1642        let batches: Vec<_> = stream.try_collect().await.unwrap();
1643        assert_eq!(batches.len(), 2);
1645
1646        let batch = &batches[0];
1647        assert_eq!(batch.num_rows(), 1);
1649        assert_eq!(batch.num_columns(), 3);
1650        let col2 = batch.column(2).as_primitive::<Int32Type>();
1651        assert_eq!(col2.values(), &[2]);
1652
1653        let batch = &batches[1];
1654        assert_eq!(batch.num_rows(), 2);
1656        assert_eq!(batch.num_columns(), 3);
1657        let col2 = batch.column(2).as_primitive::<Int32Type>();
1658        assert_eq!(col2.values(), &[3, 4]);
1659
1660        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1661            .await
1662            .unwrap()
1663            .with_offset(4)
1664            .with_limit(20)
1665            .build()
1666            .unwrap();
1667
1668        let batches: Vec<_> = stream.try_collect().await.unwrap();
1669        assert_eq!(batches.len(), 1);
1671
1672        let batch = &batches[0];
1673        assert_eq!(batch.num_rows(), 2);
1675        assert_eq!(batch.num_columns(), 3);
1676        let col2 = batch.column(2).as_primitive::<Int32Type>();
1677        assert_eq!(col2.values(), &[4, 5]);
1678    }
1679
1680    #[tokio::test]
1681    async fn test_row_filter_with_index() {
1682        let testdata = arrow::util::test_util::parquet_test_data();
1683        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1684        let data = Bytes::from(std::fs::read(path).unwrap());
1685
1686        let metadata = ParquetMetaDataReader::new()
1687            .parse_and_finish(&data)
1688            .unwrap();
1689        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1690
1691        assert_eq!(metadata.num_row_groups(), 1);
1692
1693        let async_reader = TestReader::new(data.clone());
1694
1695        let a_filter =
1696            ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema, vec![1]), |batch| {
1697                Ok(batch.column(0).as_boolean().clone())
1698            });
1699
1700        let b_scalar = Int8Array::from(vec![2]);
1701        let b_filter = ArrowPredicateFn::new(
1702            ProjectionMask::leaves(&parquet_schema, vec![2]),
1703            move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1704        );
1705
1706        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1707
1708        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1709
1710        let options = ArrowReaderOptions::new().with_page_index(true);
1711        let stream = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1712            .await
1713            .unwrap()
1714            .with_projection(mask.clone())
1715            .with_batch_size(1024)
1716            .with_row_filter(filter)
1717            .build()
1718            .unwrap();
1719
1720        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1721
1722        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1723
1724        assert_eq!(total_rows, 730);
1725    }
1726
1727    #[tokio::test]
1728    #[allow(deprecated)]
1729    async fn test_in_memory_row_group_sparse() {
1730        let testdata = arrow::util::test_util::parquet_test_data();
1731        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1732        let data = Bytes::from(std::fs::read(path).unwrap());
1733
1734        let metadata = ParquetMetaDataReader::new()
1735            .with_page_indexes(true)
1736            .parse_and_finish(&data)
1737            .unwrap();
1738
1739        let offset_index = metadata.offset_index().expect("reading offset index")[0].clone();
1740
1741        let mut metadata_builder = metadata.into_builder();
1742        let mut row_groups = metadata_builder.take_row_groups();
1743        row_groups.truncate(1);
1744        let row_group_meta = row_groups.pop().unwrap();
1745
1746        let metadata = metadata_builder
1747            .add_row_group(row_group_meta)
1748            .set_column_index(None)
1749            .set_offset_index(Some(vec![offset_index.clone()]))
1750            .build();
1751
1752        let metadata = Arc::new(metadata);
1753
1754        let num_rows = metadata.row_group(0).num_rows();
1755
1756        assert_eq!(metadata.num_row_groups(), 1);
1757
1758        let async_reader = TestReader::new(data.clone());
1759
1760        let requests = async_reader.requests.clone();
1761        let (_, fields) = parquet_to_arrow_schema_and_fields(
1762            metadata.file_metadata().schema_descr(),
1763            ProjectionMask::all(),
1764            None,
1765        )
1766        .unwrap();
1767
1768        let _schema_desc = metadata.file_metadata().schema_descr();
1769
1770        let projection = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), vec![0]);
1771
1772        let reader_factory = ReaderFactory {
1773            metadata,
1774            fields: fields.map(Arc::new),
1775            input: async_reader,
1776            filter: None,
1777            limit: None,
1778            offset: None,
1779            metrics: ArrowReaderMetrics::disabled(),
1780            max_predicate_cache_size: 0,
1781        };
1782
1783        let mut skip = true;
1784        let mut pages = offset_index[0].page_locations.iter().peekable();
1785
1786        let mut selectors = vec![];
1788        let mut expected_page_requests: Vec<Range<usize>> = vec![];
1789        while let Some(page) = pages.next() {
1790            let num_rows = if let Some(next_page) = pages.peek() {
1791                next_page.first_row_index - page.first_row_index
1792            } else {
1793                num_rows - page.first_row_index
1794            };
1795
1796            if skip {
1797                selectors.push(RowSelector::skip(num_rows as usize));
1798            } else {
1799                selectors.push(RowSelector::select(num_rows as usize));
1800                let start = page.offset as usize;
1801                let end = start + page.compressed_page_size as usize;
1802                expected_page_requests.push(start..end);
1803            }
1804            skip = !skip;
1805        }
1806
1807        let selection = RowSelection::from(selectors);
1808
1809        let (_factory, _reader) = reader_factory
1810            .read_row_group(0, Some(selection), projection.clone(), 48)
1811            .await
1812            .expect("reading row group");
1813
1814        let requests = requests.lock().unwrap();
1815
1816        assert_eq!(&requests[..], &expected_page_requests)
1817    }
1818
1819    #[tokio::test]
1820    async fn test_batch_size_overallocate() {
1821        let testdata = arrow::util::test_util::parquet_test_data();
1822        let path = format!("{testdata}/alltypes_plain.parquet");
1824        let data = Bytes::from(std::fs::read(path).unwrap());
1825
1826        let async_reader = TestReader::new(data.clone());
1827
1828        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1829            .await
1830            .unwrap();
1831
1832        let file_rows = builder.metadata().file_metadata().num_rows() as usize;
1833
1834        let stream = builder
1835            .with_projection(ProjectionMask::all())
1836            .with_batch_size(1024)
1837            .build()
1838            .unwrap();
1839        assert_ne!(1024, file_rows);
1840        assert_eq!(stream.batch_size, file_rows);
1841    }
1842
1843    #[tokio::test]
1844    async fn test_get_row_group_column_bloom_filter_without_length() {
1845        let testdata = arrow::util::test_util::parquet_test_data();
1846        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
1847        let data = Bytes::from(std::fs::read(path).unwrap());
1848        test_get_row_group_column_bloom_filter(data, false).await;
1849    }
1850
    #[tokio::test]
    async fn test_parquet_record_batch_stream_schema() {
        fn get_all_field_names(schema: &Schema) -> Vec<&String> {
            schema.flattened_fields().iter().map(|f| f.name()).collect()
        }

        let mut metadata = HashMap::with_capacity(1);
        metadata.insert("key".to_string(), "value".to_string());

        let nested_struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("d", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("e", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
            ),
        ]);
        let struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("a", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("b", DataType::UInt64, true)),
                Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c",
                    nested_struct_array.data_type().clone(),
                    true,
                )),
                Arc::new(nested_struct_array) as ArrayRef,
            ),
        ]);

        let schema =
            Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
        let record_batch = RecordBatch::from(struct_array)
            .with_schema(schema.clone())
            .unwrap();

        let mut file = tempfile().unwrap();
        let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
        writer.write(&record_batch).unwrap();
        writer.close().unwrap();

        let all_fields = ["a", "b", "c", "d", "e"];
        let projections = [
            (vec![], vec![]),
            (vec![0], vec!["a"]),
            (vec![0, 1], vec!["a", "b"]),
            (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
            (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
        ];

        for (indices, expected_projected_names) in projections {
            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
                assert_eq!(get_all_field_names(&builder), all_fields);
                assert_eq!(builder.metadata, metadata);
                assert_eq!(get_all_field_names(&reader), expected_projected_names);
                assert_eq!(reader.metadata, HashMap::default());
                assert_eq!(get_all_field_names(&batch), expected_projected_names);
                assert_eq!(batch.metadata, HashMap::default());
            };

            let builder =
                ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
            let sync_builder_schema = builder.schema().clone();
            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
            let mut reader = builder.with_projection(mask).build().unwrap();
            let sync_reader_schema = reader.schema();
            let batch = reader.next().unwrap().unwrap();
            let sync_batch_schema = batch.schema();
            assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);

            let file = tokio::fs::File::from(file.try_clone().unwrap());
            let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
            let async_builder_schema = builder.schema().clone();
            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
            let mut reader = builder.with_projection(mask).build().unwrap();
            let async_reader_schema = reader.schema().clone();
            let batch = reader.next().await.unwrap().unwrap();
            let async_batch_schema = batch.schema();
            assert_schemas(
                async_builder_schema,
                async_reader_schema,
                async_batch_schema,
            );
        }
    }

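    // Bloom filter lookup when the column chunk metadata records a
    // `bloom_filter_length` (the test data is rewritten with bloom filters
    // enabled).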
    #[tokio::test]
    async fn test_get_row_group_column_bloom_filter_with_length() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());
        let async_reader = TestReader::new(data.clone());
        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();
        let schema = builder.schema().clone();
        let stream = builder.build().unwrap();
        let batches = stream.try_collect::<Vec<_>>().await.unwrap();

        let mut parquet_data = Vec::new();
        let props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build();
        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
        for batch in batches {
            writer.write(&batch).unwrap();
        }
        writer.close().unwrap();

        test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
    }

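    // Shared helper: reads the bloom filter for row group 0, column 0 and
    // verifies membership checks against known present and absent values.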
    async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
        let async_reader = TestReader::new(data.clone());

        let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();

        let metadata = builder.metadata();
        assert_eq!(metadata.num_row_groups(), 1);
        let row_group = metadata.row_group(0);
        let column = row_group.column(0);
        assert_eq!(column.bloom_filter_length().is_some(), with_length);

        let sbbf = builder
            .get_row_group_column_bloom_filter(0, 0)
            .await
            .unwrap()
            .unwrap();
        assert!(sbbf.check(&"Hello"));
        assert!(!sbbf.check(&"Hello_Not_Exists"));
    }

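    // Skipping rows across data page boundaries should produce the expected
    // row counts for nested (list) columns when the page index is enabled.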
    #[tokio::test]
    async fn test_nested_skip() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("col_1", DataType::UInt64, false),
            Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
        ]));

        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(256)
            .set_write_batch_size(256)
            .set_max_row_group_size(1024);

        let mut file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();

        let mut builder = ListBuilder::new(StringBuilder::new());
        for id in 0..1024 {
            match id % 3 {
                0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
                1 => builder.append_value([Some(format!("id_{id}"))]),
                _ => builder.append_null(),
            }
        }
        let refs = vec![
            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
            Arc::new(builder.finish()) as ArrayRef,
        ];

        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let selections = [
            RowSelection::from(vec![
                RowSelector::skip(313),
                RowSelector::select(1),
                RowSelector::skip(709),
                RowSelector::select(1),
            ]),
            RowSelection::from(vec![
                RowSelector::skip(255),
                RowSelector::select(1),
                RowSelector::skip(767),
                RowSelector::select(1),
            ]),
            RowSelection::from(vec![
                RowSelector::select(255),
                RowSelector::skip(1),
                RowSelector::select(767),
                RowSelector::skip(1),
            ]),
            RowSelection::from(vec![
                RowSelector::skip(254),
                RowSelector::select(1),
                RowSelector::select(1),
                RowSelector::skip(767),
                RowSelector::select(1),
            ]),
        ];

        for selection in selections {
            let expected = selection.row_count();
            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
                tokio::fs::File::from_std(file.try_clone().unwrap()),
                ArrowReaderOptions::new().with_page_index(true),
            )
            .await
            .unwrap();

            reader = reader.with_row_selection(selection);

            let mut stream = reader.build().unwrap();

            let mut total_rows = 0;
            while let Some(rb) = stream.next().await {
                let rb = rb.unwrap();
                total_rows += rb.num_rows();
            }
            assert_eq!(total_rows, expected);
        }
    }

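    // Row filters over nested (struct) columns should compose with a
    // projection that spans both top-level and nested leaves.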
    #[tokio::test]
    async fn test_row_filter_nested() {
        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
        let b = StructArray::from(vec![
            (
                Arc::new(Field::new("aa", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["a", "b", "b", "b", "c", "c"])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("bb", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5", "6"])) as ArrayRef,
            ),
        ]);
        let c = Int32Array::from_iter(0..6);
        let data = RecordBatch::try_from_iter([
            ("a", Arc::new(a) as ArrayRef),
            ("b", Arc::new(b) as ArrayRef),
            ("c", Arc::new(c) as ArrayRef),
        ])
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
        writer.write(&data).unwrap();
        writer.close().unwrap();

        let data: Bytes = buf.into();
        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let parquet_schema = metadata.file_metadata().schema_descr_ptr();

        let test = TestReader::new(data);
        let requests = test.requests.clone();

        let a_scalar = StringArray::from_iter_values(["b"]);
        let a_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![0]),
            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
        );

        let b_scalar = StringArray::from_iter_values(["4"]);
        let b_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![2]),
            move |batch| {
                let struct_array = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<StructArray>()
                    .unwrap();
                eq(struct_array.column(0), &Scalar::new(&b_scalar))
            },
        );

        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);

        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 3]);
        let stream = ParquetRecordBatchStreamBuilder::new(test)
            .await
            .unwrap()
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .with_row_filter(filter)
            .build()
            .unwrap();

        let batches: Vec<_> = stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 1);

        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 2);

        let col = batch.column(0);
        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
        assert_eq!(val, "b");

        let col = batch.column(1);
        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
        assert_eq!(val, 3);

        // Expect one data fetch per predicate evaluation plus one for the final projection
        assert_eq!(requests.lock().unwrap().len(), 3);
    }

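    // Explicitly setting an empty offset index should not cause a panic when
    // reading a row group.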
    #[tokio::test]
    #[allow(deprecated)]
    async fn empty_offset_index_doesnt_panic_in_read_row_group() {
        use tokio::fs::File;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let mut file = File::open(&path).await.unwrap();
        let file_size = file.metadata().await.unwrap().len();
        let mut metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut file, file_size)
            .await
            .unwrap();

        metadata.set_offset_index(Some(vec![]));
        let options = ArrowReaderOptions::new().with_page_index(true);
        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
        let reader =
            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
                .build()
                .unwrap();

        let result = reader.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(result.len(), 1);
    }

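    // Reading a file with a populated offset index should also succeed.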
    #[tokio::test]
    #[allow(deprecated)]
    async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
        use tokio::fs::File;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let mut file = File::open(&path).await.unwrap();
        let file_size = file.metadata().await.unwrap().len();
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut file, file_size)
            .await
            .unwrap();

        let options = ArrowReaderOptions::new().with_page_index(true);
        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
        let reader =
            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
                .build()
                .unwrap();

        let result = reader.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(result.len(), 8);
    }

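    // Metadata round-tripped through ParquetMetaDataWriter / ParquetMetaDataReader
    // may not carry a usable offset index; building a reader from it should not
    // panic when fetching column chunks.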
    #[tokio::test]
    #[allow(deprecated)]
    async fn empty_offset_index_doesnt_panic_in_column_chunks() {
        use tempfile::TempDir;
        use tokio::fs::File;
        fn write_metadata_to_local_file(
            metadata: ParquetMetaData,
            file: impl AsRef<std::path::Path>,
        ) {
            use crate::file::metadata::ParquetMetaDataWriter;
            use std::fs::File;
            let file = File::create(file).unwrap();
            ParquetMetaDataWriter::new(file, &metadata)
                .finish()
                .unwrap()
        }

        fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
            use std::fs::File;
            let file = File::open(file).unwrap();
            ParquetMetaDataReader::new()
                .with_page_indexes(true)
                .parse_and_finish(&file)
                .unwrap()
        }

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let mut file = File::open(&path).await.unwrap();
        let file_size = file.metadata().await.unwrap().len();
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut file, file_size)
            .await
            .unwrap();

        let tempdir = TempDir::new().unwrap();
        let metadata_path = tempdir.path().join("thrift_metadata.dat");
        write_metadata_to_local_file(metadata, &metadata_path);
        let metadata = read_metadata_from_local_file(&metadata_path);

        let options = ArrowReaderOptions::new().with_page_index(true);
        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
        let reader =
            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
                .build()
                .unwrap();

        let result = reader.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(result.len(), 1);
    }

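    // A sparse row selection combined with a row filter should decode without
    // tripping the cached array reader's offset handling.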
    #[tokio::test]
    async fn test_cached_array_reader_sparse_offset_error() {
        use futures::TryStreamExt;

        use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
        use arrow_array::{BooleanArray, RecordBatch};

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let async_reader = TestReader::new(data);

        let options = ArrowReaderOptions::new().with_page_index(true);
        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
            .await
            .unwrap();

        // Skip the first 22 rows, then select the next 3
        let selection = RowSelection::from(vec![RowSelector::skip(22), RowSelector::select(3)]);

        let parquet_schema = builder.parquet_schema();
        let proj = ProjectionMask::leaves(parquet_schema, vec![0]);
        // An always-true predicate keeps decoding on the row-filter (cached array reader) path
        let always_true = ArrowPredicateFn::new(proj.clone(), |batch: RecordBatch| {
            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
        });
        let filter = RowFilter::new(vec![Box::new(always_true)]);

        let stream = builder
            .with_batch_size(8)
            .with_projection(proj)
            .with_row_selection(selection)
            .with_row_filter(filter)
            .build()
            .unwrap();

        let _result: Vec<_> = stream.try_collect().await.unwrap();
    }

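    // Setting the maximum predicate cache size to 0 disables the cache; the
    // results must match the default cached path even though the I/O pattern
    // differs.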
    #[tokio::test]
    async fn test_predicate_cache_disabled() {
        let k = Int32Array::from_iter_values(0..10);
        let data = RecordBatch::try_from_iter([("k", Arc::new(k) as ArrayRef)]).unwrap();

        let mut buf = Vec::new();
        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(1)
            .set_write_batch_size(1)
            .set_max_row_group_size(10)
            .set_write_page_header_statistics(true)
            .build();
        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
        writer.write(&data).unwrap();
        writer.close().unwrap();

        let data = Bytes::from(buf);
        let metadata = ParquetMetaDataReader::new()
            .with_page_index_policy(PageIndexPolicy::Required)
            .parse_and_finish(&data)
            .unwrap();
        let parquet_schema = metadata.file_metadata().schema_descr_ptr();

        let build_filter = || {
            let scalar = Int32Array::from_iter_values([5]);
            let predicate = ArrowPredicateFn::new(
                ProjectionMask::leaves(&parquet_schema, vec![0]),
                move |batch| eq(batch.column(0), &Scalar::new(&scalar)),
            );
            RowFilter::new(vec![Box::new(predicate)])
        };

        let selection = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(1)]);

        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
        let reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();

        let reader_with_cache = TestReader::new(data.clone());
        let requests_with_cache = reader_with_cache.requests.clone();
        let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
            reader_with_cache,
            reader_metadata.clone(),
        )
        .with_batch_size(1000)
        .with_row_selection(selection.clone())
        .with_row_filter(build_filter())
        .build()
        .unwrap();
        let batches_with_cache: Vec<_> = stream.try_collect().await.unwrap();

        let reader_without_cache = TestReader::new(data);
        let requests_without_cache = reader_without_cache.requests.clone();
        let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
            reader_without_cache,
            reader_metadata,
        )
        .with_batch_size(1000)
        .with_row_selection(selection)
        .with_row_filter(build_filter())
        .with_max_predicate_cache_size(0) // disable the predicate cache
        .build()
        .unwrap();
        let batches_without_cache: Vec<_> = stream.try_collect().await.unwrap();

        // Both configurations must decode the same rows
        assert_eq!(batches_with_cache, batches_without_cache);

        let requests_with_cache = requests_with_cache.lock().unwrap();
        let requests_without_cache = requests_without_cache.lock().unwrap();

        // but the two configurations issue different I/O patterns
        // (request count and total bytes read)
        assert_eq!(requests_with_cache.len(), 11);
        assert_eq!(requests_without_cache.len(), 2);

        assert_eq!(
            requests_with_cache.iter().map(|r| r.len()).sum::<usize>(),
            433
        );
        assert_eq!(
            requests_without_cache
                .iter()
                .map(|r| r.len())
                .sum::<usize>(),
            92
        );
    }
}