1use std::collections::VecDeque;
25use std::fmt::Formatter;
26use std::io::SeekFrom;
27use std::ops::Range;
28use std::pin::Pin;
29use std::sync::{Arc, Mutex};
30use std::task::{Context, Poll};
31
32use bytes::{Buf, Bytes};
33use futures::future::{BoxFuture, FutureExt};
34use futures::ready;
35use futures::stream::Stream;
36use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
37
38use arrow_array::RecordBatch;
39use arrow_schema::{DataType, Fields, Schema, SchemaRef};
40
41use crate::arrow::array_reader::{
42 ArrayReaderBuilder, CacheOptionsBuilder, RowGroupCache, RowGroups,
43};
44use crate::arrow::arrow_reader::{
45 ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
46 RowFilter, RowSelection,
47};
48use crate::arrow::ProjectionMask;
49
50use crate::bloom_filter::{
51 chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
52};
53use crate::column::page::{PageIterator, PageReader};
54use crate::errors::{ParquetError, Result};
55use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
56use crate::file::page_index::offset_index::OffsetIndexMetaData;
57use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
58use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
59
60mod metadata;
61pub use metadata::*;
62
63#[cfg(feature = "object_store")]
64mod store;
65
66use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
67use crate::arrow::arrow_reader::ReadPlanBuilder;
68use crate::arrow::schema::ParquetField;
69#[cfg(feature = "object_store")]
70pub use store::*;
71
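/// An asynchronous interface to a Parquet data source that can fetch arbitrary
/// byte ranges and the file metadata, used as the input to
/// [`ParquetRecordBatchStream`].
///
/// Implemented for any `AsyncRead + AsyncSeek + Unpin + Send` type (such as
/// `tokio::fs::File`) via the blanket implementation below.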
72pub trait AsyncFileReader: Send {
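    /// Retrieve the bytes in `range`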
86 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
88
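    /// Retrieve multiple byte ranges. The default implementation fetches each
    /// range sequentially via [`Self::get_bytes`].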
89 fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
91 async move {
92 let mut result = Vec::with_capacity(ranges.len());
93
94 for range in ranges.into_iter() {
95 let data = self.get_bytes(range).await?;
96 result.push(data);
97 }
98
99 Ok(result)
100 }
101 .boxed()
102 }
103
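    /// Return the [`ParquetMetaData`] for this file.
    ///
    /// If `options` are provided they should be honoured where possible, for
    /// example by also loading the page index when it is requested.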
104 fn get_metadata<'a>(
121 &'a mut self,
122 options: Option<&'a ArrowReaderOptions>,
123 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
124}
125
126impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
128 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
129 self.as_mut().get_bytes(range)
130 }
131
132 fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
133 self.as_mut().get_byte_ranges(ranges)
134 }
135
136 fn get_metadata<'a>(
137 &'a mut self,
138 options: Option<&'a ArrowReaderOptions>,
139 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
140 self.as_mut().get_metadata(options)
141 }
142}
143
144impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
145 fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
146 async move {
147 self.seek(SeekFrom::End(-(suffix as i64))).await?;
148 let mut buf = Vec::with_capacity(suffix);
149 self.take(suffix as _).read_to_end(&mut buf).await?;
150 Ok(buf.into())
151 }
152 .boxed()
153 }
154}
155
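// Allow any `AsyncRead + AsyncSeek` source (e.g. `tokio::fs::File`) to be used
// directly as an `AsyncFileReader`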
156impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
157 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
158 async move {
159 self.seek(SeekFrom::Start(range.start)).await?;
160
161 let to_read = range.end - range.start;
162 let mut buffer = Vec::with_capacity(to_read.try_into()?);
163 let read = self.take(to_read).read_to_end(&mut buffer).await?;
164 if read as u64 != to_read {
165 return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
166 }
167
168 Ok(buffer.into())
169 }
170 .boxed()
171 }
172
173 fn get_metadata<'a>(
174 &'a mut self,
175 options: Option<&'a ArrowReaderOptions>,
176 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
177 async move {
178 let metadata_reader = ParquetMetaDataReader::new()
179 .with_page_indexes(options.is_some_and(|o| o.page_index));
180
181 #[cfg(feature = "encryption")]
182 let metadata_reader = metadata_reader.with_decryption_properties(
183 options.and_then(|o| o.file_decryption_properties.as_ref()),
184 );
185
186 let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
187 Ok(Arc::new(parquet_metadata))
188 }
189 .boxed()
190 }
191}
192
193impl ArrowReaderMetadata {
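    /// Load [`ArrowReaderMetadata`] from an [`AsyncFileReader`], the async
    /// counterpart of the synchronous `ArrowReaderMetadata::load`.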
194 pub async fn load_async<T: AsyncFileReader>(
198 input: &mut T,
199 options: ArrowReaderOptions,
200 ) -> Result<Self> {
201 let metadata = input.get_metadata(Some(&options)).await?;
202 Self::try_new(metadata, options)
203 }
204}
205
206#[doc(hidden)]
207pub struct AsyncReader<T>(T);
212
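/// A builder for constructing a [`ParquetRecordBatchStream`] that decodes Arrow
/// [`RecordBatch`]es asynchronously from any [`AsyncFileReader`].
///
/// A minimal usage sketch (not taken from the original documentation;
/// `data.parquet` is a hypothetical local file opened with `tokio::fs::File`):
///
/// ```no_run
/// # use futures::TryStreamExt;
/// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let file = tokio::fs::File::open("data.parquet").await?;
/// let stream = ParquetRecordBatchStreamBuilder::new(file)
///     .await?
///     .with_batch_size(1024)
///     .build()?;
/// let batches: Vec<_> = stream.try_collect().await?;
/// println!("read {} batches", batches.len());
/// # Ok(())
/// # }
/// ```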
213pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
226
227impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
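    /// Create a new [`ParquetRecordBatchStreamBuilder`], fetching and parsing
    /// the file metadata from `input` with default [`ArrowReaderOptions`].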
228 pub async fn new(input: T) -> Result<Self> {
359 Self::new_with_options(input, Default::default()).await
360 }
361
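    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided
    /// [`ArrowReaderOptions`], for example to also load the page index.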
362 pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
365 let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
366 Ok(Self::new_with_metadata(input, metadata))
367 }
368
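    /// Create a [`ParquetRecordBatchStreamBuilder`] from previously loaded
    /// [`ArrowReaderMetadata`], allowing the metadata to be reused across
    /// multiple readers without fetching it again.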
369 pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
415 Self::new_builder(AsyncReader(input), metadata)
416 }
417
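    /// Read the bloom filter ([`Sbbf`]) for the given row group and column,
    /// returning `Ok(None)` if the column has no bloom filter.
    ///
    /// A usage sketch (hypothetical file and value; not from the original docs):
    ///
    /// ```no_run
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let file = tokio::fs::File::open("data.parquet").await?;
    /// let mut builder = ParquetRecordBatchStreamBuilder::new(file).await?;
    /// if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0).await? {
    ///     println!("might contain value: {}", sbbf.check(&"some_value"));
    /// }
    /// # Ok(())
    /// # }
    /// ```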
418 pub async fn get_row_group_column_bloom_filter(
424 &mut self,
425 row_group_idx: usize,
426 column_idx: usize,
427 ) -> Result<Option<Sbbf>> {
428 let metadata = self.metadata.row_group(row_group_idx);
429 let column_metadata = metadata.column(column_idx);
430
431 let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
432 offset
433 .try_into()
434 .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
435 } else {
436 return Ok(None);
437 };
438
439 let buffer = match column_metadata.bloom_filter_length() {
440 Some(length) => self.input.0.get_bytes(offset..offset + length as u64),
441 None => self
442 .input
443 .0
444 .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64),
445 }
446 .await?;
447
448 let (header, bitset_offset) =
449 chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
450
        match header.algorithm {
            BloomFilterAlgorithm::BLOCK(_) => {
                // BLOCK is currently the only variant; matching exhaustively so any
                // new variant must be handled here
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED(_) => {
                // UNCOMPRESSED is currently the only variant
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH(_) => {
                // XXHASH is currently the only variant
            }
        }
466
467 let bitset = match column_metadata.bloom_filter_length() {
468 Some(_) => buffer.slice(
469 (TryInto::<usize>::try_into(bitset_offset).unwrap()
470 - TryInto::<usize>::try_into(offset).unwrap())..,
471 ),
472 None => {
473 let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
474 ParquetError::General("Bloom filter length is invalid".to_string())
475 })?;
476 self.input
477 .0
478 .get_bytes(bitset_offset..bitset_offset + bitset_length)
479 .await?
480 }
481 };
482 Ok(Some(Sbbf::new(&bitset)))
483 }
484
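    /// Build a [`ParquetRecordBatchStream`], consuming this builder.
    ///
    /// Returns an error if any requested row group index is out of bounds.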
485 pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
489 let num_row_groups = self.metadata.row_groups().len();
490
491 let row_groups = match self.row_groups {
492 Some(row_groups) => {
493 if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
494 return Err(general_err!(
495 "row group {} out of bounds 0..{}",
496 col,
497 num_row_groups
498 ));
499 }
500 row_groups.into()
501 }
502 None => (0..self.metadata.row_groups().len()).collect(),
503 };
504
505 let batch_size = self
507 .batch_size
508 .min(self.metadata.file_metadata().num_rows() as usize);
509 let reader_factory = ReaderFactory {
510 input: self.input.0,
511 filter: self.filter,
512 metadata: self.metadata.clone(),
513 fields: self.fields,
514 limit: self.limit,
515 offset: self.offset,
516 metrics: self.metrics,
517 max_predicate_cache_size: self.max_predicate_cache_size,
518 };
519
520 let projected_fields = match reader_factory.fields.as_deref().map(|pf| &pf.arrow_type) {
523 Some(DataType::Struct(fields)) => {
524 fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
525 }
526 None => Fields::empty(),
527 _ => unreachable!("Must be Struct for root type"),
528 };
529 let schema = Arc::new(Schema::new(projected_fields));
530
531 Ok(ParquetRecordBatchStream {
532 metadata: self.metadata,
533 batch_size,
534 row_groups,
535 projection: self.projection,
536 selection: self.selection,
537 schema,
538 reader_factory: Some(reader_factory),
539 state: StreamState::Init,
540 })
541 }
542}
543
544type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;
549
/// Builds [`ParquetRecordBatchReader`]s for a stream, one row group at a time
struct ReaderFactory<T> {
    metadata: Arc<ParquetMetaData>,

    /// The parquet-to-arrow mapping of the file's root schema, if known
    fields: Option<Arc<ParquetField>>,

    input: T,

    /// Optional filter applied while decoding
    filter: Option<RowFilter>,

    /// Remaining number of rows to return, decremented as row groups are read
    limit: Option<usize>,

    /// Remaining number of rows to skip before returning any
    offset: Option<usize>,

    /// Metrics collected while reading
    metrics: ArrowReaderMetrics,

    /// Maximum size of the cache used to reuse results of evaluating the filter predicates
    max_predicate_cache_size: usize,
}
575
576impl<T> ReaderFactory<T>
577where
578 T: AsyncFileReader + Send,
579{
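    /// Fetch and decode the row group at `row_group_idx`, returning the updated
    /// factory and, unless every row was filtered out, a
    /// [`ParquetRecordBatchReader`] over the decoded data.
    ///
    /// Row filter predicates are evaluated first, fetching only the columns each
    /// predicate needs, before the final projection is fetched for the rows that
    /// survive the filters, offset and limit.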
580 async fn read_row_group(
586 mut self,
587 row_group_idx: usize,
588 selection: Option<RowSelection>,
589 projection: ProjectionMask,
590 batch_size: usize,
591 ) -> ReadResult<T> {
592 let meta = self.metadata.row_group(row_group_idx);
595 let offset_index = self
596 .metadata
597 .offset_index()
598 .filter(|index| !index.is_empty())
600 .map(|x| x[row_group_idx].as_slice());
601
602 let cache_projection = match self.compute_cache_projection(&projection) {
604 Some(projection) => projection,
605 None => ProjectionMask::none(meta.columns().len()),
606 };
607 let row_group_cache = Arc::new(Mutex::new(RowGroupCache::new(
608 batch_size,
609 self.max_predicate_cache_size,
610 )));
611
612 let mut row_group = InMemoryRowGroup {
613 row_count: meta.num_rows() as usize,
615 column_chunks: vec![None; meta.columns().len()],
616 offset_index,
617 row_group_idx,
618 metadata: self.metadata.as_ref(),
619 };
620
621 let cache_options_builder =
622 CacheOptionsBuilder::new(&cache_projection, row_group_cache.clone());
623
624 let filter = self.filter.as_mut();
625 let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);
626
627 if let Some(filter) = filter {
629 let cache_options = cache_options_builder.clone().producer();
630
631 for predicate in filter.predicates.iter_mut() {
                if !plan_builder.selects_any() {
                    // the filter predicates have ruled out every row in this row group
                    return Ok((self, None));
                }

636 let selection = plan_builder.selection();
638 let cache_mask = Some(&cache_projection);
640 row_group
641 .fetch(
642 &mut self.input,
643 predicate.projection(),
644 selection,
645 batch_size,
646 cache_mask,
647 )
648 .await?;
649
650 let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
651 .with_cache_options(Some(&cache_options))
652 .build_array_reader(self.fields.as_deref(), predicate.projection())?;
653
654 plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
655 }
656 }
657
658 let rows_before = plan_builder
660 .num_rows_selected()
661 .unwrap_or(row_group.row_count);
662
        // nothing is selected after evaluating the filters; skip this row group
        if rows_before == 0 {
            return Ok((self, None));
        }

667 let plan_builder = plan_builder
669 .limited(row_group.row_count)
670 .with_offset(self.offset)
671 .with_limit(self.limit)
672 .build_limited();
673
674 let rows_after = plan_builder
675 .num_rows_selected()
676 .unwrap_or(row_group.row_count);
677
678 if let Some(offset) = &mut self.offset {
680 *offset = offset.saturating_sub(rows_before - rows_after)
683 }
684
        if rows_after == 0 {
            // the offset and limit leave nothing to read from this row group
            return Ok((self, None));
        }

689 if let Some(limit) = &mut self.limit {
690 *limit -= rows_after;
691 }
692 row_group
694 .fetch(
696 &mut self.input,
697 &projection,
698 plan_builder.selection(),
699 batch_size,
700 None,
701 )
702 .await?;
703
704 let plan = plan_builder.build();
705
706 let cache_options = cache_options_builder.consumer();
707 let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
708 .with_cache_options(Some(&cache_options))
709 .build_array_reader(self.fields.as_deref(), &projection)?;
710
711 let reader = ParquetRecordBatchReader::new(array_reader, plan);
712
713 Ok((self, Some(reader)))
714 }
715
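    /// Compute which columns to cache while evaluating the row filter: the union
    /// of all predicate projections, intersected with the output projection,
    /// excluding nested (multi-leaf) columns.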
716 fn compute_cache_projection(&self, projection: &ProjectionMask) -> Option<ProjectionMask> {
718 let filters = self.filter.as_ref()?;
719 let mut cache_projection = filters.predicates.first()?.projection().clone();
720 for predicate in filters.predicates.iter() {
721 cache_projection.union(predicate.projection());
722 }
723 cache_projection.intersect(projection);
724 self.exclude_nested_columns_from_cache(&cache_projection)
725 }
726
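    /// Remove from `mask` any leaf whose root column has more than one leaf
    /// (i.e. nested columns), returning `None` if nothing remains.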
727 fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
729 let schema = self.metadata.file_metadata().schema_descr();
730 let num_leaves = schema.num_columns();
731
732 let num_roots = schema.root_schema().get_fields().len();
734 let mut root_leaf_counts = vec![0usize; num_roots];
735 for leaf_idx in 0..num_leaves {
736 let root_idx = schema.get_column_root_idx(leaf_idx);
737 root_leaf_counts[root_idx] += 1;
738 }
739
740 let mut included_leaves = Vec::new();
742 for leaf_idx in 0..num_leaves {
743 if mask.leaf_included(leaf_idx) {
744 let root_idx = schema.get_column_root_idx(leaf_idx);
745 if root_leaf_counts[root_idx] == 1 {
746 included_leaves.push(leaf_idx);
747 }
748 }
749 }
750
751 if included_leaves.is_empty() {
752 None
753 } else {
754 Some(ProjectionMask::leaves(schema, included_leaves))
755 }
756 }
757}
758
/// The state machine driving a [`ParquetRecordBatchStream`]
enum StreamState<T> {
    /// At the start of the next row group, or finished
    Init,
    /// Decoding record batches from the current row group
    Decoding(ParquetRecordBatchReader),
    /// Fetching the data for the next row group
    Reading(BoxFuture<'static, ReadResult<T>>),
    /// An error was encountered; the stream is terminated
    Error,
}
769
770impl<T> std::fmt::Debug for StreamState<T> {
771 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
772 match self {
773 StreamState::Init => write!(f, "StreamState::Init"),
774 StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
775 StreamState::Reading(_) => write!(f, "StreamState::Reading"),
776 StreamState::Error => write!(f, "StreamState::Error"),
777 }
778 }
779}
780
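/// An asynchronous [`Stream`] of [`RecordBatch`] read from a Parquet data source.
///
/// Created by `ParquetRecordBatchStreamBuilder::build`. Row groups can also be
/// consumed one at a time via `next_row_group`.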
781pub struct ParquetRecordBatchStream<T> {
799 metadata: Arc<ParquetMetaData>,
800
801 schema: SchemaRef,
802
803 row_groups: VecDeque<usize>,
804
805 projection: ProjectionMask,
806
807 batch_size: usize,
808
809 selection: Option<RowSelection>,
810
811 reader_factory: Option<ReaderFactory<T>>,
813
814 state: StreamState<T>,
815}
816
817impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
818 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
819 f.debug_struct("ParquetRecordBatchStream")
820 .field("metadata", &self.metadata)
821 .field("schema", &self.schema)
822 .field("batch_size", &self.batch_size)
823 .field("projection", &self.projection)
824 .field("state", &self.state)
825 .finish()
826 }
827}
828
829impl<T> ParquetRecordBatchStream<T> {
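    /// Returns the schema of the [`RecordBatch`]es produced by this stream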
830 pub fn schema(&self) -> &SchemaRef {
835 &self.schema
836 }
837}
838
839impl<T> ParquetRecordBatchStream<T>
840where
841 T: AsyncFileReader + Unpin + Send + 'static,
842{
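    /// Fetch the next row group and return a [`ParquetRecordBatchReader`] for it,
    /// or `Ok(None)` once all row groups have been read.
    ///
    /// This bypasses the [`Stream`] API; mixing it with polling the stream
    /// returns an error.
    ///
    /// A rough sketch of row-group-at-a-time reading (not from the original docs;
    /// `data.parquet` is a hypothetical file):
    ///
    /// ```no_run
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let file = tokio::fs::File::open("data.parquet").await?;
    /// let mut stream = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;
    /// while let Some(reader) = stream.next_row_group().await? {
    ///     for batch in reader {
    ///         println!("read {} rows", batch?.num_rows());
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```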
843 pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
857 loop {
858 match &mut self.state {
859 StreamState::Decoding(_) | StreamState::Reading(_) => {
860 return Err(ParquetError::General(
861 "Cannot combine the use of next_row_group with the Stream API".to_string(),
862 ))
863 }
864 StreamState::Init => {
865 let row_group_idx = match self.row_groups.pop_front() {
866 Some(idx) => idx,
867 None => return Ok(None),
868 };
869
870 let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
871
872 let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
873
874 let reader_factory = self.reader_factory.take().expect("lost reader factory");
875
876 let (reader_factory, maybe_reader) = reader_factory
877 .read_row_group(
878 row_group_idx,
879 selection,
880 self.projection.clone(),
881 self.batch_size,
882 )
883 .await
884 .inspect_err(|_| {
885 self.state = StreamState::Error;
886 })?;
887 self.reader_factory = Some(reader_factory);
888
889 if let Some(reader) = maybe_reader {
890 return Ok(Some(reader));
891 } else {
892 continue;
894 }
895 }
                // an error has already been returned; the stream is terminated
                StreamState::Error => return Ok(None),
            }
898 }
899 }
900}
901
902impl<T> Stream for ParquetRecordBatchStream<T>
903where
904 T: AsyncFileReader + Unpin + Send + 'static,
905{
906 type Item = Result<RecordBatch>;
907
908 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
909 loop {
910 match &mut self.state {
911 StreamState::Decoding(batch_reader) => match batch_reader.next() {
912 Some(Ok(batch)) => {
913 return Poll::Ready(Some(Ok(batch)));
914 }
915 Some(Err(e)) => {
916 self.state = StreamState::Error;
917 return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
918 }
919 None => self.state = StreamState::Init,
920 },
921 StreamState::Init => {
922 let row_group_idx = match self.row_groups.pop_front() {
923 Some(idx) => idx,
924 None => return Poll::Ready(None),
925 };
926
927 let reader = self.reader_factory.take().expect("lost reader factory");
928
929 let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
930
931 let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
932
933 let fut = reader
934 .read_row_group(
935 row_group_idx,
936 selection,
937 self.projection.clone(),
938 self.batch_size,
939 )
940 .boxed();
941
942 self.state = StreamState::Reading(fut)
943 }
944 StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
945 Ok((reader_factory, maybe_reader)) => {
946 self.reader_factory = Some(reader_factory);
947 match maybe_reader {
948 Some(reader) => self.state = StreamState::Decoding(reader),
950 None => self.state = StreamState::Init,
952 }
953 }
954 Err(e) => {
955 self.state = StreamState::Error;
956 return Poll::Ready(Some(Err(e)));
957 }
958 },
                // an error has already been returned; the stream is terminated
                StreamState::Error => return Poll::Ready(None),
            }
961 }
962 }
963}
964
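/// The data of a single row group, buffered in memory and served to the array
/// readers via the [`RowGroups`] trait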
965struct InMemoryRowGroup<'a> {
967 offset_index: Option<&'a [OffsetIndexMetaData]>,
968 column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
970 row_count: usize,
971 row_group_idx: usize,
972 metadata: &'a ParquetMetaData,
973}
974
975impl InMemoryRowGroup<'_> {
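    /// Fetch the data needed to decode `projection`.
    ///
    /// If a row `selection` and the offset index are both available, only the
    /// pages intersecting the selection are fetched, producing
    /// [`ColumnChunkData::Sparse`]; otherwise each projected column chunk is
    /// fetched in full as [`ColumnChunkData::Dense`]. Columns in `cache_mask`
    /// use a selection expanded to `batch_size` boundaries so that cached
    /// results can be reused across batches.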
976 async fn fetch<T: AsyncFileReader + Send>(
982 &mut self,
983 input: &mut T,
984 projection: &ProjectionMask,
985 selection: Option<&RowSelection>,
986 batch_size: usize,
987 cache_mask: Option<&ProjectionMask>,
988 ) -> Result<()> {
989 let metadata = self.metadata.row_group(self.row_group_idx);
990 if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
991 let expanded_selection =
992 selection.expand_to_batch_boundaries(batch_size, self.row_count);
993 let mut page_start_offsets: Vec<Vec<u64>> = vec![];
996
997 let fetch_ranges = self
998 .column_chunks
999 .iter()
1000 .zip(metadata.columns())
1001 .enumerate()
1002 .filter(|&(idx, (chunk, _chunk_meta))| {
1003 chunk.is_none() && projection.leaf_included(idx)
1004 })
1005 .flat_map(|(idx, (_chunk, chunk_meta))| {
1006 let mut ranges: Vec<Range<u64>> = vec![];
1009 let (start, _len) = chunk_meta.byte_range();
1010 match offset_index[idx].page_locations.first() {
1011 Some(first) if first.offset as u64 != start => {
1012 ranges.push(start..first.offset as u64);
1013 }
1014 _ => (),
1015 }
1016
1017 let use_expanded = cache_mask.map(|m| m.leaf_included(idx)).unwrap_or(false);
1019 if use_expanded {
1020 ranges.extend(
1021 expanded_selection.scan_ranges(&offset_index[idx].page_locations),
1022 );
1023 } else {
1024 ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
1025 }
1026 page_start_offsets.push(ranges.iter().map(|range| range.start).collect());
1027
1028 ranges
1029 })
1030 .collect();
1031
1032 let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
1033 let mut page_start_offsets = page_start_offsets.into_iter();
1034
1035 for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
1036 if chunk.is_some() || !projection.leaf_included(idx) {
1037 continue;
1038 }
1039
1040 if let Some(offsets) = page_start_offsets.next() {
1041 let mut chunks = Vec::with_capacity(offsets.len());
1042 for _ in 0..offsets.len() {
1043 chunks.push(chunk_data.next().unwrap());
1044 }
1045
1046 *chunk = Some(Arc::new(ColumnChunkData::Sparse {
1047 length: metadata.column(idx).byte_range().1 as usize,
1048 data: offsets
1049 .into_iter()
1050 .map(|x| x as usize)
1051 .zip(chunks.into_iter())
1052 .collect(),
1053 }))
1054 }
1055 }
1056 } else {
1057 let fetch_ranges = self
1058 .column_chunks
1059 .iter()
1060 .enumerate()
1061 .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
1062 .map(|(idx, _chunk)| {
1063 let column = metadata.column(idx);
1064 let (start, length) = column.byte_range();
1065 start..(start + length)
1066 })
1067 .collect();
1068
1069 let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
1070
1071 for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
1072 if chunk.is_some() || !projection.leaf_included(idx) {
1073 continue;
1074 }
1075
1076 if let Some(data) = chunk_data.next() {
1077 *chunk = Some(Arc::new(ColumnChunkData::Dense {
1078 offset: metadata.column(idx).byte_range().0 as usize,
1079 data,
1080 }));
1081 }
1082 }
1083 }
1084
1085 Ok(())
1086 }
1087}
1088
1089impl RowGroups for InMemoryRowGroup<'_> {
1090 fn num_rows(&self) -> usize {
1091 self.row_count
1092 }
1093
1094 fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
1096 match &self.column_chunks[i] {
1097 None => Err(ParquetError::General(format!(
1098 "Invalid column index {i}, column was not fetched"
1099 ))),
1100 Some(data) => {
1101 let page_locations = self
1102 .offset_index
1103 .filter(|index| !index.is_empty())
1105 .map(|index| index[i].page_locations.clone());
1106 let column_chunk_metadata = self.metadata.row_group(self.row_group_idx).column(i);
1107 let page_reader = SerializedPageReader::new(
1108 data.clone(),
1109 column_chunk_metadata,
1110 self.row_count,
1111 page_locations,
1112 )?;
1113 let page_reader = page_reader.add_crypto_context(
1114 self.row_group_idx,
1115 i,
1116 self.metadata,
1117 column_chunk_metadata,
1118 )?;
1119
1120 let page_reader: Box<dyn PageReader> = Box::new(page_reader);
1121
1122 Ok(Box::new(ColumnChunkIterator {
1123 reader: Some(Ok(page_reader)),
1124 }))
1125 }
1126 }
1127 }
1128}
1129
/// An in-memory copy of a column chunk's bytes
#[derive(Clone)]
enum ColumnChunkData {
    /// Only the bytes for the selected pages were fetched
    Sparse {
        /// Length of the full column chunk
        length: usize,
        /// Fetched page data as `(file offset, bytes)` pairs, sorted by offset
        data: Vec<(usize, Bytes)>,
    },
    /// The entire column chunk was fetched
    Dense { offset: usize, data: Bytes },
}
1146
1147impl ColumnChunkData {
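    /// Return the bytes starting at the absolute file offset `start`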
1148 fn get(&self, start: u64) -> Result<Bytes> {
1150 match &self {
1151 ColumnChunkData::Sparse { data, .. } => data
1152 .binary_search_by_key(&start, |(offset, _)| *offset as u64)
1153 .map(|idx| data[idx].1.clone())
1154 .map_err(|_| {
1155 ParquetError::General(format!(
1156 "Invalid offset in sparse column chunk data: {start}"
1157 ))
1158 }),
1159 ColumnChunkData::Dense { offset, data } => {
1160 let start = start as usize - *offset;
1161 Ok(data.slice(start..))
1162 }
1163 }
1164 }
1165}
1166
1167impl Length for ColumnChunkData {
1168 fn len(&self) -> u64 {
1170 match &self {
1171 ColumnChunkData::Sparse { length, .. } => *length as u64,
1172 ColumnChunkData::Dense { data, .. } => data.len() as u64,
1173 }
1174 }
1175}
1176
1177impl ChunkReader for ColumnChunkData {
1178 type T = bytes::buf::Reader<Bytes>;
1179
1180 fn get_read(&self, start: u64) -> Result<Self::T> {
1181 Ok(self.get(start)?.reader())
1182 }
1183
1184 fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
1185 Ok(self.get(start)?.slice(..length))
1186 }
1187}
1188
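/// Implements [`PageIterator`] for a single column chunk, yielding exactly one
/// [`PageReader`]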
1189struct ColumnChunkIterator {
1191 reader: Option<Result<Box<dyn PageReader>>>,
1192}
1193
1194impl Iterator for ColumnChunkIterator {
1195 type Item = Result<Box<dyn PageReader>>;
1196
1197 fn next(&mut self) -> Option<Self::Item> {
1198 self.reader.take()
1199 }
1200}
1201
1202impl PageIterator for ColumnChunkIterator {}
1203
1204#[cfg(test)]
1205mod tests {
1206 use super::*;
1207 use crate::arrow::arrow_reader::{
1208 ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
1209 };
1210 use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
1211 use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
1212 use crate::arrow::ArrowWriter;
1213 use crate::file::metadata::ParquetMetaDataReader;
1214 use crate::file::properties::WriterProperties;
1215 use arrow::compute::kernels::cmp::eq;
1216 use arrow::error::Result as ArrowResult;
1217 use arrow_array::builder::{ListBuilder, StringBuilder};
1218 use arrow_array::cast::AsArray;
1219 use arrow_array::types::Int32Type;
1220 use arrow_array::{
1221 Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray,
1222 StructArray, UInt64Array,
1223 };
1224 use arrow_schema::{DataType, Field, Schema};
1225 use futures::{StreamExt, TryStreamExt};
1226 use rand::{rng, Rng};
1227 use std::collections::HashMap;
1228 use std::sync::{Arc, Mutex};
1229 use tempfile::tempfile;
1230
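    /// An in-memory [`AsyncFileReader`] over [`Bytes`] that records the byte
    /// ranges requested, so tests can assert exactly what was fetched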
1231 #[derive(Clone)]
1232 struct TestReader {
1233 data: Bytes,
1234 metadata: Option<Arc<ParquetMetaData>>,
1235 requests: Arc<Mutex<Vec<Range<usize>>>>,
1236 }
1237
1238 impl TestReader {
1239 fn new(data: Bytes) -> Self {
1240 Self {
1241 data,
1242 metadata: Default::default(),
1243 requests: Default::default(),
1244 }
1245 }
1246 }
1247
1248 impl AsyncFileReader for TestReader {
1249 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
1250 let range = range.clone();
1251 self.requests
1252 .lock()
1253 .unwrap()
1254 .push(range.start as usize..range.end as usize);
1255 futures::future::ready(Ok(self
1256 .data
1257 .slice(range.start as usize..range.end as usize)))
1258 .boxed()
1259 }
1260
1261 fn get_metadata<'a>(
1262 &'a mut self,
1263 options: Option<&'a ArrowReaderOptions>,
1264 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
1265 let metadata_reader = ParquetMetaDataReader::new()
1266 .with_page_indexes(options.is_some_and(|o| o.page_index));
1267 self.metadata = Some(Arc::new(
1268 metadata_reader.parse_and_finish(&self.data).unwrap(),
1269 ));
1270 futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
1271 }
1272 }
1273
1274 #[tokio::test]
1275 async fn test_async_reader() {
1276 let testdata = arrow::util::test_util::parquet_test_data();
1277 let path = format!("{testdata}/alltypes_plain.parquet");
1278 let data = Bytes::from(std::fs::read(path).unwrap());
1279
1280 let async_reader = TestReader::new(data.clone());
1281
1282 let requests = async_reader.requests.clone();
1283 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1284 .await
1285 .unwrap();
1286
1287 let metadata = builder.metadata().clone();
1288 assert_eq!(metadata.num_row_groups(), 1);
1289
1290 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1291 let stream = builder
1292 .with_projection(mask.clone())
1293 .with_batch_size(1024)
1294 .build()
1295 .unwrap();
1296
1297 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1298
1299 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1300 .unwrap()
1301 .with_projection(mask)
1302 .with_batch_size(104)
1303 .build()
1304 .unwrap()
1305 .collect::<ArrowResult<Vec<_>>>()
1306 .unwrap();
1307
1308 assert_eq!(async_batches, sync_batches);
1309
1310 let requests = requests.lock().unwrap();
1311 let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1312 let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1313
1314 assert_eq!(
1315 &requests[..],
1316 &[
1317 offset_1 as usize..(offset_1 + length_1) as usize,
1318 offset_2 as usize..(offset_2 + length_2) as usize
1319 ]
1320 );
1321 }
1322
1323 #[tokio::test]
1324 async fn test_async_reader_with_next_row_group() {
1325 let testdata = arrow::util::test_util::parquet_test_data();
1326 let path = format!("{testdata}/alltypes_plain.parquet");
1327 let data = Bytes::from(std::fs::read(path).unwrap());
1328
1329 let async_reader = TestReader::new(data.clone());
1330
1331 let requests = async_reader.requests.clone();
1332 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1333 .await
1334 .unwrap();
1335
1336 let metadata = builder.metadata().clone();
1337 assert_eq!(metadata.num_row_groups(), 1);
1338
1339 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1340 let mut stream = builder
1341 .with_projection(mask.clone())
1342 .with_batch_size(1024)
1343 .build()
1344 .unwrap();
1345
1346 let mut readers = vec![];
1347 while let Some(reader) = stream.next_row_group().await.unwrap() {
1348 readers.push(reader);
1349 }
1350
1351 let async_batches: Vec<_> = readers
1352 .into_iter()
1353 .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
1354 .collect();
1355
1356 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1357 .unwrap()
1358 .with_projection(mask)
1359 .with_batch_size(104)
1360 .build()
1361 .unwrap()
1362 .collect::<ArrowResult<Vec<_>>>()
1363 .unwrap();
1364
1365 assert_eq!(async_batches, sync_batches);
1366
1367 let requests = requests.lock().unwrap();
1368 let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1369 let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1370
1371 assert_eq!(
1372 &requests[..],
1373 &[
1374 offset_1 as usize..(offset_1 + length_1) as usize,
1375 offset_2 as usize..(offset_2 + length_2) as usize
1376 ]
1377 );
1378 }
1379
1380 #[tokio::test]
1381 async fn test_async_reader_with_index() {
1382 let testdata = arrow::util::test_util::parquet_test_data();
1383 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1384 let data = Bytes::from(std::fs::read(path).unwrap());
1385
1386 let async_reader = TestReader::new(data.clone());
1387
1388 let options = ArrowReaderOptions::new().with_page_index(true);
1389 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1390 .await
1391 .unwrap();
1392
1393 let metadata_with_index = builder.metadata();
1395 assert_eq!(metadata_with_index.num_row_groups(), 1);
1396
1397 let offset_index = metadata_with_index.offset_index().unwrap();
1399 let column_index = metadata_with_index.column_index().unwrap();
1400
1401 assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
1402 assert_eq!(column_index.len(), metadata_with_index.num_row_groups());
1403
1404 let num_columns = metadata_with_index
1405 .file_metadata()
1406 .schema_descr()
1407 .num_columns();
1408
1409 offset_index
1411 .iter()
1412 .for_each(|x| assert_eq!(x.len(), num_columns));
1413 column_index
1414 .iter()
1415 .for_each(|x| assert_eq!(x.len(), num_columns));
1416
1417 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1418 let stream = builder
1419 .with_projection(mask.clone())
1420 .with_batch_size(1024)
1421 .build()
1422 .unwrap();
1423
1424 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1425
1426 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1427 .unwrap()
1428 .with_projection(mask)
1429 .with_batch_size(1024)
1430 .build()
1431 .unwrap()
1432 .collect::<ArrowResult<Vec<_>>>()
1433 .unwrap();
1434
1435 assert_eq!(async_batches, sync_batches);
1436 }
1437
1438 #[tokio::test]
1439 async fn test_async_reader_with_limit() {
1440 let testdata = arrow::util::test_util::parquet_test_data();
1441 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1442 let data = Bytes::from(std::fs::read(path).unwrap());
1443
1444 let metadata = ParquetMetaDataReader::new()
1445 .parse_and_finish(&data)
1446 .unwrap();
1447 let metadata = Arc::new(metadata);
1448
1449 assert_eq!(metadata.num_row_groups(), 1);
1450
1451 let async_reader = TestReader::new(data.clone());
1452
1453 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1454 .await
1455 .unwrap();
1456
1457 assert_eq!(builder.metadata().num_row_groups(), 1);
1458
1459 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1460 let stream = builder
1461 .with_projection(mask.clone())
1462 .with_batch_size(1024)
1463 .with_limit(1)
1464 .build()
1465 .unwrap();
1466
1467 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1468
1469 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1470 .unwrap()
1471 .with_projection(mask)
1472 .with_batch_size(1024)
1473 .with_limit(1)
1474 .build()
1475 .unwrap()
1476 .collect::<ArrowResult<Vec<_>>>()
1477 .unwrap();
1478
1479 assert_eq!(async_batches, sync_batches);
1480 }
1481
1482 #[tokio::test]
1483 async fn test_async_reader_skip_pages() {
1484 let testdata = arrow::util::test_util::parquet_test_data();
1485 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1486 let data = Bytes::from(std::fs::read(path).unwrap());
1487
1488 let async_reader = TestReader::new(data.clone());
1489
1490 let options = ArrowReaderOptions::new().with_page_index(true);
1491 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1492 .await
1493 .unwrap();
1494
1495 assert_eq!(builder.metadata().num_row_groups(), 1);
1496
        let selection = RowSelection::from(vec![
            RowSelector::skip(21),
            RowSelector::select(21),
            RowSelector::skip(41),
            RowSelector::select(41),
            RowSelector::skip(25),
            RowSelector::select(25),
            RowSelector::skip(7116),
            RowSelector::select(10),
        ]);
1507
1508 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);
1509
1510 let stream = builder
1511 .with_projection(mask.clone())
1512 .with_row_selection(selection.clone())
1513 .build()
1514 .expect("building stream");
1515
1516 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1517
1518 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1519 .unwrap()
1520 .with_projection(mask)
1521 .with_batch_size(1024)
1522 .with_row_selection(selection)
1523 .build()
1524 .unwrap()
1525 .collect::<ArrowResult<Vec<_>>>()
1526 .unwrap();
1527
1528 assert_eq!(async_batches, sync_batches);
1529 }
1530
1531 #[tokio::test]
1532 async fn test_fuzz_async_reader_selection() {
1533 let testdata = arrow::util::test_util::parquet_test_data();
1534 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1535 let data = Bytes::from(std::fs::read(path).unwrap());
1536
1537 let mut rand = rng();
1538
1539 for _ in 0..100 {
1540 let mut expected_rows = 0;
1541 let mut total_rows = 0;
1542 let mut skip = false;
1543 let mut selectors = vec![];
1544
1545 while total_rows < 7300 {
1546 let row_count: usize = rand.random_range(1..100);
1547
1548 let row_count = row_count.min(7300 - total_rows);
1549
1550 selectors.push(RowSelector { row_count, skip });
1551
1552 total_rows += row_count;
1553 if !skip {
1554 expected_rows += row_count;
1555 }
1556
1557 skip = !skip;
1558 }
1559
1560 let selection = RowSelection::from(selectors);
1561
1562 let async_reader = TestReader::new(data.clone());
1563
1564 let options = ArrowReaderOptions::new().with_page_index(true);
1565 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1566 .await
1567 .unwrap();
1568
1569 assert_eq!(builder.metadata().num_row_groups(), 1);
1570
1571 let col_idx: usize = rand.random_range(0..13);
1572 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1573
1574 let stream = builder
1575 .with_projection(mask.clone())
1576 .with_row_selection(selection.clone())
1577 .build()
1578 .expect("building stream");
1579
1580 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1581
1582 let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1583
1584 assert_eq!(actual_rows, expected_rows);
1585 }
1586 }
1587
1588 #[tokio::test]
1589 async fn test_async_reader_zero_row_selector() {
1590 let testdata = arrow::util::test_util::parquet_test_data();
1592 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1593 let data = Bytes::from(std::fs::read(path).unwrap());
1594
1595 let mut rand = rng();
1596
1597 let mut expected_rows = 0;
1598 let mut total_rows = 0;
1599 let mut skip = false;
1600 let mut selectors = vec![];
1601
1602 selectors.push(RowSelector {
1603 row_count: 0,
1604 skip: false,
1605 });
1606
1607 while total_rows < 7300 {
1608 let row_count: usize = rand.random_range(1..100);
1609
1610 let row_count = row_count.min(7300 - total_rows);
1611
1612 selectors.push(RowSelector { row_count, skip });
1613
1614 total_rows += row_count;
1615 if !skip {
1616 expected_rows += row_count;
1617 }
1618
1619 skip = !skip;
1620 }
1621
1622 let selection = RowSelection::from(selectors);
1623
1624 let async_reader = TestReader::new(data.clone());
1625
1626 let options = ArrowReaderOptions::new().with_page_index(true);
1627 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1628 .await
1629 .unwrap();
1630
1631 assert_eq!(builder.metadata().num_row_groups(), 1);
1632
1633 let col_idx: usize = rand.random_range(0..13);
1634 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1635
1636 let stream = builder
1637 .with_projection(mask.clone())
1638 .with_row_selection(selection.clone())
1639 .build()
1640 .expect("building stream");
1641
1642 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1643
1644 let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1645
1646 assert_eq!(actual_rows, expected_rows);
1647 }
1648
1649 #[tokio::test]
1650 async fn test_row_filter() {
1651 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1652 let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1653 let data = RecordBatch::try_from_iter([
1654 ("a", Arc::new(a) as ArrayRef),
1655 ("b", Arc::new(b) as ArrayRef),
1656 ])
1657 .unwrap();
1658
1659 let mut buf = Vec::with_capacity(1024);
1660 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1661 writer.write(&data).unwrap();
1662 writer.close().unwrap();
1663
1664 let data: Bytes = buf.into();
1665 let metadata = ParquetMetaDataReader::new()
1666 .parse_and_finish(&data)
1667 .unwrap();
1668 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1669
1670 let test = TestReader::new(data);
1671 let requests = test.requests.clone();
1672
1673 let a_scalar = StringArray::from_iter_values(["b"]);
1674 let a_filter = ArrowPredicateFn::new(
1675 ProjectionMask::leaves(&parquet_schema, vec![0]),
1676 move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1677 );
1678
1679 let filter = RowFilter::new(vec![Box::new(a_filter)]);
1680
1681 let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 1]);
1682 let stream = ParquetRecordBatchStreamBuilder::new(test)
1683 .await
1684 .unwrap()
1685 .with_projection(mask.clone())
1686 .with_batch_size(1024)
1687 .with_row_filter(filter)
1688 .build()
1689 .unwrap();
1690
1691 let batches: Vec<_> = stream.try_collect().await.unwrap();
1692 assert_eq!(batches.len(), 1);
1693
1694 let batch = &batches[0];
1695 assert_eq!(batch.num_columns(), 2);
1696
1697 assert_eq!(
1699 batch.column(0).as_ref(),
1700 &StringArray::from_iter_values(["b", "b", "b"])
1701 );
1702 assert_eq!(
1703 batch.column(1).as_ref(),
1704 &StringArray::from_iter_values(["2", "3", "4"])
1705 );
1706
1707 assert_eq!(requests.lock().unwrap().len(), 2);
1711 }
1712
1713 #[tokio::test]
1714 async fn test_two_row_filters() {
1715 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1716 let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1717 let c = Int32Array::from_iter(0..6);
1718 let data = RecordBatch::try_from_iter([
1719 ("a", Arc::new(a) as ArrayRef),
1720 ("b", Arc::new(b) as ArrayRef),
1721 ("c", Arc::new(c) as ArrayRef),
1722 ])
1723 .unwrap();
1724
1725 let mut buf = Vec::with_capacity(1024);
1726 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1727 writer.write(&data).unwrap();
1728 writer.close().unwrap();
1729
1730 let data: Bytes = buf.into();
1731 let metadata = ParquetMetaDataReader::new()
1732 .parse_and_finish(&data)
1733 .unwrap();
1734 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1735
1736 let test = TestReader::new(data);
1737 let requests = test.requests.clone();
1738
1739 let a_scalar = StringArray::from_iter_values(["b"]);
1740 let a_filter = ArrowPredicateFn::new(
1741 ProjectionMask::leaves(&parquet_schema, vec![0]),
1742 move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1743 );
1744
1745 let b_scalar = StringArray::from_iter_values(["4"]);
1746 let b_filter = ArrowPredicateFn::new(
1747 ProjectionMask::leaves(&parquet_schema, vec![1]),
1748 move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1749 );
1750
1751 let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1752
1753 let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1754 let stream = ParquetRecordBatchStreamBuilder::new(test)
1755 .await
1756 .unwrap()
1757 .with_projection(mask.clone())
1758 .with_batch_size(1024)
1759 .with_row_filter(filter)
1760 .build()
1761 .unwrap();
1762
1763 let batches: Vec<_> = stream.try_collect().await.unwrap();
1764 assert_eq!(batches.len(), 1);
1765
1766 let batch = &batches[0];
1767 assert_eq!(batch.num_rows(), 1);
1768 assert_eq!(batch.num_columns(), 2);
1769
1770 let col = batch.column(0);
1771 let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
1772 assert_eq!(val, "b");
1773
1774 let col = batch.column(1);
1775 let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
1776 assert_eq!(val, 3);
1777
1778 assert_eq!(requests.lock().unwrap().len(), 3);
1783 }
1784
1785 #[tokio::test]
1786 async fn test_limit_multiple_row_groups() {
1787 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1788 let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1789 let c = Int32Array::from_iter(0..6);
1790 let data = RecordBatch::try_from_iter([
1791 ("a", Arc::new(a) as ArrayRef),
1792 ("b", Arc::new(b) as ArrayRef),
1793 ("c", Arc::new(c) as ArrayRef),
1794 ])
1795 .unwrap();
1796
1797 let mut buf = Vec::with_capacity(1024);
1798 let props = WriterProperties::builder()
1799 .set_max_row_group_size(3)
1800 .build();
1801 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1802 writer.write(&data).unwrap();
1803 writer.close().unwrap();
1804
1805 let data: Bytes = buf.into();
1806 let metadata = ParquetMetaDataReader::new()
1807 .parse_and_finish(&data)
1808 .unwrap();
1809
1810 assert_eq!(metadata.num_row_groups(), 2);
1811
1812 let test = TestReader::new(data);
1813
1814 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1815 .await
1816 .unwrap()
1817 .with_batch_size(1024)
1818 .with_limit(4)
1819 .build()
1820 .unwrap();
1821
1822 let batches: Vec<_> = stream.try_collect().await.unwrap();
1823 assert_eq!(batches.len(), 2);
1825
1826 let batch = &batches[0];
1827 assert_eq!(batch.num_rows(), 3);
1829 assert_eq!(batch.num_columns(), 3);
1830 let col2 = batch.column(2).as_primitive::<Int32Type>();
1831 assert_eq!(col2.values(), &[0, 1, 2]);
1832
1833 let batch = &batches[1];
1834 assert_eq!(batch.num_rows(), 1);
1836 assert_eq!(batch.num_columns(), 3);
1837 let col2 = batch.column(2).as_primitive::<Int32Type>();
1838 assert_eq!(col2.values(), &[3]);
1839
1840 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1841 .await
1842 .unwrap()
1843 .with_offset(2)
1844 .with_limit(3)
1845 .build()
1846 .unwrap();
1847
1848 let batches: Vec<_> = stream.try_collect().await.unwrap();
1849 assert_eq!(batches.len(), 2);
1851
1852 let batch = &batches[0];
1853 assert_eq!(batch.num_rows(), 1);
1855 assert_eq!(batch.num_columns(), 3);
1856 let col2 = batch.column(2).as_primitive::<Int32Type>();
1857 assert_eq!(col2.values(), &[2]);
1858
1859 let batch = &batches[1];
1860 assert_eq!(batch.num_rows(), 2);
1862 assert_eq!(batch.num_columns(), 3);
1863 let col2 = batch.column(2).as_primitive::<Int32Type>();
1864 assert_eq!(col2.values(), &[3, 4]);
1865
1866 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1867 .await
1868 .unwrap()
1869 .with_offset(4)
1870 .with_limit(20)
1871 .build()
1872 .unwrap();
1873
1874 let batches: Vec<_> = stream.try_collect().await.unwrap();
1875 assert_eq!(batches.len(), 1);
1877
1878 let batch = &batches[0];
1879 assert_eq!(batch.num_rows(), 2);
1881 assert_eq!(batch.num_columns(), 3);
1882 let col2 = batch.column(2).as_primitive::<Int32Type>();
1883 assert_eq!(col2.values(), &[4, 5]);
1884 }
1885
1886 #[tokio::test]
1887 async fn test_row_filter_with_index() {
1888 let testdata = arrow::util::test_util::parquet_test_data();
1889 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1890 let data = Bytes::from(std::fs::read(path).unwrap());
1891
1892 let metadata = ParquetMetaDataReader::new()
1893 .parse_and_finish(&data)
1894 .unwrap();
1895 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1896
1897 assert_eq!(metadata.num_row_groups(), 1);
1898
1899 let async_reader = TestReader::new(data.clone());
1900
1901 let a_filter =
1902 ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema, vec![1]), |batch| {
1903 Ok(batch.column(0).as_boolean().clone())
1904 });
1905
1906 let b_scalar = Int8Array::from(vec![2]);
1907 let b_filter = ArrowPredicateFn::new(
1908 ProjectionMask::leaves(&parquet_schema, vec![2]),
1909 move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1910 );
1911
1912 let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1913
1914 let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1915
1916 let options = ArrowReaderOptions::new().with_page_index(true);
1917 let stream = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1918 .await
1919 .unwrap()
1920 .with_projection(mask.clone())
1921 .with_batch_size(1024)
1922 .with_row_filter(filter)
1923 .build()
1924 .unwrap();
1925
1926 let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1927
1928 let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1929
1930 assert_eq!(total_rows, 730);
1931 }
1932
1933 #[tokio::test]
1934 async fn test_in_memory_row_group_sparse() {
1935 let testdata = arrow::util::test_util::parquet_test_data();
1936 let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1937 let data = Bytes::from(std::fs::read(path).unwrap());
1938
1939 let metadata = ParquetMetaDataReader::new()
1940 .with_page_indexes(true)
1941 .parse_and_finish(&data)
1942 .unwrap();
1943
1944 let offset_index = metadata.offset_index().expect("reading offset index")[0].clone();
1945
1946 let mut metadata_builder = metadata.into_builder();
1947 let mut row_groups = metadata_builder.take_row_groups();
1948 row_groups.truncate(1);
1949 let row_group_meta = row_groups.pop().unwrap();
1950
1951 let metadata = metadata_builder
1952 .add_row_group(row_group_meta)
1953 .set_column_index(None)
1954 .set_offset_index(Some(vec![offset_index.clone()]))
1955 .build();
1956
1957 let metadata = Arc::new(metadata);
1958
1959 let num_rows = metadata.row_group(0).num_rows();
1960
1961 assert_eq!(metadata.num_row_groups(), 1);
1962
1963 let async_reader = TestReader::new(data.clone());
1964
1965 let requests = async_reader.requests.clone();
1966 let (_, fields) = parquet_to_arrow_schema_and_fields(
1967 metadata.file_metadata().schema_descr(),
1968 ProjectionMask::all(),
1969 None,
1970 )
1971 .unwrap();
1972
1973 let _schema_desc = metadata.file_metadata().schema_descr();
1974
1975 let projection = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), vec![0]);
1976
1977 let reader_factory = ReaderFactory {
1978 metadata,
1979 fields: fields.map(Arc::new),
1980 input: async_reader,
1981 filter: None,
1982 limit: None,
1983 offset: None,
1984 metrics: ArrowReaderMetrics::disabled(),
1985 max_predicate_cache_size: 0,
1986 };
1987
1988 let mut skip = true;
1989 let mut pages = offset_index[0].page_locations.iter().peekable();
1990
1991 let mut selectors = vec![];
1993 let mut expected_page_requests: Vec<Range<usize>> = vec![];
1994 while let Some(page) = pages.next() {
1995 let num_rows = if let Some(next_page) = pages.peek() {
1996 next_page.first_row_index - page.first_row_index
1997 } else {
1998 num_rows - page.first_row_index
1999 };
2000
2001 if skip {
2002 selectors.push(RowSelector::skip(num_rows as usize));
2003 } else {
2004 selectors.push(RowSelector::select(num_rows as usize));
2005 let start = page.offset as usize;
2006 let end = start + page.compressed_page_size as usize;
2007 expected_page_requests.push(start..end);
2008 }
2009 skip = !skip;
2010 }
2011
2012 let selection = RowSelection::from(selectors);
2013
2014 let (_factory, _reader) = reader_factory
2015 .read_row_group(0, Some(selection), projection.clone(), 48)
2016 .await
2017 .expect("reading row group");
2018
2019 let requests = requests.lock().unwrap();
2020
2021 assert_eq!(&requests[..], &expected_page_requests)
2022 }
2023
2024 #[tokio::test]
2025 async fn test_batch_size_overallocate() {
2026 let testdata = arrow::util::test_util::parquet_test_data();
2027 let path = format!("{testdata}/alltypes_plain.parquet");
2029 let data = Bytes::from(std::fs::read(path).unwrap());
2030
2031 let async_reader = TestReader::new(data.clone());
2032
2033 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2034 .await
2035 .unwrap();
2036
2037 let file_rows = builder.metadata().file_metadata().num_rows() as usize;
2038
2039 let stream = builder
2040 .with_projection(ProjectionMask::all())
2041 .with_batch_size(1024)
2042 .build()
2043 .unwrap();
2044 assert_ne!(1024, file_rows);
2045 assert_eq!(stream.batch_size, file_rows);
2046 }
2047
2048 #[tokio::test]
2049 async fn test_get_row_group_column_bloom_filter_without_length() {
2050 let testdata = arrow::util::test_util::parquet_test_data();
2051 let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
2052 let data = Bytes::from(std::fs::read(path).unwrap());
2053 test_get_row_group_column_bloom_filter(data, false).await;
2054 }
2055
2056 #[tokio::test]
2057 async fn test_parquet_record_batch_stream_schema() {
2058 fn get_all_field_names(schema: &Schema) -> Vec<&String> {
2059 schema.flattened_fields().iter().map(|f| f.name()).collect()
2060 }
2061
2062 let mut metadata = HashMap::with_capacity(1);
2071 metadata.insert("key".to_string(), "value".to_string());
2072
2073 let nested_struct_array = StructArray::from(vec![
2074 (
2075 Arc::new(Field::new("d", DataType::Utf8, true)),
2076 Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
2077 ),
2078 (
2079 Arc::new(Field::new("e", DataType::Utf8, true)),
2080 Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
2081 ),
2082 ]);
2083 let struct_array = StructArray::from(vec![
2084 (
2085 Arc::new(Field::new("a", DataType::Int32, true)),
2086 Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
2087 ),
2088 (
2089 Arc::new(Field::new("b", DataType::UInt64, true)),
2090 Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
2091 ),
2092 (
2093 Arc::new(Field::new(
2094 "c",
2095 nested_struct_array.data_type().clone(),
2096 true,
2097 )),
2098 Arc::new(nested_struct_array) as ArrayRef,
2099 ),
2100 ]);
2101
2102 let schema =
2103 Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
2104 let record_batch = RecordBatch::from(struct_array)
2105 .with_schema(schema.clone())
2106 .unwrap();
2107
2108 let mut file = tempfile().unwrap();
2110 let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
2111 writer.write(&record_batch).unwrap();
2112 writer.close().unwrap();
2113
2114 let all_fields = ["a", "b", "c", "d", "e"];
2115 let projections = [
2117 (vec![], vec![]),
2118 (vec![0], vec!["a"]),
2119 (vec![0, 1], vec!["a", "b"]),
2120 (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
2121 (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
2122 ];
2123
2124 for (indices, expected_projected_names) in projections {
2126 let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
2127 assert_eq!(get_all_field_names(&builder), all_fields);
2129 assert_eq!(builder.metadata, metadata);
2130 assert_eq!(get_all_field_names(&reader), expected_projected_names);
2132 assert_eq!(reader.metadata, HashMap::default());
2133 assert_eq!(get_all_field_names(&batch), expected_projected_names);
2134 assert_eq!(batch.metadata, HashMap::default());
2135 };
2136
2137 let builder =
2138 ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
2139 let sync_builder_schema = builder.schema().clone();
2140 let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
2141 let mut reader = builder.with_projection(mask).build().unwrap();
2142 let sync_reader_schema = reader.schema();
2143 let batch = reader.next().unwrap().unwrap();
2144 let sync_batch_schema = batch.schema();
2145 assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);
2146
2147 let file = tokio::fs::File::from(file.try_clone().unwrap());
2149 let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
2150 let async_builder_schema = builder.schema().clone();
2151 let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
2152 let mut reader = builder.with_projection(mask).build().unwrap();
2153 let async_reader_schema = reader.schema().clone();
2154 let batch = reader.next().await.unwrap().unwrap();
2155 let async_batch_schema = batch.schema();
2156 assert_schemas(
2157 async_builder_schema,
2158 async_reader_schema,
2159 async_batch_schema,
2160 );
2161 }
2162 }
2163
2164 #[tokio::test]
2165 async fn test_get_row_group_column_bloom_filter_with_length() {
2166 let testdata = arrow::util::test_util::parquet_test_data();
2168 let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
2169 let data = Bytes::from(std::fs::read(path).unwrap());
2170 let async_reader = TestReader::new(data.clone());
2171 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2172 .await
2173 .unwrap();
2174 let schema = builder.schema().clone();
2175 let stream = builder.build().unwrap();
2176 let batches = stream.try_collect::<Vec<_>>().await.unwrap();
2177
2178 let mut parquet_data = Vec::new();
2179 let props = WriterProperties::builder()
2180 .set_bloom_filter_enabled(true)
2181 .build();
2182 let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
2183 for batch in batches {
2184 writer.write(&batch).unwrap();
2185 }
2186 writer.close().unwrap();
2187
2188 test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
2190 }
2191
2192 async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
2193 let async_reader = TestReader::new(data.clone());
2194
2195 let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2196 .await
2197 .unwrap();
2198
2199 let metadata = builder.metadata();
2200 assert_eq!(metadata.num_row_groups(), 1);
2201 let row_group = metadata.row_group(0);
2202 let column = row_group.column(0);
2203 assert_eq!(column.bloom_filter_length().is_some(), with_length);
2204
2205 let sbbf = builder
2206 .get_row_group_column_bloom_filter(0, 0)
2207 .await
2208 .unwrap()
2209 .unwrap();
2210 assert!(sbbf.check(&"Hello"));
2211 assert!(!sbbf.check(&"Hello_Not_Exists"));
2212 }
2213
    #[tokio::test]
    async fn test_nested_skip() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("col_1", DataType::UInt64, false),
            Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
        ]));

        // Small pages and write batches so the selections below cross page boundaries
        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(256)
            .set_write_batch_size(256)
            .set_max_row_group_size(1024);

        // Write data
        let mut file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();

        let mut builder = ListBuilder::new(StringBuilder::new());
        for id in 0..1024 {
            match id % 3 {
                0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
                1 => builder.append_value([Some(format!("id_{id}"))]),
                _ => builder.append_null(),
            }
        }
        let refs = vec![
            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
            Arc::new(builder.finish()) as ArrayRef,
        ];

        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let selections = [
            RowSelection::from(vec![
                RowSelector::skip(313),
                RowSelector::select(1),
                RowSelector::skip(709),
                RowSelector::select(1),
            ]),
            RowSelection::from(vec![
                RowSelector::skip(255),
                RowSelector::select(1),
                RowSelector::skip(767),
                RowSelector::select(1),
            ]),
            RowSelection::from(vec![
                RowSelector::select(255),
                RowSelector::skip(1),
                RowSelector::select(767),
                RowSelector::skip(1),
            ]),
            RowSelection::from(vec![
                RowSelector::skip(254),
                RowSelector::select(1),
                RowSelector::select(1),
                RowSelector::skip(767),
                RowSelector::select(1),
            ]),
        ];

        for selection in selections {
            let expected = selection.row_count();
            // Read back with the page index enabled and apply the selection
            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
                tokio::fs::File::from_std(file.try_clone().unwrap()),
                ArrowReaderOptions::new().with_page_index(true),
            )
            .await
            .unwrap();

            reader = reader.with_row_selection(selection);

            let mut stream = reader.build().unwrap();

            let mut total_rows = 0;
            while let Some(rb) = stream.next().await {
                let rb = rb.unwrap();
                total_rows += rb.num_rows();
            }
            assert_eq!(total_rows, expected);
        }
    }

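    // Chains two predicates, one on a top-level string column and one on a leaf of a nested
    // struct, then verifies the filtered output and counts the byte-range requests issued
    // against the TestReader.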
    #[tokio::test]
    async fn test_row_filter_nested() {
        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
        let b = StructArray::from(vec![
            (
                Arc::new(Field::new("aa", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["a", "b", "b", "b", "c", "c"])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("bb", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5", "6"])) as ArrayRef,
            ),
        ]);
        let c = Int32Array::from_iter(0..6);
        let data = RecordBatch::try_from_iter([
            ("a", Arc::new(a) as ArrayRef),
            ("b", Arc::new(b) as ArrayRef),
            ("c", Arc::new(c) as ArrayRef),
        ])
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
        writer.write(&data).unwrap();
        writer.close().unwrap();

        let data: Bytes = buf.into();
        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let parquet_schema = metadata.file_metadata().schema_descr_ptr();

        let test = TestReader::new(data);
        let requests = test.requests.clone();

        // First predicate: a == "b", evaluated against leaf column 0 ("a")
        let a_scalar = StringArray::from_iter_values(["b"]);
        let a_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![0]),
            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
        );

        // Second predicate: b.bb == "4", evaluated against leaf column 2 ("b.bb")
        let b_scalar = StringArray::from_iter_values(["4"]);
        let b_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![2]),
            move |batch| {
                let struct_array = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<StructArray>()
                    .unwrap();
                eq(struct_array.column(0), &Scalar::new(&b_scalar))
            },
        );

        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);

        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 3]);
        let stream = ParquetRecordBatchStreamBuilder::new(test)
            .await
            .unwrap()
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .with_row_filter(filter)
            .build()
            .unwrap();

        let batches: Vec<_> = stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 1);

        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 2);

        let col = batch.column(0);
        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
        assert_eq!(val, "b");

        let col = batch.column(1);
        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
        assert_eq!(val, 3);

        // Expect three fetches: one per predicate plus one for the final projection
        assert_eq!(requests.lock().unwrap().len(), 3);
    }

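    // When the only column referenced by a predicate is a leaf nested inside a struct,
    // compute_cache_projection should return None, i.e. nested columns are excluded from the
    // predicate cache.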
    #[tokio::test]
    async fn test_cache_projection_excludes_nested_columns() {
        use arrow_array::{ArrayRef, StringArray};

        // A file with one primitive column "a" and one struct column "b" with two leaves
        let a = StringArray::from_iter_values(["r1", "r2"]);
        let b = StructArray::from(vec![
            (
                Arc::new(Field::new("aa", DataType::Utf8, true)),
                Arc::new(StringArray::from_iter_values(["v1", "v2"])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("bb", DataType::Utf8, true)),
                Arc::new(StringArray::from_iter_values(["w1", "w2"])) as ArrayRef,
            ),
        ]);

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", b.data_type().clone(), true),
        ]));

        let mut buf = Vec::new();
        let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap();
        let batch = RecordBatch::try_from_iter([
            ("a", Arc::new(a) as ArrayRef),
            ("b", Arc::new(b) as ArrayRef),
        ])
        .unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let data: Bytes = buf.into();
        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        // Predicate mask selecting only leaf 1 ("b.aa"), a column nested inside the struct
        let parquet_schema = metadata.file_metadata().schema_descr();
        let nested_leaf_mask = ProjectionMask::leaves(parquet_schema, vec![1]);

        let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
            Ok(arrow_array::BooleanArray::from(vec![
                true;
                batch.num_rows()
            ]))
        });
        let filter = RowFilter::new(vec![Box::new(always_true)]);

        let reader_factory = ReaderFactory {
            metadata: Arc::clone(&metadata),
            fields: None,
            input: TestReader::new(data),
            filter: Some(filter),
            limit: None,
            offset: None,
            metrics: ArrowReaderMetrics::disabled(),
            max_predicate_cache_size: 0,
        };

        // Nested columns must not be cached, so no cache projection is produced
        let cache_projection = reader_factory.compute_cache_projection(&nested_leaf_mask);

        assert!(cache_projection.is_none());
    }

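    // Regression test: loading metadata with the page index and then replacing the offset index
    // with an explicitly empty Vec should still allow the row group to be read without panicking.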
    #[tokio::test]
    async fn empty_offset_index_doesnt_panic_in_read_row_group() {
        use tokio::fs::File;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let mut file = File::open(&path).await.unwrap();
        let file_size = file.metadata().await.unwrap().len();
        let mut metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut file, file_size)
            .await
            .unwrap();

        metadata.set_offset_index(Some(vec![]));
        let options = ArrowReaderOptions::new().with_page_index(true);
        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
        let reader =
            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
                .build()
                .unwrap();

        let result = reader.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(result.len(), 1);
    }

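    // Companion to the test above: reads alltypes_tiny_pages.parquet, whose offset index is
    // populated, and verifies it also reads cleanly with the page index enabled.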
    #[tokio::test]
    async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
        use tokio::fs::File;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let mut file = File::open(&path).await.unwrap();
        let file_size = file.metadata().await.unwrap().len();
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut file, file_size)
            .await
            .unwrap();

        let options = ArrowReaderOptions::new().with_page_index(true);
        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
        let reader =
            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
                .build()
                .unwrap();

        let result = reader.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(result.len(), 8);
    }

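    // Round-trips the Parquet metadata through ParquetMetaDataWriter / ParquetMetaDataReader on
    // disk, then reads the original data file using that metadata; per the test name this guards
    // against panics in the column-chunk path when the round-tripped metadata ends up with an
    // empty offset index.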
    #[tokio::test]
    async fn empty_offset_index_doesnt_panic_in_column_chunks() {
        use tempfile::TempDir;
        use tokio::fs::File;
        fn write_metadata_to_local_file(
            metadata: ParquetMetaData,
            file: impl AsRef<std::path::Path>,
        ) {
            use crate::file::metadata::ParquetMetaDataWriter;
            use std::fs::File;
            let file = File::create(file).unwrap();
            ParquetMetaDataWriter::new(file, &metadata)
                .finish()
                .unwrap()
        }

        fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
            use std::fs::File;
            let file = File::open(file).unwrap();
            ParquetMetaDataReader::new()
                .with_page_indexes(true)
                .parse_and_finish(&file)
                .unwrap()
        }

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let mut file = File::open(&path).await.unwrap();
        let file_size = file.metadata().await.unwrap().len();
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut file, file_size)
            .await
            .unwrap();

        // Write the metadata to a separate file and read it back
        let tempdir = TempDir::new().unwrap();
        let metadata_path = tempdir.path().join("thrift_metadata.dat");
        write_metadata_to_local_file(metadata, &metadata_path);
        let metadata = read_metadata_from_local_file(&metadata_path);

        let options = ArrowReaderOptions::new().with_page_index(true);
        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
        let reader =
            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
                .build()
                .unwrap();

        // Reading should succeed and produce a single record batch
        let result = reader.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(result.len(), 1);
    }

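    // Exercises the cached array reader with a row selection that skips the first 22 rows before
    // selecting 3, an always-true predicate, and a small batch size. Per the test name this
    // guards against a sparse-offset error; the stream is expected to complete without error.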
    #[tokio::test]
    async fn test_cached_array_reader_sparse_offset_error() {
        use futures::TryStreamExt;

        use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
        use arrow_array::{BooleanArray, RecordBatch};

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let async_reader = TestReader::new(data);

        // Enable the page index so the reader can use page-level skipping
        let options = ArrowReaderOptions::new().with_page_index(true);
        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
            .await
            .unwrap();

        // Skip 22 rows, then select 3
        let selection = RowSelection::from(vec![RowSelector::skip(22), RowSelector::select(3)]);

        // Always-true predicate over leaf column 0, which is also the output projection
        let parquet_schema = builder.parquet_schema();
        let proj = ProjectionMask::leaves(parquet_schema, vec![0]);
        let always_true = ArrowPredicateFn::new(proj.clone(), |batch: RecordBatch| {
            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
        });
        let filter = RowFilter::new(vec![Box::new(always_true)]);

        let stream = builder
            .with_batch_size(8)
            .with_projection(proj)
            .with_row_selection(selection)
            .with_row_filter(filter)
            .build()
            .unwrap();

        // Collecting the stream should not error
        let _result: Vec<_> = stream.try_collect().await.unwrap();
    }
}