1use std::collections::VecDeque;
25use std::fmt::Formatter;
26use std::io::SeekFrom;
27use std::ops::Range;
28use std::pin::Pin;
29use std::sync::{Arc, Mutex};
30use std::task::{Context, Poll};
31
32use bytes::{Buf, Bytes};
33use futures::future::{BoxFuture, FutureExt};
34use futures::ready;
35use futures::stream::Stream;
36use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
37
38use arrow_array::RecordBatch;
39use arrow_schema::{DataType, Fields, Schema, SchemaRef};
40
41use crate::arrow::array_reader::{
42 ArrayReaderBuilder, CacheOptionsBuilder, RowGroupCache, RowGroups,
43};
44use crate::arrow::arrow_reader::{
45 ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
46 RowFilter, RowSelection,
47};
48use crate::arrow::ProjectionMask;
49
50use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
51use crate::bloom_filter::{
52 chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
53};
54use crate::column::page::{PageIterator, PageReader};
55use crate::errors::{ParquetError, Result};
56use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
57use crate::file::page_index::offset_index::OffsetIndexMetaData;
58use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
59
60mod metadata;
61pub use metadata::*;
62
63#[cfg(feature = "object_store")]
64mod store;
65
66use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
67use crate::arrow::arrow_reader::ReadPlanBuilder;
68use crate::arrow::schema::ParquetField;
69#[cfg(feature = "object_store")]
70pub use store::*;
71
72pub trait AsyncFileReader: Send {
86 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
88
89 fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
91 async move {
92 let mut result = Vec::with_capacity(ranges.len());
93
94 for range in ranges.into_iter() {
95 let data = self.get_bytes(range).await?;
96 result.push(data);
97 }
98
99 Ok(result)
100 }
101 .boxed()
102 }
103
104 fn get_metadata<'a>(
121 &'a mut self,
122 options: Option<&'a ArrowReaderOptions>,
123 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
124}
125
126impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
128 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
129 self.as_mut().get_bytes(range)
130 }
131
132 fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
133 self.as_mut().get_byte_ranges(ranges)
134 }
135
136 fn get_metadata<'a>(
137 &'a mut self,
138 options: Option<&'a ArrowReaderOptions>,
139 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
140 self.as_mut().get_metadata(options)
141 }
142}
143
144impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
145 fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
146 async move {
147 self.seek(SeekFrom::End(-(suffix as i64))).await?;
148 let mut buf = Vec::with_capacity(suffix);
149 self.take(suffix as _).read_to_end(&mut buf).await?;
150 Ok(buf.into())
151 }
152 .boxed()
153 }
154}
155
156impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
157 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
158 async move {
159 self.seek(SeekFrom::Start(range.start)).await?;
160
161 let to_read = range.end - range.start;
162 let mut buffer = Vec::with_capacity(to_read.try_into()?);
163 let read = self.take(to_read).read_to_end(&mut buffer).await?;
164 if read as u64 != to_read {
165 return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
166 }
167
168 Ok(buffer.into())
169 }
170 .boxed()
171 }
172
173 fn get_metadata<'a>(
174 &'a mut self,
175 options: Option<&'a ArrowReaderOptions>,
176 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
177 async move {
178 let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy(
179 PageIndexPolicy::from(options.is_some_and(|o| o.page_index())),
180 );
181
182 #[cfg(feature = "encryption")]
183 let metadata_reader = metadata_reader.with_decryption_properties(
184 options.and_then(|o| o.file_decryption_properties.as_ref()),
185 );
186
187 let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
188 Ok(Arc::new(parquet_metadata))
189 }
190 .boxed()
191 }
192}
193
194impl ArrowReaderMetadata {
195 pub async fn load_async<T: AsyncFileReader>(
199 input: &mut T,
200 options: ArrowReaderOptions,
201 ) -> Result<Self> {
202 let metadata = input.get_metadata(Some(&options)).await?;
203 Self::try_new(metadata, options)
204 }
205}
206
207#[doc(hidden)]
208pub struct AsyncReader<T>(T);
213
214pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
227
228impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
229 pub async fn new(input: T) -> Result<Self> {
360 Self::new_with_options(input, Default::default()).await
361 }
362
363 pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
366 let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
367 Ok(Self::new_with_metadata(input, metadata))
368 }
369
370 pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
416 Self::new_builder(AsyncReader(input), metadata)
417 }
418
419 pub async fn get_row_group_column_bloom_filter(
425 &mut self,
426 row_group_idx: usize,
427 column_idx: usize,
428 ) -> Result<Option<Sbbf>> {
429 let metadata = self.metadata.row_group(row_group_idx);
430 let column_metadata = metadata.column(column_idx);
431
432 let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
433 offset
434 .try_into()
435 .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
436 } else {
437 return Ok(None);
438 };
439
440 let buffer = match column_metadata.bloom_filter_length() {
441 Some(length) => self.input.0.get_bytes(offset..offset + length as u64),
442 None => self
443 .input
444 .0
445 .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64),
446 }
447 .await?;
448
449 let (header, bitset_offset) =
450 chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
451
452 match header.algorithm {
453 BloomFilterAlgorithm::BLOCK => {
454 }
456 }
457 match header.compression {
458 BloomFilterCompression::UNCOMPRESSED => {
459 }
461 }
462 match header.hash {
463 BloomFilterHash::XXHASH => {
464 }
466 }
467
468 let bitset = match column_metadata.bloom_filter_length() {
469 Some(_) => buffer.slice(
470 (TryInto::<usize>::try_into(bitset_offset).unwrap()
471 - TryInto::<usize>::try_into(offset).unwrap())..,
472 ),
473 None => {
474 let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
475 ParquetError::General("Bloom filter length is invalid".to_string())
476 })?;
477 self.input
478 .0
479 .get_bytes(bitset_offset..bitset_offset + bitset_length)
480 .await?
481 }
482 };
483 Ok(Some(Sbbf::new(&bitset)))
484 }
485
486 pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
490 let num_row_groups = self.metadata.row_groups().len();
491
492 let row_groups = match self.row_groups {
493 Some(row_groups) => {
494 if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
495 return Err(general_err!(
496 "row group {} out of bounds 0..{}",
497 col,
498 num_row_groups
499 ));
500 }
501 row_groups.into()
502 }
503 None => (0..self.metadata.row_groups().len()).collect(),
504 };
505
506 let batch_size = self
508 .batch_size
509 .min(self.metadata.file_metadata().num_rows() as usize);
510 let reader_factory = ReaderFactory {
511 input: self.input.0,
512 filter: self.filter,
513 metadata: self.metadata.clone(),
514 fields: self.fields,
515 limit: self.limit,
516 offset: self.offset,
517 metrics: self.metrics,
518 max_predicate_cache_size: self.max_predicate_cache_size,
519 };
520
521 let projected_fields = match reader_factory.fields.as_deref().map(|pf| &pf.arrow_type) {
524 Some(DataType::Struct(fields)) => {
525 fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
526 }
527 None => Fields::empty(),
528 _ => unreachable!("Must be Struct for root type"),
529 };
530 let schema = Arc::new(Schema::new(projected_fields));
531
532 Ok(ParquetRecordBatchStream {
533 metadata: self.metadata,
534 batch_size,
535 row_groups,
536 projection: self.projection,
537 selection: self.selection,
538 schema,
539 reader_factory: Some(reader_factory),
540 state: StreamState::Init,
541 })
542 }
543}
544
545type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;
550
551struct ReaderFactory<T> {
554 metadata: Arc<ParquetMetaData>,
555
556 fields: Option<Arc<ParquetField>>,
558
559 input: T,
560
561 filter: Option<RowFilter>,
563
564 limit: Option<usize>,
566
567 offset: Option<usize>,
569
570 metrics: ArrowReaderMetrics,
572
573 max_predicate_cache_size: usize,
575}
576
577impl<T> ReaderFactory<T>
578where
579 T: AsyncFileReader + Send,
580{
581 async fn read_row_group(
587 mut self,
588 row_group_idx: usize,
589 selection: Option<RowSelection>,
590 projection: ProjectionMask,
591 batch_size: usize,
592 ) -> ReadResult<T> {
593 let meta = self.metadata.row_group(row_group_idx);
596 let offset_index = self
597 .metadata
598 .offset_index()
599 .filter(|index| !index.is_empty())
601 .map(|x| x[row_group_idx].as_slice());
602
603 let cache_projection = match self.compute_cache_projection(&projection) {
605 Some(projection) => projection,
606 None => ProjectionMask::none(meta.columns().len()),
607 };
608 let row_group_cache = Arc::new(Mutex::new(RowGroupCache::new(
609 batch_size,
610 self.max_predicate_cache_size,
611 )));
612
613 let mut row_group = InMemoryRowGroup {
614 row_count: meta.num_rows() as usize,
616 column_chunks: vec![None; meta.columns().len()],
617 offset_index,
618 row_group_idx,
619 metadata: self.metadata.as_ref(),
620 };
621
622 let cache_options_builder = CacheOptionsBuilder::new(&cache_projection, &row_group_cache);
623
624 let filter = self.filter.as_mut();
625 let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);
626
627 if let Some(filter) = filter {
629 let cache_options = cache_options_builder.clone().producer();
630
631 for predicate in filter.predicates.iter_mut() {
632 if !plan_builder.selects_any() {
633 return Ok((self, None)); }
635
636 let selection = plan_builder.selection();
638 let cache_mask = Some(&cache_projection);
640 row_group
641 .fetch(
642 &mut self.input,
643 predicate.projection(),
644 selection,
645 batch_size,
646 cache_mask,
647 )
648 .await?;
649
650 let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
651 .with_cache_options(Some(&cache_options))
652 .build_array_reader(self.fields.as_deref(), predicate.projection())?;
653
654 plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
655 }
656 }
657
658 let rows_before = plan_builder
660 .num_rows_selected()
661 .unwrap_or(row_group.row_count);
662
663 if rows_before == 0 {
664 return Ok((self, None)); }
666
667 let plan_builder = plan_builder
669 .limited(row_group.row_count)
670 .with_offset(self.offset)
671 .with_limit(self.limit)
672 .build_limited();
673
674 let rows_after = plan_builder
675 .num_rows_selected()
676 .unwrap_or(row_group.row_count);
677
678 if let Some(offset) = &mut self.offset {
680 *offset = offset.saturating_sub(rows_before - rows_after)
683 }
684
685 if rows_after == 0 {
686 return Ok((self, None)); }
688
689 if let Some(limit) = &mut self.limit {
690 *limit -= rows_after;
691 }
692 row_group
694 .fetch(
696 &mut self.input,
697 &projection,
698 plan_builder.selection(),
699 batch_size,
700 None,
701 )
702 .await?;
703
704 let plan = plan_builder.build();
705
706 let cache_options = cache_options_builder.consumer();
707 let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
708 .with_cache_options(Some(&cache_options))
709 .build_array_reader(self.fields.as_deref(), &projection)?;
710
711 let reader = ParquetRecordBatchReader::new(array_reader, plan);
712
713 Ok((self, Some(reader)))
714 }
715
716 fn compute_cache_projection(&self, projection: &ProjectionMask) -> Option<ProjectionMask> {
718 if self.max_predicate_cache_size == 0 {
720 return None;
721 }
722
723 let filters = self.filter.as_ref()?;
724 let mut cache_projection = filters.predicates.first()?.projection().clone();
725 for predicate in filters.predicates.iter() {
726 cache_projection.union(predicate.projection());
727 }
728 cache_projection.intersect(projection);
729 self.exclude_nested_columns_from_cache(&cache_projection)
730 }
731
732 fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
734 let schema = self.metadata.file_metadata().schema_descr();
735 let num_leaves = schema.num_columns();
736
737 let num_roots = schema.root_schema().get_fields().len();
739 let mut root_leaf_counts = vec![0usize; num_roots];
740 for leaf_idx in 0..num_leaves {
741 let root_idx = schema.get_column_root_idx(leaf_idx);
742 root_leaf_counts[root_idx] += 1;
743 }
744
745 let mut included_leaves = Vec::new();
747 for leaf_idx in 0..num_leaves {
748 if mask.leaf_included(leaf_idx) {
749 let root_idx = schema.get_column_root_idx(leaf_idx);
750 if root_leaf_counts[root_idx] == 1 {
751 included_leaves.push(leaf_idx);
752 }
753 }
754 }
755
756 if included_leaves.is_empty() {
757 None
758 } else {
759 Some(ProjectionMask::leaves(schema, included_leaves))
760 }
761 }
762}
763
764enum StreamState<T> {
765 Init,
767 Decoding(ParquetRecordBatchReader),
769 Reading(BoxFuture<'static, ReadResult<T>>),
771 Error,
773}
774
775impl<T> std::fmt::Debug for StreamState<T> {
776 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
777 match self {
778 StreamState::Init => write!(f, "StreamState::Init"),
779 StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
780 StreamState::Reading(_) => write!(f, "StreamState::Reading"),
781 StreamState::Error => write!(f, "StreamState::Error"),
782 }
783 }
784}
785
786pub struct ParquetRecordBatchStream<T> {
804 metadata: Arc<ParquetMetaData>,
805
806 schema: SchemaRef,
807
808 row_groups: VecDeque<usize>,
809
810 projection: ProjectionMask,
811
812 batch_size: usize,
813
814 selection: Option<RowSelection>,
815
816 reader_factory: Option<ReaderFactory<T>>,
818
819 state: StreamState<T>,
820}
821
822impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
823 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
824 f.debug_struct("ParquetRecordBatchStream")
825 .field("metadata", &self.metadata)
826 .field("schema", &self.schema)
827 .field("batch_size", &self.batch_size)
828 .field("projection", &self.projection)
829 .field("state", &self.state)
830 .finish()
831 }
832}
833
834impl<T> ParquetRecordBatchStream<T> {
835 pub fn schema(&self) -> &SchemaRef {
840 &self.schema
841 }
842}
843
844impl<T> ParquetRecordBatchStream<T>
845where
846 T: AsyncFileReader + Unpin + Send + 'static,
847{
848 pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
862 loop {
863 match &mut self.state {
864 StreamState::Decoding(_) | StreamState::Reading(_) => {
865 return Err(ParquetError::General(
866 "Cannot combine the use of next_row_group with the Stream API".to_string(),
867 ))
868 }
869 StreamState::Init => {
870 let row_group_idx = match self.row_groups.pop_front() {
871 Some(idx) => idx,
872 None => return Ok(None),
873 };
874
875 let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
876
877 let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
878
879 let reader_factory = self.reader_factory.take().expect("lost reader factory");
880
881 let (reader_factory, maybe_reader) = reader_factory
882 .read_row_group(
883 row_group_idx,
884 selection,
885 self.projection.clone(),
886 self.batch_size,
887 )
888 .await
889 .inspect_err(|_| {
890 self.state = StreamState::Error;
891 })?;
892 self.reader_factory = Some(reader_factory);
893
894 if let Some(reader) = maybe_reader {
895 return Ok(Some(reader));
896 } else {
897 continue;
899 }
900 }
901 StreamState::Error => return Ok(None), }
903 }
904 }
905}
906
907impl<T> Stream for ParquetRecordBatchStream<T>
908where
909 T: AsyncFileReader + Unpin + Send + 'static,
910{
911 type Item = Result<RecordBatch>;
912
913 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
914 loop {
915 match &mut self.state {
916 StreamState::Decoding(batch_reader) => match batch_reader.next() {
917 Some(Ok(batch)) => {
918 return Poll::Ready(Some(Ok(batch)));
919 }
920 Some(Err(e)) => {
921 self.state = StreamState::Error;
922 return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
923 }
924 None => self.state = StreamState::Init,
925 },
926 StreamState::Init => {
927 let row_group_idx = match self.row_groups.pop_front() {
928 Some(idx) => idx,
929 None => return Poll::Ready(None),
930 };
931
932 let reader = self.reader_factory.take().expect("lost reader factory");
933
934 let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
935
936 let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
937
938 let fut = reader
939 .read_row_group(
940 row_group_idx,
941 selection,
942 self.projection.clone(),
943 self.batch_size,
944 )
945 .boxed();
946
947 self.state = StreamState::Reading(fut)
948 }
949 StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
950 Ok((reader_factory, maybe_reader)) => {
951 self.reader_factory = Some(reader_factory);
952 match maybe_reader {
953 Some(reader) => self.state = StreamState::Decoding(reader),
955 None => self.state = StreamState::Init,
957 }
958 }
959 Err(e) => {
960 self.state = StreamState::Error;
961 return Poll::Ready(Some(Err(e)));
962 }
963 },
964 StreamState::Error => return Poll::Ready(None), }
966 }
967 }
968}
969
970struct InMemoryRowGroup<'a> {
972 offset_index: Option<&'a [OffsetIndexMetaData]>,
973 column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
975 row_count: usize,
976 row_group_idx: usize,
977 metadata: &'a ParquetMetaData,
978}
979
980impl InMemoryRowGroup<'_> {
981 async fn fetch<T: AsyncFileReader + Send>(
987 &mut self,
988 input: &mut T,
989 projection: &ProjectionMask,
990 selection: Option<&RowSelection>,
991 batch_size: usize,
992 cache_mask: Option<&ProjectionMask>,
993 ) -> Result<()> {
994 let metadata = self.metadata.row_group(self.row_group_idx);
995 if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
996 let expanded_selection =
997 selection.expand_to_batch_boundaries(batch_size, self.row_count);
998 let mut page_start_offsets: Vec<Vec<u64>> = vec![];
1001
1002 let fetch_ranges = self
1003 .column_chunks
1004 .iter()
1005 .zip(metadata.columns())
1006 .enumerate()
1007 .filter(|&(idx, (chunk, _chunk_meta))| {
1008 chunk.is_none() && projection.leaf_included(idx)
1009 })
1010 .flat_map(|(idx, (_chunk, chunk_meta))| {
1011 let mut ranges: Vec<Range<u64>> = vec![];
1014 let (start, _len) = chunk_meta.byte_range();
1015 match offset_index[idx].page_locations.first() {
1016 Some(first) if first.offset as u64 != start => {
1017 ranges.push(start..first.offset as u64);
1018 }
1019 _ => (),
1020 }
1021
1022 let use_expanded = cache_mask.map(|m| m.leaf_included(idx)).unwrap_or(false);
1024 if use_expanded {
1025 ranges.extend(
1026 expanded_selection.scan_ranges(&offset_index[idx].page_locations),
1027 );
1028 } else {
1029 ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
1030 }
1031 page_start_offsets.push(ranges.iter().map(|range| range.start).collect());
1032
1033 ranges
1034 })
1035 .collect();
1036
1037 let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
1038 let mut page_start_offsets = page_start_offsets.into_iter();
1039
1040 for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
1041 if chunk.is_some() || !projection.leaf_included(idx) {
1042 continue;
1043 }
1044
1045 if let Some(offsets) = page_start_offsets.next() {
1046 let mut chunks = Vec::with_capacity(offsets.len());
1047 for _ in 0..offsets.len() {
1048 chunks.push(chunk_data.next().unwrap());
1049 }
1050
1051 *chunk = Some(Arc::new(ColumnChunkData::Sparse {
1052 length: metadata.column(idx).byte_range().1 as usize,
1053 data: offsets
1054 .into_iter()
1055 .map(|x| x as usize)
1056 .zip(chunks.into_iter())
1057 .collect(),
1058 }))
1059 }
1060 }
1061 } else {
1062 let fetch_ranges = self
1063 .column_chunks
1064 .iter()
1065 .enumerate()
1066 .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
1067 .map(|(idx, _chunk)| {
1068 let column = metadata.column(idx);
1069 let (start, length) = column.byte_range();
1070 start..(start + length)
1071 })
1072 .collect();
1073
1074 let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
1075
1076 for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
1077 if chunk.is_some() || !projection.leaf_included(idx) {
1078 continue;
1079 }
1080
1081 if let Some(data) = chunk_data.next() {
1082 *chunk = Some(Arc::new(ColumnChunkData::Dense {
1083 offset: metadata.column(idx).byte_range().0 as usize,
1084 data,
1085 }));
1086 }
1087 }
1088 }
1089
1090 Ok(())
1091 }
1092}
1093
1094impl RowGroups for InMemoryRowGroup<'_> {
1095 fn num_rows(&self) -> usize {
1096 self.row_count
1097 }
1098
1099 fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
1101 match &self.column_chunks[i] {
1102 None => Err(ParquetError::General(format!(
1103 "Invalid column index {i}, column was not fetched"
1104 ))),
1105 Some(data) => {
1106 let page_locations = self
1107 .offset_index
1108 .filter(|index| !index.is_empty())
1110 .map(|index| index[i].page_locations.clone());
1111 let column_chunk_metadata = self.metadata.row_group(self.row_group_idx).column(i);
1112 let page_reader = SerializedPageReader::new(
1113 data.clone(),
1114 column_chunk_metadata,
1115 self.row_count,
1116 page_locations,
1117 )?;
1118 let page_reader = page_reader.add_crypto_context(
1119 self.row_group_idx,
1120 i,
1121 self.metadata,
1122 column_chunk_metadata,
1123 )?;
1124
1125 let page_reader: Box<dyn PageReader> = Box::new(page_reader);
1126
1127 Ok(Box::new(ColumnChunkIterator {
1128 reader: Some(Ok(page_reader)),
1129 }))
1130 }
1131 }
1132 }
1133}
1134
1135#[derive(Clone)]
1137enum ColumnChunkData {
1138 Sparse {
1140 length: usize,
1142 data: Vec<(usize, Bytes)>,
1147 },
1148 Dense { offset: usize, data: Bytes },
1150}
1151
1152impl ColumnChunkData {
1153 fn get(&self, start: u64) -> Result<Bytes> {
1155 match &self {
1156 ColumnChunkData::Sparse { data, .. } => data
1157 .binary_search_by_key(&start, |(offset, _)| *offset as u64)
1158 .map(|idx| data[idx].1.clone())
1159 .map_err(|_| {
1160 ParquetError::General(format!(
1161 "Invalid offset in sparse column chunk data: {start}"
1162 ))
1163 }),
1164 ColumnChunkData::Dense { offset, data } => {
1165 let start = start as usize - *offset;
1166 Ok(data.slice(start..))
1167 }
1168 }
1169 }
1170}
1171
1172impl Length for ColumnChunkData {
1173 fn len(&self) -> u64 {
1175 match &self {
1176 ColumnChunkData::Sparse { length, .. } => *length as u64,
1177 ColumnChunkData::Dense { data, .. } => data.len() as u64,
1178 }
1179 }
1180}
1181
1182impl ChunkReader for ColumnChunkData {
1183 type T = bytes::buf::Reader<Bytes>;
1184
1185 fn get_read(&self, start: u64) -> Result<Self::T> {
1186 Ok(self.get(start)?.reader())
1187 }
1188
1189 fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
1190 Ok(self.get(start)?.slice(..length))
1191 }
1192}
1193
1194struct ColumnChunkIterator {
1196 reader: Option<Result<Box<dyn PageReader>>>,
1197}
1198
1199impl Iterator for ColumnChunkIterator {
1200 type Item = Result<Box<dyn PageReader>>;
1201
1202 fn next(&mut self) -> Option<Self::Item> {
1203 self.reader.take()
1204 }
1205}
1206
1207impl PageIterator for ColumnChunkIterator {}
1208
1209#[cfg(test)]
1210mod tests {
1211 use super::*;
1212 use crate::arrow::arrow_reader::{
1213 ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
1214 };
1215 use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
1216 use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
1217 use crate::arrow::ArrowWriter;
1218 use crate::file::metadata::ParquetMetaDataReader;
1219 use crate::file::properties::WriterProperties;
1220 use arrow::compute::kernels::cmp::eq;
1221 use arrow::error::Result as ArrowResult;
1222 use arrow_array::builder::{ListBuilder, StringBuilder};
1223 use arrow_array::cast::AsArray;
1224 use arrow_array::types::Int32Type;
1225 use arrow_array::{
1226 Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray,
1227 StructArray, UInt64Array,
1228 };
1229 use arrow_schema::{DataType, Field, Schema};
1230 use futures::{StreamExt, TryStreamExt};
1231 use rand::{rng, Rng};
1232 use std::collections::HashMap;
1233 use std::sync::{Arc, Mutex};
1234 use tempfile::tempfile;
1235
1236 #[derive(Clone)]
1237 struct TestReader {
1238 data: Bytes,
1239 metadata: Option<Arc<ParquetMetaData>>,
1240 requests: Arc<Mutex<Vec<Range<usize>>>>,
1241 }
1242
1243 impl TestReader {
1244 fn new(data: Bytes) -> Self {
1245 Self {
1246 data,
1247 metadata: Default::default(),
1248 requests: Default::default(),
1249 }
1250 }
1251 }
1252
1253 impl AsyncFileReader for TestReader {
1254 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
1255 let range = range.clone();
1256 self.requests
1257 .lock()
1258 .unwrap()
1259 .push(range.start as usize..range.end as usize);
1260 futures::future::ready(Ok(self
1261 .data
1262 .slice(range.start as usize..range.end as usize)))
1263 .boxed()
1264 }
1265
1266 fn get_metadata<'a>(
1267 &'a mut self,
1268 options: Option<&'a ArrowReaderOptions>,
1269 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
1270 let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy(
1271 PageIndexPolicy::from(options.is_some_and(|o| o.page_index())),
1272 );
1273 self.metadata = Some(Arc::new(
1274 metadata_reader.parse_and_finish(&self.data).unwrap(),
1275 ));
1276 futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
1277 }
1278 }
1279
1280 #[tokio::test]
1281 async fn test_async_reader() {
1282 let testdata = arrow::util::test_util::parquet_test_data();
1283 let path = format!("{testdata}/alltypes_plain.parquet");
1284 let data = Bytes::from(std::fs::read(path).unwrap());
1285
1286 let async_reader = TestReader::new(data.clone());
1287
1288 let requests = async_reader.requests.clone();
1289 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1290 .await
1291 .unwrap();
1292
1293 let metadata = builder.metadata().clone();
1294 assert_eq!(metadata.num_row_groups(), 1);
1295
1296 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1297 let stream = builder
1298 .with_projection(mask.clone())
1299 .with_batch_size(1024)
1300 .build()
1301 .unwrap();
1302
1303 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1304
1305 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1306 .unwrap()
1307 .with_projection(mask)
1308 .with_batch_size(104)
1309 .build()
1310 .unwrap()
1311 .collect::<ArrowResult<Vec<_>>>()
1312 .unwrap();
1313
1314 assert_eq!(async_batches, sync_batches);
1315
1316 let requests = requests.lock().unwrap();
1317 let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1318 let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1319
1320 assert_eq!(
1321 &requests[..],
1322 &[
1323 offset_1 as usize..(offset_1 + length_1) as usize,
1324 offset_2 as usize..(offset_2 + length_2) as usize
1325 ]
1326 );
1327 }
1328
1329 #[tokio::test]
1330 async fn test_async_reader_with_next_row_group() {
1331 let testdata = arrow::util::test_util::parquet_test_data();
1332 let path = format!("{testdata}/alltypes_plain.parquet");
1333 let data = Bytes::from(std::fs::read(path).unwrap());
1334
1335 let async_reader = TestReader::new(data.clone());
1336
1337 let requests = async_reader.requests.clone();
1338 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1339 .await
1340 .unwrap();
1341
1342 let metadata = builder.metadata().clone();
1343 assert_eq!(metadata.num_row_groups(), 1);
1344
1345 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1346 let mut stream = builder
1347 .with_projection(mask.clone())
1348 .with_batch_size(1024)
1349 .build()
1350 .unwrap();
1351
1352 let mut readers = vec![];
1353 while let Some(reader) = stream.next_row_group().await.unwrap() {
1354 readers.push(reader);
1355 }
1356
1357 let async_batches: Vec<_> = readers
1358 .into_iter()
1359 .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
1360 .collect();
1361
1362 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1363 .unwrap()
1364 .with_projection(mask)
1365 .with_batch_size(104)
1366 .build()
1367 .unwrap()
1368 .collect::<ArrowResult<Vec<_>>>()
1369 .unwrap();
1370
1371 assert_eq!(async_batches, sync_batches);
1372
1373 let requests = requests.lock().unwrap();
1374 let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1375 let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1376
1377 assert_eq!(
1378 &requests[..],
1379 &[
1380 offset_1 as usize..(offset_1 + length_1) as usize,
1381 offset_2 as usize..(offset_2 + length_2) as usize
1382 ]
1383 );
1384 }
1385
1386 #[tokio::test]
1387 async fn test_async_reader_with_index() {
1388 let testdata = arrow::util::test_util::parquet_test_data();
1389 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1390 let data = Bytes::from(std::fs::read(path).unwrap());
1391
1392 let async_reader = TestReader::new(data.clone());
1393
1394 let options = ArrowReaderOptions::new().with_page_index(true);
1395 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1396 .await
1397 .unwrap();
1398
1399 let metadata_with_index = builder.metadata();
1401 assert_eq!(metadata_with_index.num_row_groups(), 1);
1402
1403 let offset_index = metadata_with_index.offset_index().unwrap();
1405 let column_index = metadata_with_index.column_index().unwrap();
1406
1407 assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
1408 assert_eq!(column_index.len(), metadata_with_index.num_row_groups());
1409
1410 let num_columns = metadata_with_index
1411 .file_metadata()
1412 .schema_descr()
1413 .num_columns();
1414
1415 offset_index
1417 .iter()
1418 .for_each(|x| assert_eq!(x.len(), num_columns));
1419 column_index
1420 .iter()
1421 .for_each(|x| assert_eq!(x.len(), num_columns));
1422
1423 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1424 let stream = builder
1425 .with_projection(mask.clone())
1426 .with_batch_size(1024)
1427 .build()
1428 .unwrap();
1429
1430 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1431
1432 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1433 .unwrap()
1434 .with_projection(mask)
1435 .with_batch_size(1024)
1436 .build()
1437 .unwrap()
1438 .collect::<ArrowResult<Vec<_>>>()
1439 .unwrap();
1440
1441 assert_eq!(async_batches, sync_batches);
1442 }
1443
1444 #[tokio::test]
1445 async fn test_async_reader_with_limit() {
1446 let testdata = arrow::util::test_util::parquet_test_data();
1447 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1448 let data = Bytes::from(std::fs::read(path).unwrap());
1449
1450 let metadata = ParquetMetaDataReader::new()
1451 .parse_and_finish(&data)
1452 .unwrap();
1453 let metadata = Arc::new(metadata);
1454
1455 assert_eq!(metadata.num_row_groups(), 1);
1456
1457 let async_reader = TestReader::new(data.clone());
1458
1459 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1460 .await
1461 .unwrap();
1462
1463 assert_eq!(builder.metadata().num_row_groups(), 1);
1464
1465 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1466 let stream = builder
1467 .with_projection(mask.clone())
1468 .with_batch_size(1024)
1469 .with_limit(1)
1470 .build()
1471 .unwrap();
1472
1473 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1474
1475 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1476 .unwrap()
1477 .with_projection(mask)
1478 .with_batch_size(1024)
1479 .with_limit(1)
1480 .build()
1481 .unwrap()
1482 .collect::<ArrowResult<Vec<_>>>()
1483 .unwrap();
1484
1485 assert_eq!(async_batches, sync_batches);
1486 }
1487
1488 #[tokio::test]
1489 async fn test_async_reader_skip_pages() {
1490 let testdata = arrow::util::test_util::parquet_test_data();
1491 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1492 let data = Bytes::from(std::fs::read(path).unwrap());
1493
1494 let async_reader = TestReader::new(data.clone());
1495
1496 let options = ArrowReaderOptions::new().with_page_index(true);
1497 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1498 .await
1499 .unwrap();
1500
1501 assert_eq!(builder.metadata().num_row_groups(), 1);
1502
1503 let selection = RowSelection::from(vec![
1504 RowSelector::skip(21), RowSelector::select(21), RowSelector::skip(41), RowSelector::select(41), RowSelector::skip(25), RowSelector::select(25), RowSelector::skip(7116), RowSelector::select(10), ]);
1513
1514 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);
1515
1516 let stream = builder
1517 .with_projection(mask.clone())
1518 .with_row_selection(selection.clone())
1519 .build()
1520 .expect("building stream");
1521
1522 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1523
1524 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1525 .unwrap()
1526 .with_projection(mask)
1527 .with_batch_size(1024)
1528 .with_row_selection(selection)
1529 .build()
1530 .unwrap()
1531 .collect::<ArrowResult<Vec<_>>>()
1532 .unwrap();
1533
1534 assert_eq!(async_batches, sync_batches);
1535 }
1536
1537 #[tokio::test]
1538 async fn test_fuzz_async_reader_selection() {
1539 let testdata = arrow::util::test_util::parquet_test_data();
1540 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1541 let data = Bytes::from(std::fs::read(path).unwrap());
1542
1543 let mut rand = rng();
1544
1545 for _ in 0..100 {
1546 let mut expected_rows = 0;
1547 let mut total_rows = 0;
1548 let mut skip = false;
1549 let mut selectors = vec![];
1550
1551 while total_rows < 7300 {
1552 let row_count: usize = rand.random_range(1..100);
1553
1554 let row_count = row_count.min(7300 - total_rows);
1555
1556 selectors.push(RowSelector { row_count, skip });
1557
1558 total_rows += row_count;
1559 if !skip {
1560 expected_rows += row_count;
1561 }
1562
1563 skip = !skip;
1564 }
1565
1566 let selection = RowSelection::from(selectors);
1567
1568 let async_reader = TestReader::new(data.clone());
1569
1570 let options = ArrowReaderOptions::new().with_page_index(true);
1571 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1572 .await
1573 .unwrap();
1574
1575 assert_eq!(builder.metadata().num_row_groups(), 1);
1576
1577 let col_idx: usize = rand.random_range(0..13);
1578 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1579
1580 let stream = builder
1581 .with_projection(mask.clone())
1582 .with_row_selection(selection.clone())
1583 .build()
1584 .expect("building stream");
1585
1586 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1587
1588 let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1589
1590 assert_eq!(actual_rows, expected_rows);
1591 }
1592 }
1593
1594 #[tokio::test]
1595 async fn test_async_reader_zero_row_selector() {
1596 let testdata = arrow::util::test_util::parquet_test_data();
1598 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1599 let data = Bytes::from(std::fs::read(path).unwrap());
1600
1601 let mut rand = rng();
1602
1603 let mut expected_rows = 0;
1604 let mut total_rows = 0;
1605 let mut skip = false;
1606 let mut selectors = vec![];
1607
1608 selectors.push(RowSelector {
1609 row_count: 0,
1610 skip: false,
1611 });
1612
1613 while total_rows < 7300 {
1614 let row_count: usize = rand.random_range(1..100);
1615
1616 let row_count = row_count.min(7300 - total_rows);
1617
1618 selectors.push(RowSelector { row_count, skip });
1619
1620 total_rows += row_count;
1621 if !skip {
1622 expected_rows += row_count;
1623 }
1624
1625 skip = !skip;
1626 }
1627
1628 let selection = RowSelection::from(selectors);
1629
1630 let async_reader = TestReader::new(data.clone());
1631
1632 let options = ArrowReaderOptions::new().with_page_index(true);
1633 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1634 .await
1635 .unwrap();
1636
1637 assert_eq!(builder.metadata().num_row_groups(), 1);
1638
1639 let col_idx: usize = rand.random_range(0..13);
1640 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1641
1642 let stream = builder
1643 .with_projection(mask.clone())
1644 .with_row_selection(selection.clone())
1645 .build()
1646 .expect("building stream");
1647
1648 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1649
1650 let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1651
1652 assert_eq!(actual_rows, expected_rows);
1653 }
1654
1655 #[tokio::test]
1656 async fn test_row_filter() {
1657 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1658 let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1659 let data = RecordBatch::try_from_iter([
1660 ("a", Arc::new(a) as ArrayRef),
1661 ("b", Arc::new(b) as ArrayRef),
1662 ])
1663 .unwrap();
1664
1665 let mut buf = Vec::with_capacity(1024);
1666 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1667 writer.write(&data).unwrap();
1668 writer.close().unwrap();
1669
1670 let data: Bytes = buf.into();
1671 let metadata = ParquetMetaDataReader::new()
1672 .parse_and_finish(&data)
1673 .unwrap();
1674 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1675
1676 let test = TestReader::new(data);
1677 let requests = test.requests.clone();
1678
1679 let a_scalar = StringArray::from_iter_values(["b"]);
1680 let a_filter = ArrowPredicateFn::new(
1681 ProjectionMask::leaves(&parquet_schema, vec![0]),
1682 move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1683 );
1684
1685 let filter = RowFilter::new(vec![Box::new(a_filter)]);
1686
1687 let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 1]);
1688 let stream = ParquetRecordBatchStreamBuilder::new(test)
1689 .await
1690 .unwrap()
1691 .with_projection(mask.clone())
1692 .with_batch_size(1024)
1693 .with_row_filter(filter)
1694 .build()
1695 .unwrap();
1696
1697 let batches: Vec<_> = stream.try_collect().await.unwrap();
1698 assert_eq!(batches.len(), 1);
1699
1700 let batch = &batches[0];
1701 assert_eq!(batch.num_columns(), 2);
1702
1703 assert_eq!(
1705 batch.column(0).as_ref(),
1706 &StringArray::from_iter_values(["b", "b", "b"])
1707 );
1708 assert_eq!(
1709 batch.column(1).as_ref(),
1710 &StringArray::from_iter_values(["2", "3", "4"])
1711 );
1712
1713 assert_eq!(requests.lock().unwrap().len(), 2);
1717 }
1718
1719 #[tokio::test]
1720 async fn test_two_row_filters() {
1721 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1722 let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1723 let c = Int32Array::from_iter(0..6);
1724 let data = RecordBatch::try_from_iter([
1725 ("a", Arc::new(a) as ArrayRef),
1726 ("b", Arc::new(b) as ArrayRef),
1727 ("c", Arc::new(c) as ArrayRef),
1728 ])
1729 .unwrap();
1730
1731 let mut buf = Vec::with_capacity(1024);
1732 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1733 writer.write(&data).unwrap();
1734 writer.close().unwrap();
1735
1736 let data: Bytes = buf.into();
1737 let metadata = ParquetMetaDataReader::new()
1738 .parse_and_finish(&data)
1739 .unwrap();
1740 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1741
1742 let test = TestReader::new(data);
1743 let requests = test.requests.clone();
1744
1745 let a_scalar = StringArray::from_iter_values(["b"]);
1746 let a_filter = ArrowPredicateFn::new(
1747 ProjectionMask::leaves(&parquet_schema, vec![0]),
1748 move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1749 );
1750
1751 let b_scalar = StringArray::from_iter_values(["4"]);
1752 let b_filter = ArrowPredicateFn::new(
1753 ProjectionMask::leaves(&parquet_schema, vec![1]),
1754 move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1755 );
1756
1757 let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1758
1759 let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1760 let stream = ParquetRecordBatchStreamBuilder::new(test)
1761 .await
1762 .unwrap()
1763 .with_projection(mask.clone())
1764 .with_batch_size(1024)
1765 .with_row_filter(filter)
1766 .build()
1767 .unwrap();
1768
1769 let batches: Vec<_> = stream.try_collect().await.unwrap();
1770 assert_eq!(batches.len(), 1);
1771
1772 let batch = &batches[0];
1773 assert_eq!(batch.num_rows(), 1);
1774 assert_eq!(batch.num_columns(), 2);
1775
1776 let col = batch.column(0);
1777 let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
1778 assert_eq!(val, "b");
1779
1780 let col = batch.column(1);
1781 let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
1782 assert_eq!(val, 3);
1783
1784 assert_eq!(requests.lock().unwrap().len(), 3);
1789 }
1790
1791 #[tokio::test]
1792 async fn test_limit_multiple_row_groups() {
1793 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1794 let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1795 let c = Int32Array::from_iter(0..6);
1796 let data = RecordBatch::try_from_iter([
1797 ("a", Arc::new(a) as ArrayRef),
1798 ("b", Arc::new(b) as ArrayRef),
1799 ("c", Arc::new(c) as ArrayRef),
1800 ])
1801 .unwrap();
1802
1803 let mut buf = Vec::with_capacity(1024);
1804 let props = WriterProperties::builder()
1805 .set_max_row_group_size(3)
1806 .build();
1807 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1808 writer.write(&data).unwrap();
1809 writer.close().unwrap();
1810
1811 let data: Bytes = buf.into();
1812 let metadata = ParquetMetaDataReader::new()
1813 .parse_and_finish(&data)
1814 .unwrap();
1815
1816 assert_eq!(metadata.num_row_groups(), 2);
1817
1818 let test = TestReader::new(data);
1819
1820 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1821 .await
1822 .unwrap()
1823 .with_batch_size(1024)
1824 .with_limit(4)
1825 .build()
1826 .unwrap();
1827
1828 let batches: Vec<_> = stream.try_collect().await.unwrap();
1829 assert_eq!(batches.len(), 2);
1831
1832 let batch = &batches[0];
1833 assert_eq!(batch.num_rows(), 3);
1835 assert_eq!(batch.num_columns(), 3);
1836 let col2 = batch.column(2).as_primitive::<Int32Type>();
1837 assert_eq!(col2.values(), &[0, 1, 2]);
1838
1839 let batch = &batches[1];
1840 assert_eq!(batch.num_rows(), 1);
1842 assert_eq!(batch.num_columns(), 3);
1843 let col2 = batch.column(2).as_primitive::<Int32Type>();
1844 assert_eq!(col2.values(), &[3]);
1845
1846 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1847 .await
1848 .unwrap()
1849 .with_offset(2)
1850 .with_limit(3)
1851 .build()
1852 .unwrap();
1853
1854 let batches: Vec<_> = stream.try_collect().await.unwrap();
1855 assert_eq!(batches.len(), 2);
1857
1858 let batch = &batches[0];
1859 assert_eq!(batch.num_rows(), 1);
1861 assert_eq!(batch.num_columns(), 3);
1862 let col2 = batch.column(2).as_primitive::<Int32Type>();
1863 assert_eq!(col2.values(), &[2]);
1864
1865 let batch = &batches[1];
1866 assert_eq!(batch.num_rows(), 2);
1868 assert_eq!(batch.num_columns(), 3);
1869 let col2 = batch.column(2).as_primitive::<Int32Type>();
1870 assert_eq!(col2.values(), &[3, 4]);
1871
1872 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1873 .await
1874 .unwrap()
1875 .with_offset(4)
1876 .with_limit(20)
1877 .build()
1878 .unwrap();
1879
1880 let batches: Vec<_> = stream.try_collect().await.unwrap();
1881 assert_eq!(batches.len(), 1);
1883
1884 let batch = &batches[0];
1885 assert_eq!(batch.num_rows(), 2);
1887 assert_eq!(batch.num_columns(), 3);
1888 let col2 = batch.column(2).as_primitive::<Int32Type>();
1889 assert_eq!(col2.values(), &[4, 5]);
1890 }
1891
1892 #[tokio::test]
1893 async fn test_row_filter_with_index() {
1894 let testdata = arrow::util::test_util::parquet_test_data();
1895 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1896 let data = Bytes::from(std::fs::read(path).unwrap());
1897
1898 let metadata = ParquetMetaDataReader::new()
1899 .parse_and_finish(&data)
1900 .unwrap();
1901 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1902
1903 assert_eq!(metadata.num_row_groups(), 1);
1904
1905 let async_reader = TestReader::new(data.clone());
1906
1907 let a_filter =
1908 ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema, vec![1]), |batch| {
1909 Ok(batch.column(0).as_boolean().clone())
1910 });
1911
1912 let b_scalar = Int8Array::from(vec![2]);
1913 let b_filter = ArrowPredicateFn::new(
1914 ProjectionMask::leaves(&parquet_schema, vec![2]),
1915 move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1916 );
1917
1918 let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1919
1920 let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1921
1922 let options = ArrowReaderOptions::new().with_page_index(true);
1923 let stream = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1924 .await
1925 .unwrap()
1926 .with_projection(mask.clone())
1927 .with_batch_size(1024)
1928 .with_row_filter(filter)
1929 .build()
1930 .unwrap();
1931
1932 let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1933
1934 let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1935
1936 assert_eq!(total_rows, 730);
1937 }
1938
1939 #[tokio::test]
1940 #[allow(deprecated)]
1941 async fn test_in_memory_row_group_sparse() {
1942 let testdata = arrow::util::test_util::parquet_test_data();
1943 let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1944 let data = Bytes::from(std::fs::read(path).unwrap());
1945
1946 let metadata = ParquetMetaDataReader::new()
1947 .with_page_indexes(true)
1948 .parse_and_finish(&data)
1949 .unwrap();
1950
1951 let offset_index = metadata.offset_index().expect("reading offset index")[0].clone();
1952
1953 let mut metadata_builder = metadata.into_builder();
1954 let mut row_groups = metadata_builder.take_row_groups();
1955 row_groups.truncate(1);
1956 let row_group_meta = row_groups.pop().unwrap();
1957
1958 let metadata = metadata_builder
1959 .add_row_group(row_group_meta)
1960 .set_column_index(None)
1961 .set_offset_index(Some(vec![offset_index.clone()]))
1962 .build();
1963
1964 let metadata = Arc::new(metadata);
1965
1966 let num_rows = metadata.row_group(0).num_rows();
1967
1968 assert_eq!(metadata.num_row_groups(), 1);
1969
1970 let async_reader = TestReader::new(data.clone());
1971
1972 let requests = async_reader.requests.clone();
1973 let (_, fields) = parquet_to_arrow_schema_and_fields(
1974 metadata.file_metadata().schema_descr(),
1975 ProjectionMask::all(),
1976 None,
1977 )
1978 .unwrap();
1979
1980 let _schema_desc = metadata.file_metadata().schema_descr();
1981
1982 let projection = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), vec![0]);
1983
1984 let reader_factory = ReaderFactory {
1985 metadata,
1986 fields: fields.map(Arc::new),
1987 input: async_reader,
1988 filter: None,
1989 limit: None,
1990 offset: None,
1991 metrics: ArrowReaderMetrics::disabled(),
1992 max_predicate_cache_size: 0,
1993 };
1994
1995 let mut skip = true;
1996 let mut pages = offset_index[0].page_locations.iter().peekable();
1997
1998 let mut selectors = vec![];
2000 let mut expected_page_requests: Vec<Range<usize>> = vec![];
2001 while let Some(page) = pages.next() {
2002 let num_rows = if let Some(next_page) = pages.peek() {
2003 next_page.first_row_index - page.first_row_index
2004 } else {
2005 num_rows - page.first_row_index
2006 };
2007
2008 if skip {
2009 selectors.push(RowSelector::skip(num_rows as usize));
2010 } else {
2011 selectors.push(RowSelector::select(num_rows as usize));
2012 let start = page.offset as usize;
2013 let end = start + page.compressed_page_size as usize;
2014 expected_page_requests.push(start..end);
2015 }
2016 skip = !skip;
2017 }
2018
2019 let selection = RowSelection::from(selectors);
2020
2021 let (_factory, _reader) = reader_factory
2022 .read_row_group(0, Some(selection), projection.clone(), 48)
2023 .await
2024 .expect("reading row group");
2025
2026 let requests = requests.lock().unwrap();
2027
2028 assert_eq!(&requests[..], &expected_page_requests)
2029 }
2030
2031 #[tokio::test]
2032 async fn test_batch_size_overallocate() {
2033 let testdata = arrow::util::test_util::parquet_test_data();
2034 let path = format!("{testdata}/alltypes_plain.parquet");
2036 let data = Bytes::from(std::fs::read(path).unwrap());
2037
2038 let async_reader = TestReader::new(data.clone());
2039
2040 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2041 .await
2042 .unwrap();
2043
2044 let file_rows = builder.metadata().file_metadata().num_rows() as usize;
2045
2046 let stream = builder
2047 .with_projection(ProjectionMask::all())
2048 .with_batch_size(1024)
2049 .build()
2050 .unwrap();
2051 assert_ne!(1024, file_rows);
2052 assert_eq!(stream.batch_size, file_rows);
2053 }
2054
2055 #[tokio::test]
2056 async fn test_get_row_group_column_bloom_filter_without_length() {
2057 let testdata = arrow::util::test_util::parquet_test_data();
2058 let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
2059 let data = Bytes::from(std::fs::read(path).unwrap());
2060 test_get_row_group_column_bloom_filter(data, false).await;
2061 }
2062
2063 #[tokio::test]
2064 async fn test_parquet_record_batch_stream_schema() {
2065 fn get_all_field_names(schema: &Schema) -> Vec<&String> {
2066 schema.flattened_fields().iter().map(|f| f.name()).collect()
2067 }
2068
2069 let mut metadata = HashMap::with_capacity(1);
2078 metadata.insert("key".to_string(), "value".to_string());
2079
2080 let nested_struct_array = StructArray::from(vec![
2081 (
2082 Arc::new(Field::new("d", DataType::Utf8, true)),
2083 Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
2084 ),
2085 (
2086 Arc::new(Field::new("e", DataType::Utf8, true)),
2087 Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
2088 ),
2089 ]);
2090 let struct_array = StructArray::from(vec![
2091 (
2092 Arc::new(Field::new("a", DataType::Int32, true)),
2093 Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
2094 ),
2095 (
2096 Arc::new(Field::new("b", DataType::UInt64, true)),
2097 Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
2098 ),
2099 (
2100 Arc::new(Field::new(
2101 "c",
2102 nested_struct_array.data_type().clone(),
2103 true,
2104 )),
2105 Arc::new(nested_struct_array) as ArrayRef,
2106 ),
2107 ]);
2108
2109 let schema =
2110 Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
2111 let record_batch = RecordBatch::from(struct_array)
2112 .with_schema(schema.clone())
2113 .unwrap();
2114
2115 let mut file = tempfile().unwrap();
2117 let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
2118 writer.write(&record_batch).unwrap();
2119 writer.close().unwrap();
2120
2121 let all_fields = ["a", "b", "c", "d", "e"];
2122 let projections = [
2124 (vec![], vec![]),
2125 (vec![0], vec!["a"]),
2126 (vec![0, 1], vec!["a", "b"]),
2127 (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
2128 (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
2129 ];
2130
2131 for (indices, expected_projected_names) in projections {
2133 let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
2134 assert_eq!(get_all_field_names(&builder), all_fields);
2136 assert_eq!(builder.metadata, metadata);
2137 assert_eq!(get_all_field_names(&reader), expected_projected_names);
2139 assert_eq!(reader.metadata, HashMap::default());
2140 assert_eq!(get_all_field_names(&batch), expected_projected_names);
2141 assert_eq!(batch.metadata, HashMap::default());
2142 };
2143
2144 let builder =
2145 ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
2146 let sync_builder_schema = builder.schema().clone();
2147 let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
2148 let mut reader = builder.with_projection(mask).build().unwrap();
2149 let sync_reader_schema = reader.schema();
2150 let batch = reader.next().unwrap().unwrap();
2151 let sync_batch_schema = batch.schema();
2152 assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);
2153
2154 let file = tokio::fs::File::from(file.try_clone().unwrap());
2156 let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
2157 let async_builder_schema = builder.schema().clone();
2158 let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
2159 let mut reader = builder.with_projection(mask).build().unwrap();
2160 let async_reader_schema = reader.schema().clone();
2161 let batch = reader.next().await.unwrap().unwrap();
2162 let async_batch_schema = batch.schema();
2163 assert_schemas(
2164 async_builder_schema,
2165 async_reader_schema,
2166 async_batch_schema,
2167 );
2168 }
2169 }
2170
2171 #[tokio::test]
2172 async fn test_get_row_group_column_bloom_filter_with_length() {
2173 let testdata = arrow::util::test_util::parquet_test_data();
2175 let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
2176 let data = Bytes::from(std::fs::read(path).unwrap());
2177 let async_reader = TestReader::new(data.clone());
2178 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2179 .await
2180 .unwrap();
2181 let schema = builder.schema().clone();
2182 let stream = builder.build().unwrap();
2183 let batches = stream.try_collect::<Vec<_>>().await.unwrap();
2184
2185 let mut parquet_data = Vec::new();
2186 let props = WriterProperties::builder()
2187 .set_bloom_filter_enabled(true)
2188 .build();
2189 let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
2190 for batch in batches {
2191 writer.write(&batch).unwrap();
2192 }
2193 writer.close().unwrap();
2194
2195 test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
2197 }
2198
2199 async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
2200 let async_reader = TestReader::new(data.clone());
2201
2202 let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2203 .await
2204 .unwrap();
2205
2206 let metadata = builder.metadata();
2207 assert_eq!(metadata.num_row_groups(), 1);
2208 let row_group = metadata.row_group(0);
2209 let column = row_group.column(0);
2210 assert_eq!(column.bloom_filter_length().is_some(), with_length);
2211
2212 let sbbf = builder
2213 .get_row_group_column_bloom_filter(0, 0)
2214 .await
2215 .unwrap()
2216 .unwrap();
2217 assert!(sbbf.check(&"Hello"));
2218 assert!(!sbbf.check(&"Hello_Not_Exists"));
2219 }
2220
    #[tokio::test]
    async fn test_nested_skip() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("col_1", DataType::UInt64, false),
            Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
        ]));

        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(256)
            .set_write_batch_size(256)
            .set_max_row_group_size(1024);

        let mut file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();

        let mut builder = ListBuilder::new(StringBuilder::new());
        for id in 0..1024 {
            match id % 3 {
                0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
                1 => builder.append_value([Some(format!("id_{id}"))]),
                _ => builder.append_null(),
            }
        }
        let refs = vec![
            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
            Arc::new(builder.finish()) as ArrayRef,
        ];

        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let selections = [
            RowSelection::from(vec![
                RowSelector::skip(313),
                RowSelector::select(1),
                RowSelector::skip(709),
                RowSelector::select(1),
            ]),
            RowSelection::from(vec![
                RowSelector::skip(255),
                RowSelector::select(1),
                RowSelector::skip(767),
                RowSelector::select(1),
            ]),
            RowSelection::from(vec![
                RowSelector::select(255),
                RowSelector::skip(1),
                RowSelector::select(767),
                RowSelector::skip(1),
            ]),
            RowSelection::from(vec![
                RowSelector::skip(254),
                RowSelector::select(1),
                RowSelector::select(1),
                RowSelector::skip(767),
                RowSelector::select(1),
            ]),
        ];

        for selection in selections {
            let expected = selection.row_count();
            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
                tokio::fs::File::from_std(file.try_clone().unwrap()),
                ArrowReaderOptions::new().with_page_index(true),
            )
            .await
            .unwrap();

            reader = reader.with_row_selection(selection);

            let mut stream = reader.build().unwrap();

            let mut total_rows = 0;
            while let Some(rb) = stream.next().await {
                let rb = rb.unwrap();
                total_rows += rb.num_rows();
            }
            assert_eq!(total_rows, expected);
        }
    }

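    // Applies two chained predicates, one on a top-level string column and one on a
    // field nested inside a struct column, and checks both the filtered result and
    // the number of I/O requests issued through the TestReader.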
    #[tokio::test]
    async fn test_row_filter_nested() {
        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
        let b = StructArray::from(vec![
            (
                Arc::new(Field::new("aa", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["a", "b", "b", "b", "c", "c"])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("bb", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5", "6"])) as ArrayRef,
            ),
        ]);
        let c = Int32Array::from_iter(0..6);
        let data = RecordBatch::try_from_iter([
            ("a", Arc::new(a) as ArrayRef),
            ("b", Arc::new(b) as ArrayRef),
            ("c", Arc::new(c) as ArrayRef),
        ])
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
        writer.write(&data).unwrap();
        writer.close().unwrap();

        let data: Bytes = buf.into();
        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let parquet_schema = metadata.file_metadata().schema_descr_ptr();

        let test = TestReader::new(data);
        let requests = test.requests.clone();

        let a_scalar = StringArray::from_iter_values(["b"]);
        let a_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![0]),
            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
        );

        let b_scalar = StringArray::from_iter_values(["4"]);
        let b_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![2]),
            move |batch| {
                let struct_array = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<StructArray>()
                    .unwrap();
                eq(struct_array.column(0), &Scalar::new(&b_scalar))
            },
        );

        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);

        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 3]);
        let stream = ParquetRecordBatchStreamBuilder::new(test)
            .await
            .unwrap()
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .with_row_filter(filter)
            .build()
            .unwrap();

        let batches: Vec<_> = stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 1);

        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 2);

        let col = batch.column(0);
        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
        assert_eq!(val, "b");

        let col = batch.column(1);
        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
        assert_eq!(val, 3);

        assert_eq!(requests.lock().unwrap().len(), 3);
    }

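    // Verifies that `compute_cache_projection` returns `None` when the only predicate
    // column is a leaf nested inside a struct, i.e. nested columns are excluded from
    // the predicate cache projection.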
    #[tokio::test]
    async fn test_cache_projection_excludes_nested_columns() {
        use arrow_array::{ArrayRef, StringArray};

        let a = StringArray::from_iter_values(["r1", "r2"]);
        let b = StructArray::from(vec![
            (
                Arc::new(Field::new("aa", DataType::Utf8, true)),
                Arc::new(StringArray::from_iter_values(["v1", "v2"])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("bb", DataType::Utf8, true)),
                Arc::new(StringArray::from_iter_values(["w1", "w2"])) as ArrayRef,
            ),
        ]);

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", b.data_type().clone(), true),
        ]));

        let mut buf = Vec::new();
        let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap();
        let batch = RecordBatch::try_from_iter([
            ("a", Arc::new(a) as ArrayRef),
            ("b", Arc::new(b) as ArrayRef),
        ])
        .unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let data: Bytes = buf.into();
        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        let parquet_schema = metadata.file_metadata().schema_descr();
        let nested_leaf_mask = ProjectionMask::leaves(parquet_schema, vec![1]);

        let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
            Ok(arrow_array::BooleanArray::from(vec![
                true;
                batch.num_rows()
            ]))
        });
        let filter = RowFilter::new(vec![Box::new(always_true)]);

        let reader_factory = ReaderFactory {
            metadata: Arc::clone(&metadata),
            fields: None,
            input: TestReader::new(data),
            filter: Some(filter),
            limit: None,
            offset: None,
            metrics: ArrowReaderMetrics::disabled(),
            max_predicate_cache_size: 0,
        };

        let cache_projection = reader_factory.compute_cache_projection(&nested_leaf_mask);

        assert!(cache_projection.is_none());
    }

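    // Regression test: building a stream from metadata whose offset index has been
    // replaced with an empty Vec must not panic when reading the row group.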
    #[tokio::test]
    #[allow(deprecated)]
    async fn empty_offset_index_doesnt_panic_in_read_row_group() {
        use tokio::fs::File;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let mut file = File::open(&path).await.unwrap();
        let file_size = file.metadata().await.unwrap().len();
        let mut metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut file, file_size)
            .await
            .unwrap();

        metadata.set_offset_index(Some(vec![]));
        let options = ArrowReaderOptions::new().with_page_index(true);
        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
        let reader =
            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
                .build()
                .unwrap();

        let result = reader.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(result.len(), 1);
    }

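    // Companion to the test above: the same read path with a real (non-empty) offset
    // index should also complete without panicking.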
    #[tokio::test]
    #[allow(deprecated)]
    async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
        use tokio::fs::File;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let mut file = File::open(&path).await.unwrap();
        let file_size = file.metadata().await.unwrap().len();
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut file, file_size)
            .await
            .unwrap();

        let options = ArrowReaderOptions::new().with_page_index(true);
        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
        let reader =
            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
                .build()
                .unwrap();

        let result = reader.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(result.len(), 8);
    }

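    // Regression test: metadata round-tripped through ParquetMetaDataWriter /
    // ParquetMetaDataReader is used to build a reader; reading the column chunks
    // must not panic even if the offset index comes back empty.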
    #[tokio::test]
    #[allow(deprecated)]
    async fn empty_offset_index_doesnt_panic_in_column_chunks() {
        use tempfile::TempDir;
        use tokio::fs::File;
        fn write_metadata_to_local_file(
            metadata: ParquetMetaData,
            file: impl AsRef<std::path::Path>,
        ) {
            use crate::file::metadata::ParquetMetaDataWriter;
            use std::fs::File;
            let file = File::create(file).unwrap();
            ParquetMetaDataWriter::new(file, &metadata)
                .finish()
                .unwrap()
        }

        fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
            use std::fs::File;
            let file = File::open(file).unwrap();
            ParquetMetaDataReader::new()
                .with_page_indexes(true)
                .parse_and_finish(&file)
                .unwrap()
        }

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let mut file = File::open(&path).await.unwrap();
        let file_size = file.metadata().await.unwrap().len();
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut file, file_size)
            .await
            .unwrap();

        let tempdir = TempDir::new().unwrap();
        let metadata_path = tempdir.path().join("thrift_metadata.dat");
        write_metadata_to_local_file(metadata, &metadata_path);
        let metadata = read_metadata_from_local_file(&metadata_path);

        let options = ArrowReaderOptions::new().with_page_index(true);
        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
        let reader =
            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
                .build()
                .unwrap();

        let result = reader.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(result.len(), 1);
    }

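    // Regression test for the cached array reader with a sparse row selection (a
    // leading skip followed by a small select) combined with a row filter; the
    // stream should be collected without error.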
    #[tokio::test]
    async fn test_cached_array_reader_sparse_offset_error() {
        use futures::TryStreamExt;

        use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
        use arrow_array::{BooleanArray, RecordBatch};

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let async_reader = TestReader::new(data);

        let options = ArrowReaderOptions::new().with_page_index(true);
        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
            .await
            .unwrap();

        let selection = RowSelection::from(vec![RowSelector::skip(22), RowSelector::select(3)]);

        let parquet_schema = builder.parquet_schema();
        let proj = ProjectionMask::leaves(parquet_schema, vec![0]);
        let always_true = ArrowPredicateFn::new(proj.clone(), |batch: RecordBatch| {
            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
        });
        let filter = RowFilter::new(vec![Box::new(always_true)]);

        let stream = builder
            .with_batch_size(8)
            .with_projection(proj)
            .with_row_selection(selection)
            .with_row_filter(filter)
            .build()
            .unwrap();

        let _result: Vec<_> = stream.try_collect().await.unwrap();
    }

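    // Compares a filtered read with the predicate cache enabled (the default) against
    // one with `with_max_predicate_cache_size(0)`: the decoded batches must be
    // identical, while the number and size of I/O requests differ between the two.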
    #[tokio::test]
    async fn test_predicate_cache_disabled() {
        let k = Int32Array::from_iter_values(0..10);
        let data = RecordBatch::try_from_iter([("k", Arc::new(k) as ArrayRef)]).unwrap();

        let mut buf = Vec::new();
        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(1)
            .set_write_batch_size(1)
            .set_max_row_group_size(10)
            .set_write_page_header_statistics(true)
            .build();
        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
        writer.write(&data).unwrap();
        writer.close().unwrap();

        let data = Bytes::from(buf);
        let metadata = ParquetMetaDataReader::new()
            .with_page_index_policy(PageIndexPolicy::Required)
            .parse_and_finish(&data)
            .unwrap();
        let parquet_schema = metadata.file_metadata().schema_descr_ptr();

        let build_filter = || {
            let scalar = Int32Array::from_iter_values([5]);
            let predicate = ArrowPredicateFn::new(
                ProjectionMask::leaves(&parquet_schema, vec![0]),
                move |batch| eq(batch.column(0), &Scalar::new(&scalar)),
            );
            RowFilter::new(vec![Box::new(predicate)])
        };

        let selection = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(1)]);

        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
        let reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();

        let reader_with_cache = TestReader::new(data.clone());
        let requests_with_cache = reader_with_cache.requests.clone();
        let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
            reader_with_cache,
            reader_metadata.clone(),
        )
        .with_batch_size(1000)
        .with_row_selection(selection.clone())
        .with_row_filter(build_filter())
        .build()
        .unwrap();
        let batches_with_cache: Vec<_> = stream.try_collect().await.unwrap();

        let reader_without_cache = TestReader::new(data);
        let requests_without_cache = reader_without_cache.requests.clone();
        let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
            reader_without_cache,
            reader_metadata,
        )
        .with_batch_size(1000)
        .with_row_selection(selection)
        .with_row_filter(build_filter())
        .with_max_predicate_cache_size(0) // disable the predicate cache
        .build()
        .unwrap();
        let batches_without_cache: Vec<_> = stream.try_collect().await.unwrap();

        // Both configurations must decode the same rows
        assert_eq!(batches_with_cache, batches_without_cache);

        let requests_with_cache = requests_with_cache.lock().unwrap();
        let requests_without_cache = requests_without_cache.lock().unwrap();

        // The two configurations issue a different number and volume of requests
        assert_eq!(requests_with_cache.len(), 11);
        assert_eq!(requests_without_cache.len(), 2);

        assert_eq!(
            requests_with_cache.iter().map(|r| r.len()).sum::<usize>(),
            433
        );
        assert_eq!(
            requests_without_cache
                .iter()
                .map(|r| r.len())
                .sum::<usize>(),
            92
        );
    }
}