1use std::collections::VecDeque;
25use std::fmt::Formatter;
26use std::io::SeekFrom;
27use std::ops::Range;
28use std::pin::Pin;
29use std::sync::{Arc, Mutex};
30use std::task::{Context, Poll};
31
32use bytes::{Buf, Bytes};
33use futures::future::{BoxFuture, FutureExt};
34use futures::ready;
35use futures::stream::Stream;
36use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
37
38use arrow_array::RecordBatch;
39use arrow_schema::{DataType, Fields, Schema, SchemaRef};
40
41use crate::arrow::array_reader::{
42 ArrayReaderBuilder, CacheOptionsBuilder, RowGroupCache, RowGroups,
43};
44use crate::arrow::arrow_reader::{
45 ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
46 RowFilter, RowSelection,
47};
48use crate::arrow::ProjectionMask;
49
50use crate::bloom_filter::{
51 chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
52};
53use crate::column::page::{PageIterator, PageReader};
54use crate::errors::{ParquetError, Result};
55use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
56use crate::file::page_index::offset_index::OffsetIndexMetaData;
57use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
58use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
59
60mod metadata;
61pub use metadata::*;
62
63#[cfg(feature = "object_store")]
64mod store;
65
66use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
67use crate::arrow::arrow_reader::ReadPlanBuilder;
68use crate::arrow::schema::ParquetField;
69#[cfg(feature = "object_store")]
70pub use store::*;
71
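/// Interface for asynchronously reading the bytes and metadata of a Parquet
/// file, e.g. from object storage or a `tokio::fs::File`.
///
/// Implementations provide [`Self::get_bytes`] to fetch arbitrary byte ranges
/// and [`Self::get_metadata`] to load the footer; the default
/// [`Self::get_byte_ranges`] fetches each range sequentially and can be
/// overridden to coalesce or parallelize requests.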
72pub trait AsyncFileReader: Send {
86 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
88
89 fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
91 async move {
92 let mut result = Vec::with_capacity(ranges.len());
93
94 for range in ranges.into_iter() {
95 let data = self.get_bytes(range).await?;
96 result.push(data);
97 }
98
99 Ok(result)
100 }
101 .boxed()
102 }
103
104 fn get_metadata<'a>(
121 &'a mut self,
122 options: Option<&'a ArrowReaderOptions>,
123 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
124}
125
126impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
128 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
129 self.as_mut().get_bytes(range)
130 }
131
132 fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
133 self.as_mut().get_byte_ranges(ranges)
134 }
135
136 fn get_metadata<'a>(
137 &'a mut self,
138 options: Option<&'a ArrowReaderOptions>,
139 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
140 self.as_mut().get_metadata(options)
141 }
142}
143
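// Allow the footer to be located via a suffix read: seek `suffix` bytes back
// from the end of the input and read to the end.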
144impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
145 fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
146 async move {
147 self.seek(SeekFrom::End(-(suffix as i64))).await?;
148 let mut buf = Vec::with_capacity(suffix);
149 self.take(suffix as _).read_to_end(&mut buf).await?;
150 Ok(buf.into())
151 }
152 .boxed()
153 }
154}
155
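/// Blanket implementation so any `AsyncRead + AsyncSeek` input (for example a
/// `tokio::fs::File`) can be used as an [`AsyncFileReader`] directly.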
156impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
157 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
158 async move {
159 self.seek(SeekFrom::Start(range.start)).await?;
160
161 let to_read = range.end - range.start;
162 let mut buffer = Vec::with_capacity(to_read.try_into()?);
163 let read = self.take(to_read).read_to_end(&mut buffer).await?;
164 if read as u64 != to_read {
165 return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
166 }
167
168 Ok(buffer.into())
169 }
170 .boxed()
171 }
172
173 fn get_metadata<'a>(
174 &'a mut self,
175 options: Option<&'a ArrowReaderOptions>,
176 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
177 async move {
178 let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy(
179 PageIndexPolicy::from(options.is_some_and(|o| o.page_index())),
180 );
181
182 #[cfg(feature = "encryption")]
183 let metadata_reader = metadata_reader.with_decryption_properties(
184 options.and_then(|o| o.file_decryption_properties.as_ref()),
185 );
186
187 let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
188 Ok(Arc::new(parquet_metadata))
189 }
190 .boxed()
191 }
192}
193
194impl ArrowReaderMetadata {
195 pub async fn load_async<T: AsyncFileReader>(
199 input: &mut T,
200 options: ArrowReaderOptions,
201 ) -> Result<Self> {
202 let metadata = input.get_metadata(Some(&options)).await?;
203 Self::try_new(metadata, options)
204 }
205}
206
207#[doc(hidden)]
208pub struct AsyncReader<T>(T);
213
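/// A builder for [`ParquetRecordBatchStream`], the `async` analogue of
/// [`ParquetRecordBatchReader`].
///
/// Minimal usage sketch (illustrative only; the file path and batch size are
/// placeholders):
///
/// ```no_run
/// # use futures::TryStreamExt;
/// # use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let file = tokio::fs::File::open("example.parquet").await?;
/// let stream = ParquetRecordBatchStreamBuilder::new(file)
///     .await?
///     .with_batch_size(8192)
///     .build()?;
/// let batches = stream.try_collect::<Vec<_>>().await?;
/// # Ok(())
/// # }
/// ```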
214pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
227
228impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
229 pub async fn new(input: T) -> Result<Self> {
360 Self::new_with_options(input, Default::default()).await
361 }
362
363 pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
366 let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
367 Ok(Self::new_with_metadata(input, metadata))
368 }
369
370 pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
416 Self::new_builder(AsyncReader(input), metadata)
417 }
418
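    /// Read the bloom filter for the given row group and column, if one is
    /// present in the file.
    ///
    /// Returns `Ok(None)` when the column metadata records no bloom filter
    /// offset. If the filter length is not recorded, the header is read first
    /// using a size estimate and the bitset is fetched with a second request.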
419 pub async fn get_row_group_column_bloom_filter(
425 &mut self,
426 row_group_idx: usize,
427 column_idx: usize,
428 ) -> Result<Option<Sbbf>> {
429 let metadata = self.metadata.row_group(row_group_idx);
430 let column_metadata = metadata.column(column_idx);
431
432 let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
433 offset
434 .try_into()
435 .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
436 } else {
437 return Ok(None);
438 };
439
440 let buffer = match column_metadata.bloom_filter_length() {
441 Some(length) => self.input.0.get_bytes(offset..offset + length as u64),
442 None => self
443 .input
444 .0
445 .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64),
446 }
447 .await?;
448
449 let (header, bitset_offset) =
450 chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
451
        match header.algorithm {
            BloomFilterAlgorithm::BLOCK(_) => {
                // BLOCK is the only algorithm defined by the format; matching
                // explicitly guards against future variants
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED(_) => {
                // UNCOMPRESSED is the only compression defined for bloom filters
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH(_) => {
                // XXHASH is the only hash defined for bloom filters
            }
        }
467
468 let bitset = match column_metadata.bloom_filter_length() {
469 Some(_) => buffer.slice(
470 (TryInto::<usize>::try_into(bitset_offset).unwrap()
471 - TryInto::<usize>::try_into(offset).unwrap())..,
472 ),
473 None => {
474 let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
475 ParquetError::General("Bloom filter length is invalid".to_string())
476 })?;
477 self.input
478 .0
479 .get_bytes(bitset_offset..bitset_offset + bitset_length)
480 .await?
481 }
482 };
483 Ok(Some(Sbbf::new(&bitset)))
484 }
485
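    /// Build a [`ParquetRecordBatchStream`] from this builder, validating any
    /// explicitly requested row groups and clamping the batch size to the
    /// number of rows in the file.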
486 pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
490 let num_row_groups = self.metadata.row_groups().len();
491
492 let row_groups = match self.row_groups {
493 Some(row_groups) => {
494 if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
495 return Err(general_err!(
496 "row group {} out of bounds 0..{}",
497 col,
498 num_row_groups
499 ));
500 }
501 row_groups.into()
502 }
503 None => (0..self.metadata.row_groups().len()).collect(),
504 };
505
506 let batch_size = self
508 .batch_size
509 .min(self.metadata.file_metadata().num_rows() as usize);
510 let reader_factory = ReaderFactory {
511 input: self.input.0,
512 filter: self.filter,
513 metadata: self.metadata.clone(),
514 fields: self.fields,
515 limit: self.limit,
516 offset: self.offset,
517 metrics: self.metrics,
518 max_predicate_cache_size: self.max_predicate_cache_size,
519 };
520
521 let projected_fields = match reader_factory.fields.as_deref().map(|pf| &pf.arrow_type) {
524 Some(DataType::Struct(fields)) => {
525 fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
526 }
527 None => Fields::empty(),
528 _ => unreachable!("Must be Struct for root type"),
529 };
530 let schema = Arc::new(Schema::new(projected_fields));
531
532 Ok(ParquetRecordBatchStream {
533 metadata: self.metadata,
534 batch_size,
535 row_groups,
536 projection: self.projection,
537 selection: self.selection,
538 schema,
539 reader_factory: Some(reader_factory),
540 state: StreamState::Init,
541 })
542 }
543}
544
545type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;
550
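/// State required to construct a [`ParquetRecordBatchReader`] for each row
/// group: the file input, metadata, optional row filter, and the remaining
/// limit/offset, which are updated as row groups are read.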
551struct ReaderFactory<T> {
554 metadata: Arc<ParquetMetaData>,
555
556 fields: Option<Arc<ParquetField>>,
558
559 input: T,
560
561 filter: Option<RowFilter>,
563
564 limit: Option<usize>,
566
567 offset: Option<usize>,
569
570 metrics: ArrowReaderMetrics,
572
573 max_predicate_cache_size: usize,
575}
576
577impl<T> ReaderFactory<T>
578where
579 T: AsyncFileReader + Send,
580{
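    /// Read the row group at `row_group_idx`, fetching only the pages needed
    /// for `projection` and the optional row `selection`, evaluating any row
    /// filter predicates, and applying the remaining limit/offset.
    ///
    /// Returns `None` instead of a reader when the selection (after filters,
    /// limit and offset) selects no rows.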
581 async fn read_row_group(
587 mut self,
588 row_group_idx: usize,
589 selection: Option<RowSelection>,
590 projection: ProjectionMask,
591 batch_size: usize,
592 ) -> ReadResult<T> {
593 let meta = self.metadata.row_group(row_group_idx);
596 let offset_index = self
597 .metadata
598 .offset_index()
599 .filter(|index| !index.is_empty())
601 .map(|x| x[row_group_idx].as_slice());
602
603 let cache_projection = match self.compute_cache_projection(&projection) {
605 Some(projection) => projection,
606 None => ProjectionMask::none(meta.columns().len()),
607 };
608 let row_group_cache = Arc::new(Mutex::new(RowGroupCache::new(
609 batch_size,
610 self.max_predicate_cache_size,
611 )));
612
613 let mut row_group = InMemoryRowGroup {
614 row_count: meta.num_rows() as usize,
616 column_chunks: vec![None; meta.columns().len()],
617 offset_index,
618 row_group_idx,
619 metadata: self.metadata.as_ref(),
620 };
621
622 let cache_options_builder =
623 CacheOptionsBuilder::new(&cache_projection, row_group_cache.clone());
624
625 let filter = self.filter.as_mut();
626 let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);
627
628 if let Some(filter) = filter {
630 let cache_options = cache_options_builder.clone().producer();
631
632 for predicate in filter.predicates.iter_mut() {
                if !plan_builder.selects_any() {
                    // an earlier predicate filtered out every row; nothing to read
                    return Ok((self, None));
                }
636
637 let selection = plan_builder.selection();
639 let cache_mask = Some(&cache_projection);
641 row_group
642 .fetch(
643 &mut self.input,
644 predicate.projection(),
645 selection,
646 batch_size,
647 cache_mask,
648 )
649 .await?;
650
651 let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
652 .with_cache_options(Some(&cache_options))
653 .build_array_reader(self.fields.as_deref(), predicate.projection())?;
654
655 plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
656 }
657 }
658
659 let rows_before = plan_builder
661 .num_rows_selected()
662 .unwrap_or(row_group.row_count);
663
        if rows_before == 0 {
            // the selection is empty before applying limit/offset
            return Ok((self, None));
        }
667
668 let plan_builder = plan_builder
670 .limited(row_group.row_count)
671 .with_offset(self.offset)
672 .with_limit(self.limit)
673 .build_limited();
674
675 let rows_after = plan_builder
676 .num_rows_selected()
677 .unwrap_or(row_group.row_count);
678
679 if let Some(offset) = &mut self.offset {
681 *offset = offset.saturating_sub(rows_before - rows_after)
684 }
685
        if rows_after == 0 {
            // limit/offset removed all remaining rows
            return Ok((self, None));
        }
689
690 if let Some(limit) = &mut self.limit {
691 *limit -= rows_after;
692 }
693 row_group
695 .fetch(
697 &mut self.input,
698 &projection,
699 plan_builder.selection(),
700 batch_size,
701 None,
702 )
703 .await?;
704
705 let plan = plan_builder.build();
706
707 let cache_options = cache_options_builder.consumer();
708 let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
709 .with_cache_options(Some(&cache_options))
710 .build_array_reader(self.fields.as_deref(), &projection)?;
711
712 let reader = ParquetRecordBatchReader::new(array_reader, plan);
713
714 Ok((self, Some(reader)))
715 }
716
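    /// Compute which columns should be cached between the predicate-evaluation
    /// pass and the final decode: the union of all predicate projections,
    /// intersected with the output projection.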
717 fn compute_cache_projection(&self, projection: &ProjectionMask) -> Option<ProjectionMask> {
719 let filters = self.filter.as_ref()?;
720 let mut cache_projection = filters.predicates.first()?.projection().clone();
721 for predicate in filters.predicates.iter() {
722 cache_projection.union(predicate.projection());
723 }
724 cache_projection.intersect(projection);
725 self.exclude_nested_columns_from_cache(&cache_projection)
726 }
727
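    /// Exclude nested columns (roots with more than one leaf) from the cache
    /// mask, returning `None` if nothing cacheable remains.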
728 fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
730 let schema = self.metadata.file_metadata().schema_descr();
731 let num_leaves = schema.num_columns();
732
733 let num_roots = schema.root_schema().get_fields().len();
735 let mut root_leaf_counts = vec![0usize; num_roots];
736 for leaf_idx in 0..num_leaves {
737 let root_idx = schema.get_column_root_idx(leaf_idx);
738 root_leaf_counts[root_idx] += 1;
739 }
740
741 let mut included_leaves = Vec::new();
743 for leaf_idx in 0..num_leaves {
744 if mask.leaf_included(leaf_idx) {
745 let root_idx = schema.get_column_root_idx(leaf_idx);
746 if root_leaf_counts[root_idx] == 1 {
747 included_leaves.push(leaf_idx);
748 }
749 }
750 }
751
752 if included_leaves.is_empty() {
753 None
754 } else {
755 Some(ProjectionMask::leaves(schema, included_leaves))
756 }
757 }
758}
759
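/// State machine driven by [`ParquetRecordBatchStream`]: `Init` (ready to
/// start the next row group), `Reading` (fetching a row group), `Decoding`
/// (yielding batches from a row group) and `Error` (terminal).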
760enum StreamState<T> {
761 Init,
763 Decoding(ParquetRecordBatchReader),
765 Reading(BoxFuture<'static, ReadResult<T>>),
767 Error,
769}
770
771impl<T> std::fmt::Debug for StreamState<T> {
772 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
773 match self {
774 StreamState::Init => write!(f, "StreamState::Init"),
775 StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
776 StreamState::Reading(_) => write!(f, "StreamState::Reading"),
777 StreamState::Error => write!(f, "StreamState::Error"),
778 }
779 }
780}
781
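/// An asynchronous [`Stream`] of [`RecordBatch`] read from a Parquet file,
/// created by [`ParquetRecordBatchStreamBuilder`].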
782pub struct ParquetRecordBatchStream<T> {
800 metadata: Arc<ParquetMetaData>,
801
802 schema: SchemaRef,
803
804 row_groups: VecDeque<usize>,
805
806 projection: ProjectionMask,
807
808 batch_size: usize,
809
810 selection: Option<RowSelection>,
811
812 reader_factory: Option<ReaderFactory<T>>,
814
815 state: StreamState<T>,
816}
817
818impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
819 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
820 f.debug_struct("ParquetRecordBatchStream")
821 .field("metadata", &self.metadata)
822 .field("schema", &self.schema)
823 .field("batch_size", &self.batch_size)
824 .field("projection", &self.projection)
825 .field("state", &self.state)
826 .finish()
827 }
828}
829
830impl<T> ParquetRecordBatchStream<T> {
831 pub fn schema(&self) -> &SchemaRef {
836 &self.schema
837 }
838}
839
840impl<T> ParquetRecordBatchStream<T>
841where
842 T: AsyncFileReader + Unpin + Send + 'static,
843{
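    /// Fetch the next row group and return a [`ParquetRecordBatchReader`] over
    /// it, or `None` once all row groups are exhausted.
    ///
    /// This must not be mixed with polling the [`Stream`] API; doing so
    /// returns an error. Illustrative sketch (assumes `stream` was built as
    /// shown for the builder):
    ///
    /// ```ignore
    /// while let Some(reader) = stream.next_row_group().await? {
    ///     for batch in reader {
    ///         println!("{} rows", batch?.num_rows());
    ///     }
    /// }
    /// ```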
844 pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
858 loop {
859 match &mut self.state {
860 StreamState::Decoding(_) | StreamState::Reading(_) => {
861 return Err(ParquetError::General(
862 "Cannot combine the use of next_row_group with the Stream API".to_string(),
863 ))
864 }
865 StreamState::Init => {
866 let row_group_idx = match self.row_groups.pop_front() {
867 Some(idx) => idx,
868 None => return Ok(None),
869 };
870
871 let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
872
873 let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
874
875 let reader_factory = self.reader_factory.take().expect("lost reader factory");
876
877 let (reader_factory, maybe_reader) = reader_factory
878 .read_row_group(
879 row_group_idx,
880 selection,
881 self.projection.clone(),
882 self.batch_size,
883 )
884 .await
885 .inspect_err(|_| {
886 self.state = StreamState::Error;
887 })?;
888 self.reader_factory = Some(reader_factory);
889
890 if let Some(reader) = maybe_reader {
891 return Ok(Some(reader));
892 } else {
893 continue;
895 }
896 }
                StreamState::Error => return Ok(None),
            }
899 }
900 }
901}
902
903impl<T> Stream for ParquetRecordBatchStream<T>
904where
905 T: AsyncFileReader + Unpin + Send + 'static,
906{
907 type Item = Result<RecordBatch>;
908
909 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
910 loop {
911 match &mut self.state {
912 StreamState::Decoding(batch_reader) => match batch_reader.next() {
913 Some(Ok(batch)) => {
914 return Poll::Ready(Some(Ok(batch)));
915 }
916 Some(Err(e)) => {
917 self.state = StreamState::Error;
918 return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
919 }
920 None => self.state = StreamState::Init,
921 },
922 StreamState::Init => {
923 let row_group_idx = match self.row_groups.pop_front() {
924 Some(idx) => idx,
925 None => return Poll::Ready(None),
926 };
927
928 let reader = self.reader_factory.take().expect("lost reader factory");
929
930 let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
931
932 let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
933
934 let fut = reader
935 .read_row_group(
936 row_group_idx,
937 selection,
938 self.projection.clone(),
939 self.batch_size,
940 )
941 .boxed();
942
943 self.state = StreamState::Reading(fut)
944 }
945 StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
946 Ok((reader_factory, maybe_reader)) => {
947 self.reader_factory = Some(reader_factory);
948 match maybe_reader {
949 Some(reader) => self.state = StreamState::Decoding(reader),
951 None => self.state = StreamState::Init,
953 }
954 }
955 Err(e) => {
956 self.state = StreamState::Error;
957 return Poll::Ready(Some(Err(e)));
958 }
959 },
                StreamState::Error => return Poll::Ready(None),
            }
962 }
963 }
964}
965
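/// Column chunk data for a single row group, fetched on demand and held in
/// memory while its pages are decoded.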
966struct InMemoryRowGroup<'a> {
968 offset_index: Option<&'a [OffsetIndexMetaData]>,
969 column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
971 row_count: usize,
972 row_group_idx: usize,
973 metadata: &'a ParquetMetaData,
974}
975
976impl InMemoryRowGroup<'_> {
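    /// Fetch the byte ranges required to decode `projection` for this row
    /// group. When a row `selection` and the offset index are available, only
    /// the relevant pages are fetched (a sparse chunk); otherwise each column
    /// chunk is fetched in full.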
977 async fn fetch<T: AsyncFileReader + Send>(
983 &mut self,
984 input: &mut T,
985 projection: &ProjectionMask,
986 selection: Option<&RowSelection>,
987 batch_size: usize,
988 cache_mask: Option<&ProjectionMask>,
989 ) -> Result<()> {
990 let metadata = self.metadata.row_group(self.row_group_idx);
991 if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
992 let expanded_selection =
993 selection.expand_to_batch_boundaries(batch_size, self.row_count);
994 let mut page_start_offsets: Vec<Vec<u64>> = vec![];
997
998 let fetch_ranges = self
999 .column_chunks
1000 .iter()
1001 .zip(metadata.columns())
1002 .enumerate()
1003 .filter(|&(idx, (chunk, _chunk_meta))| {
1004 chunk.is_none() && projection.leaf_included(idx)
1005 })
1006 .flat_map(|(idx, (_chunk, chunk_meta))| {
1007 let mut ranges: Vec<Range<u64>> = vec![];
1010 let (start, _len) = chunk_meta.byte_range();
1011 match offset_index[idx].page_locations.first() {
1012 Some(first) if first.offset as u64 != start => {
1013 ranges.push(start..first.offset as u64);
1014 }
1015 _ => (),
1016 }
1017
1018 let use_expanded = cache_mask.map(|m| m.leaf_included(idx)).unwrap_or(false);
1020 if use_expanded {
1021 ranges.extend(
1022 expanded_selection.scan_ranges(&offset_index[idx].page_locations),
1023 );
1024 } else {
1025 ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
1026 }
1027 page_start_offsets.push(ranges.iter().map(|range| range.start).collect());
1028
1029 ranges
1030 })
1031 .collect();
1032
1033 let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
1034 let mut page_start_offsets = page_start_offsets.into_iter();
1035
1036 for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
1037 if chunk.is_some() || !projection.leaf_included(idx) {
1038 continue;
1039 }
1040
1041 if let Some(offsets) = page_start_offsets.next() {
1042 let mut chunks = Vec::with_capacity(offsets.len());
1043 for _ in 0..offsets.len() {
1044 chunks.push(chunk_data.next().unwrap());
1045 }
1046
1047 *chunk = Some(Arc::new(ColumnChunkData::Sparse {
1048 length: metadata.column(idx).byte_range().1 as usize,
1049 data: offsets
1050 .into_iter()
1051 .map(|x| x as usize)
1052 .zip(chunks.into_iter())
1053 .collect(),
1054 }))
1055 }
1056 }
1057 } else {
1058 let fetch_ranges = self
1059 .column_chunks
1060 .iter()
1061 .enumerate()
1062 .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
1063 .map(|(idx, _chunk)| {
1064 let column = metadata.column(idx);
1065 let (start, length) = column.byte_range();
1066 start..(start + length)
1067 })
1068 .collect();
1069
1070 let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
1071
1072 for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
1073 if chunk.is_some() || !projection.leaf_included(idx) {
1074 continue;
1075 }
1076
1077 if let Some(data) = chunk_data.next() {
1078 *chunk = Some(Arc::new(ColumnChunkData::Dense {
1079 offset: metadata.column(idx).byte_range().0 as usize,
1080 data,
1081 }));
1082 }
1083 }
1084 }
1085
1086 Ok(())
1087 }
1088}
1089
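// Expose the fetched chunks to `ArrayReaderBuilder` via the `RowGroups` trait.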
1090impl RowGroups for InMemoryRowGroup<'_> {
1091 fn num_rows(&self) -> usize {
1092 self.row_count
1093 }
1094
1095 fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
1097 match &self.column_chunks[i] {
1098 None => Err(ParquetError::General(format!(
1099 "Invalid column index {i}, column was not fetched"
1100 ))),
1101 Some(data) => {
1102 let page_locations = self
1103 .offset_index
1104 .filter(|index| !index.is_empty())
1106 .map(|index| index[i].page_locations.clone());
1107 let column_chunk_metadata = self.metadata.row_group(self.row_group_idx).column(i);
1108 let page_reader = SerializedPageReader::new(
1109 data.clone(),
1110 column_chunk_metadata,
1111 self.row_count,
1112 page_locations,
1113 )?;
1114 let page_reader = page_reader.add_crypto_context(
1115 self.row_group_idx,
1116 i,
1117 self.metadata,
1118 column_chunk_metadata,
1119 )?;
1120
1121 let page_reader: Box<dyn PageReader> = Box::new(page_reader);
1122
1123 Ok(Box::new(ColumnChunkIterator {
1124 reader: Some(Ok(page_reader)),
1125 }))
1126 }
1127 }
1128 }
1129}
1130
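/// Column chunk bytes, either sparsely fetched pages (offset/bytes pairs plus
/// the total chunk length) or the full chunk starting at `offset`.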
1131#[derive(Clone)]
1133enum ColumnChunkData {
1134 Sparse {
1136 length: usize,
1138 data: Vec<(usize, Bytes)>,
1143 },
1144 Dense { offset: usize, data: Bytes },
1146}
1147
1148impl ColumnChunkData {
1149 fn get(&self, start: u64) -> Result<Bytes> {
1151 match &self {
1152 ColumnChunkData::Sparse { data, .. } => data
1153 .binary_search_by_key(&start, |(offset, _)| *offset as u64)
1154 .map(|idx| data[idx].1.clone())
1155 .map_err(|_| {
1156 ParquetError::General(format!(
1157 "Invalid offset in sparse column chunk data: {start}"
1158 ))
1159 }),
1160 ColumnChunkData::Dense { offset, data } => {
1161 let start = start as usize - *offset;
1162 Ok(data.slice(start..))
1163 }
1164 }
1165 }
1166}
1167
1168impl Length for ColumnChunkData {
1169 fn len(&self) -> u64 {
1171 match &self {
1172 ColumnChunkData::Sparse { length, .. } => *length as u64,
1173 ColumnChunkData::Dense { data, .. } => data.len() as u64,
1174 }
1175 }
1176}
1177
1178impl ChunkReader for ColumnChunkData {
1179 type T = bytes::buf::Reader<Bytes>;
1180
1181 fn get_read(&self, start: u64) -> Result<Self::T> {
1182 Ok(self.get(start)?.reader())
1183 }
1184
1185 fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
1186 Ok(self.get(start)?.slice(..length))
1187 }
1188}
1189
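/// A [`PageIterator`] that yields exactly one [`PageReader`].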
1190struct ColumnChunkIterator {
1192 reader: Option<Result<Box<dyn PageReader>>>,
1193}
1194
1195impl Iterator for ColumnChunkIterator {
1196 type Item = Result<Box<dyn PageReader>>;
1197
1198 fn next(&mut self) -> Option<Self::Item> {
1199 self.reader.take()
1200 }
1201}
1202
1203impl PageIterator for ColumnChunkIterator {}
1204
1205#[cfg(test)]
1206mod tests {
1207 use super::*;
1208 use crate::arrow::arrow_reader::{
1209 ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
1210 };
1211 use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
1212 use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
1213 use crate::arrow::ArrowWriter;
1214 use crate::file::metadata::ParquetMetaDataReader;
1215 use crate::file::properties::WriterProperties;
1216 use arrow::compute::kernels::cmp::eq;
1217 use arrow::error::Result as ArrowResult;
1218 use arrow_array::builder::{ListBuilder, StringBuilder};
1219 use arrow_array::cast::AsArray;
1220 use arrow_array::types::Int32Type;
1221 use arrow_array::{
1222 Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray,
1223 StructArray, UInt64Array,
1224 };
1225 use arrow_schema::{DataType, Field, Schema};
1226 use futures::{StreamExt, TryStreamExt};
1227 use rand::{rng, Rng};
1228 use std::collections::HashMap;
1229 use std::sync::{Arc, Mutex};
1230 use tempfile::tempfile;
1231
1232 #[derive(Clone)]
1233 struct TestReader {
1234 data: Bytes,
1235 metadata: Option<Arc<ParquetMetaData>>,
1236 requests: Arc<Mutex<Vec<Range<usize>>>>,
1237 }
1238
1239 impl TestReader {
1240 fn new(data: Bytes) -> Self {
1241 Self {
1242 data,
1243 metadata: Default::default(),
1244 requests: Default::default(),
1245 }
1246 }
1247 }
1248
1249 impl AsyncFileReader for TestReader {
1250 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
1251 let range = range.clone();
1252 self.requests
1253 .lock()
1254 .unwrap()
1255 .push(range.start as usize..range.end as usize);
1256 futures::future::ready(Ok(self
1257 .data
1258 .slice(range.start as usize..range.end as usize)))
1259 .boxed()
1260 }
1261
1262 fn get_metadata<'a>(
1263 &'a mut self,
1264 options: Option<&'a ArrowReaderOptions>,
1265 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
1266 let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy(
1267 PageIndexPolicy::from(options.is_some_and(|o| o.page_index())),
1268 );
1269 self.metadata = Some(Arc::new(
1270 metadata_reader.parse_and_finish(&self.data).unwrap(),
1271 ));
1272 futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
1273 }
1274 }
1275
1276 #[tokio::test]
1277 async fn test_async_reader() {
1278 let testdata = arrow::util::test_util::parquet_test_data();
1279 let path = format!("{testdata}/alltypes_plain.parquet");
1280 let data = Bytes::from(std::fs::read(path).unwrap());
1281
1282 let async_reader = TestReader::new(data.clone());
1283
1284 let requests = async_reader.requests.clone();
1285 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1286 .await
1287 .unwrap();
1288
1289 let metadata = builder.metadata().clone();
1290 assert_eq!(metadata.num_row_groups(), 1);
1291
1292 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1293 let stream = builder
1294 .with_projection(mask.clone())
1295 .with_batch_size(1024)
1296 .build()
1297 .unwrap();
1298
1299 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1300
1301 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1302 .unwrap()
1303 .with_projection(mask)
1304 .with_batch_size(104)
1305 .build()
1306 .unwrap()
1307 .collect::<ArrowResult<Vec<_>>>()
1308 .unwrap();
1309
1310 assert_eq!(async_batches, sync_batches);
1311
1312 let requests = requests.lock().unwrap();
1313 let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1314 let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1315
1316 assert_eq!(
1317 &requests[..],
1318 &[
1319 offset_1 as usize..(offset_1 + length_1) as usize,
1320 offset_2 as usize..(offset_2 + length_2) as usize
1321 ]
1322 );
1323 }
1324
1325 #[tokio::test]
1326 async fn test_async_reader_with_next_row_group() {
1327 let testdata = arrow::util::test_util::parquet_test_data();
1328 let path = format!("{testdata}/alltypes_plain.parquet");
1329 let data = Bytes::from(std::fs::read(path).unwrap());
1330
1331 let async_reader = TestReader::new(data.clone());
1332
1333 let requests = async_reader.requests.clone();
1334 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1335 .await
1336 .unwrap();
1337
1338 let metadata = builder.metadata().clone();
1339 assert_eq!(metadata.num_row_groups(), 1);
1340
1341 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1342 let mut stream = builder
1343 .with_projection(mask.clone())
1344 .with_batch_size(1024)
1345 .build()
1346 .unwrap();
1347
1348 let mut readers = vec![];
1349 while let Some(reader) = stream.next_row_group().await.unwrap() {
1350 readers.push(reader);
1351 }
1352
1353 let async_batches: Vec<_> = readers
1354 .into_iter()
1355 .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
1356 .collect();
1357
1358 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1359 .unwrap()
1360 .with_projection(mask)
1361 .with_batch_size(104)
1362 .build()
1363 .unwrap()
1364 .collect::<ArrowResult<Vec<_>>>()
1365 .unwrap();
1366
1367 assert_eq!(async_batches, sync_batches);
1368
1369 let requests = requests.lock().unwrap();
1370 let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1371 let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1372
1373 assert_eq!(
1374 &requests[..],
1375 &[
1376 offset_1 as usize..(offset_1 + length_1) as usize,
1377 offset_2 as usize..(offset_2 + length_2) as usize
1378 ]
1379 );
1380 }
1381
1382 #[tokio::test]
1383 async fn test_async_reader_with_index() {
1384 let testdata = arrow::util::test_util::parquet_test_data();
1385 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1386 let data = Bytes::from(std::fs::read(path).unwrap());
1387
1388 let async_reader = TestReader::new(data.clone());
1389
1390 let options = ArrowReaderOptions::new().with_page_index(true);
1391 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1392 .await
1393 .unwrap();
1394
1395 let metadata_with_index = builder.metadata();
1397 assert_eq!(metadata_with_index.num_row_groups(), 1);
1398
1399 let offset_index = metadata_with_index.offset_index().unwrap();
1401 let column_index = metadata_with_index.column_index().unwrap();
1402
1403 assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
1404 assert_eq!(column_index.len(), metadata_with_index.num_row_groups());
1405
1406 let num_columns = metadata_with_index
1407 .file_metadata()
1408 .schema_descr()
1409 .num_columns();
1410
1411 offset_index
1413 .iter()
1414 .for_each(|x| assert_eq!(x.len(), num_columns));
1415 column_index
1416 .iter()
1417 .for_each(|x| assert_eq!(x.len(), num_columns));
1418
1419 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1420 let stream = builder
1421 .with_projection(mask.clone())
1422 .with_batch_size(1024)
1423 .build()
1424 .unwrap();
1425
1426 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1427
1428 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1429 .unwrap()
1430 .with_projection(mask)
1431 .with_batch_size(1024)
1432 .build()
1433 .unwrap()
1434 .collect::<ArrowResult<Vec<_>>>()
1435 .unwrap();
1436
1437 assert_eq!(async_batches, sync_batches);
1438 }
1439
1440 #[tokio::test]
1441 async fn test_async_reader_with_limit() {
1442 let testdata = arrow::util::test_util::parquet_test_data();
1443 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1444 let data = Bytes::from(std::fs::read(path).unwrap());
1445
1446 let metadata = ParquetMetaDataReader::new()
1447 .parse_and_finish(&data)
1448 .unwrap();
1449 let metadata = Arc::new(metadata);
1450
1451 assert_eq!(metadata.num_row_groups(), 1);
1452
1453 let async_reader = TestReader::new(data.clone());
1454
1455 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1456 .await
1457 .unwrap();
1458
1459 assert_eq!(builder.metadata().num_row_groups(), 1);
1460
1461 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1462 let stream = builder
1463 .with_projection(mask.clone())
1464 .with_batch_size(1024)
1465 .with_limit(1)
1466 .build()
1467 .unwrap();
1468
1469 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1470
1471 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1472 .unwrap()
1473 .with_projection(mask)
1474 .with_batch_size(1024)
1475 .with_limit(1)
1476 .build()
1477 .unwrap()
1478 .collect::<ArrowResult<Vec<_>>>()
1479 .unwrap();
1480
1481 assert_eq!(async_batches, sync_batches);
1482 }
1483
1484 #[tokio::test]
1485 async fn test_async_reader_skip_pages() {
1486 let testdata = arrow::util::test_util::parquet_test_data();
1487 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1488 let data = Bytes::from(std::fs::read(path).unwrap());
1489
1490 let async_reader = TestReader::new(data.clone());
1491
1492 let options = ArrowReaderOptions::new().with_page_index(true);
1493 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1494 .await
1495 .unwrap();
1496
1497 assert_eq!(builder.metadata().num_row_groups(), 1);
1498
1499 let selection = RowSelection::from(vec![
            RowSelector::skip(21),
            RowSelector::select(21),
            RowSelector::skip(41),
            RowSelector::select(41),
            RowSelector::skip(25),
            RowSelector::select(25),
            RowSelector::skip(7116),
            RowSelector::select(10),
        ]);
1509
1510 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);
1511
1512 let stream = builder
1513 .with_projection(mask.clone())
1514 .with_row_selection(selection.clone())
1515 .build()
1516 .expect("building stream");
1517
1518 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1519
1520 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1521 .unwrap()
1522 .with_projection(mask)
1523 .with_batch_size(1024)
1524 .with_row_selection(selection)
1525 .build()
1526 .unwrap()
1527 .collect::<ArrowResult<Vec<_>>>()
1528 .unwrap();
1529
1530 assert_eq!(async_batches, sync_batches);
1531 }
1532
1533 #[tokio::test]
1534 async fn test_fuzz_async_reader_selection() {
1535 let testdata = arrow::util::test_util::parquet_test_data();
1536 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1537 let data = Bytes::from(std::fs::read(path).unwrap());
1538
1539 let mut rand = rng();
1540
1541 for _ in 0..100 {
1542 let mut expected_rows = 0;
1543 let mut total_rows = 0;
1544 let mut skip = false;
1545 let mut selectors = vec![];
1546
1547 while total_rows < 7300 {
1548 let row_count: usize = rand.random_range(1..100);
1549
1550 let row_count = row_count.min(7300 - total_rows);
1551
1552 selectors.push(RowSelector { row_count, skip });
1553
1554 total_rows += row_count;
1555 if !skip {
1556 expected_rows += row_count;
1557 }
1558
1559 skip = !skip;
1560 }
1561
1562 let selection = RowSelection::from(selectors);
1563
1564 let async_reader = TestReader::new(data.clone());
1565
1566 let options = ArrowReaderOptions::new().with_page_index(true);
1567 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1568 .await
1569 .unwrap();
1570
1571 assert_eq!(builder.metadata().num_row_groups(), 1);
1572
1573 let col_idx: usize = rand.random_range(0..13);
1574 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1575
1576 let stream = builder
1577 .with_projection(mask.clone())
1578 .with_row_selection(selection.clone())
1579 .build()
1580 .expect("building stream");
1581
1582 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1583
1584 let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1585
1586 assert_eq!(actual_rows, expected_rows);
1587 }
1588 }
1589
1590 #[tokio::test]
1591 async fn test_async_reader_zero_row_selector() {
1592 let testdata = arrow::util::test_util::parquet_test_data();
1594 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1595 let data = Bytes::from(std::fs::read(path).unwrap());
1596
1597 let mut rand = rng();
1598
1599 let mut expected_rows = 0;
1600 let mut total_rows = 0;
1601 let mut skip = false;
1602 let mut selectors = vec![];
1603
1604 selectors.push(RowSelector {
1605 row_count: 0,
1606 skip: false,
1607 });
1608
1609 while total_rows < 7300 {
1610 let row_count: usize = rand.random_range(1..100);
1611
1612 let row_count = row_count.min(7300 - total_rows);
1613
1614 selectors.push(RowSelector { row_count, skip });
1615
1616 total_rows += row_count;
1617 if !skip {
1618 expected_rows += row_count;
1619 }
1620
1621 skip = !skip;
1622 }
1623
1624 let selection = RowSelection::from(selectors);
1625
1626 let async_reader = TestReader::new(data.clone());
1627
1628 let options = ArrowReaderOptions::new().with_page_index(true);
1629 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1630 .await
1631 .unwrap();
1632
1633 assert_eq!(builder.metadata().num_row_groups(), 1);
1634
1635 let col_idx: usize = rand.random_range(0..13);
1636 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1637
1638 let stream = builder
1639 .with_projection(mask.clone())
1640 .with_row_selection(selection.clone())
1641 .build()
1642 .expect("building stream");
1643
1644 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1645
1646 let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1647
1648 assert_eq!(actual_rows, expected_rows);
1649 }
1650
1651 #[tokio::test]
1652 async fn test_row_filter() {
1653 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1654 let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1655 let data = RecordBatch::try_from_iter([
1656 ("a", Arc::new(a) as ArrayRef),
1657 ("b", Arc::new(b) as ArrayRef),
1658 ])
1659 .unwrap();
1660
1661 let mut buf = Vec::with_capacity(1024);
1662 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1663 writer.write(&data).unwrap();
1664 writer.close().unwrap();
1665
1666 let data: Bytes = buf.into();
1667 let metadata = ParquetMetaDataReader::new()
1668 .parse_and_finish(&data)
1669 .unwrap();
1670 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1671
1672 let test = TestReader::new(data);
1673 let requests = test.requests.clone();
1674
1675 let a_scalar = StringArray::from_iter_values(["b"]);
1676 let a_filter = ArrowPredicateFn::new(
1677 ProjectionMask::leaves(&parquet_schema, vec![0]),
1678 move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1679 );
1680
1681 let filter = RowFilter::new(vec![Box::new(a_filter)]);
1682
1683 let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 1]);
1684 let stream = ParquetRecordBatchStreamBuilder::new(test)
1685 .await
1686 .unwrap()
1687 .with_projection(mask.clone())
1688 .with_batch_size(1024)
1689 .with_row_filter(filter)
1690 .build()
1691 .unwrap();
1692
1693 let batches: Vec<_> = stream.try_collect().await.unwrap();
1694 assert_eq!(batches.len(), 1);
1695
1696 let batch = &batches[0];
1697 assert_eq!(batch.num_columns(), 2);
1698
1699 assert_eq!(
1701 batch.column(0).as_ref(),
1702 &StringArray::from_iter_values(["b", "b", "b"])
1703 );
1704 assert_eq!(
1705 batch.column(1).as_ref(),
1706 &StringArray::from_iter_values(["2", "3", "4"])
1707 );
1708
1709 assert_eq!(requests.lock().unwrap().len(), 2);
1713 }
1714
1715 #[tokio::test]
1716 async fn test_two_row_filters() {
1717 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1718 let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1719 let c = Int32Array::from_iter(0..6);
1720 let data = RecordBatch::try_from_iter([
1721 ("a", Arc::new(a) as ArrayRef),
1722 ("b", Arc::new(b) as ArrayRef),
1723 ("c", Arc::new(c) as ArrayRef),
1724 ])
1725 .unwrap();
1726
1727 let mut buf = Vec::with_capacity(1024);
1728 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1729 writer.write(&data).unwrap();
1730 writer.close().unwrap();
1731
1732 let data: Bytes = buf.into();
1733 let metadata = ParquetMetaDataReader::new()
1734 .parse_and_finish(&data)
1735 .unwrap();
1736 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1737
1738 let test = TestReader::new(data);
1739 let requests = test.requests.clone();
1740
1741 let a_scalar = StringArray::from_iter_values(["b"]);
1742 let a_filter = ArrowPredicateFn::new(
1743 ProjectionMask::leaves(&parquet_schema, vec![0]),
1744 move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1745 );
1746
1747 let b_scalar = StringArray::from_iter_values(["4"]);
1748 let b_filter = ArrowPredicateFn::new(
1749 ProjectionMask::leaves(&parquet_schema, vec![1]),
1750 move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1751 );
1752
1753 let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1754
1755 let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1756 let stream = ParquetRecordBatchStreamBuilder::new(test)
1757 .await
1758 .unwrap()
1759 .with_projection(mask.clone())
1760 .with_batch_size(1024)
1761 .with_row_filter(filter)
1762 .build()
1763 .unwrap();
1764
1765 let batches: Vec<_> = stream.try_collect().await.unwrap();
1766 assert_eq!(batches.len(), 1);
1767
1768 let batch = &batches[0];
1769 assert_eq!(batch.num_rows(), 1);
1770 assert_eq!(batch.num_columns(), 2);
1771
1772 let col = batch.column(0);
1773 let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
1774 assert_eq!(val, "b");
1775
1776 let col = batch.column(1);
1777 let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
1778 assert_eq!(val, 3);
1779
1780 assert_eq!(requests.lock().unwrap().len(), 3);
1785 }
1786
1787 #[tokio::test]
1788 async fn test_limit_multiple_row_groups() {
1789 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1790 let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1791 let c = Int32Array::from_iter(0..6);
1792 let data = RecordBatch::try_from_iter([
1793 ("a", Arc::new(a) as ArrayRef),
1794 ("b", Arc::new(b) as ArrayRef),
1795 ("c", Arc::new(c) as ArrayRef),
1796 ])
1797 .unwrap();
1798
1799 let mut buf = Vec::with_capacity(1024);
1800 let props = WriterProperties::builder()
1801 .set_max_row_group_size(3)
1802 .build();
1803 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1804 writer.write(&data).unwrap();
1805 writer.close().unwrap();
1806
1807 let data: Bytes = buf.into();
1808 let metadata = ParquetMetaDataReader::new()
1809 .parse_and_finish(&data)
1810 .unwrap();
1811
1812 assert_eq!(metadata.num_row_groups(), 2);
1813
1814 let test = TestReader::new(data);
1815
1816 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1817 .await
1818 .unwrap()
1819 .with_batch_size(1024)
1820 .with_limit(4)
1821 .build()
1822 .unwrap();
1823
1824 let batches: Vec<_> = stream.try_collect().await.unwrap();
1825 assert_eq!(batches.len(), 2);
1827
1828 let batch = &batches[0];
1829 assert_eq!(batch.num_rows(), 3);
1831 assert_eq!(batch.num_columns(), 3);
1832 let col2 = batch.column(2).as_primitive::<Int32Type>();
1833 assert_eq!(col2.values(), &[0, 1, 2]);
1834
1835 let batch = &batches[1];
1836 assert_eq!(batch.num_rows(), 1);
1838 assert_eq!(batch.num_columns(), 3);
1839 let col2 = batch.column(2).as_primitive::<Int32Type>();
1840 assert_eq!(col2.values(), &[3]);
1841
1842 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1843 .await
1844 .unwrap()
1845 .with_offset(2)
1846 .with_limit(3)
1847 .build()
1848 .unwrap();
1849
1850 let batches: Vec<_> = stream.try_collect().await.unwrap();
1851 assert_eq!(batches.len(), 2);
1853
1854 let batch = &batches[0];
1855 assert_eq!(batch.num_rows(), 1);
1857 assert_eq!(batch.num_columns(), 3);
1858 let col2 = batch.column(2).as_primitive::<Int32Type>();
1859 assert_eq!(col2.values(), &[2]);
1860
1861 let batch = &batches[1];
1862 assert_eq!(batch.num_rows(), 2);
1864 assert_eq!(batch.num_columns(), 3);
1865 let col2 = batch.column(2).as_primitive::<Int32Type>();
1866 assert_eq!(col2.values(), &[3, 4]);
1867
1868 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1869 .await
1870 .unwrap()
1871 .with_offset(4)
1872 .with_limit(20)
1873 .build()
1874 .unwrap();
1875
1876 let batches: Vec<_> = stream.try_collect().await.unwrap();
1877 assert_eq!(batches.len(), 1);
1879
1880 let batch = &batches[0];
1881 assert_eq!(batch.num_rows(), 2);
1883 assert_eq!(batch.num_columns(), 3);
1884 let col2 = batch.column(2).as_primitive::<Int32Type>();
1885 assert_eq!(col2.values(), &[4, 5]);
1886 }
1887
1888 #[tokio::test]
1889 async fn test_row_filter_with_index() {
1890 let testdata = arrow::util::test_util::parquet_test_data();
1891 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1892 let data = Bytes::from(std::fs::read(path).unwrap());
1893
1894 let metadata = ParquetMetaDataReader::new()
1895 .parse_and_finish(&data)
1896 .unwrap();
1897 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1898
1899 assert_eq!(metadata.num_row_groups(), 1);
1900
1901 let async_reader = TestReader::new(data.clone());
1902
1903 let a_filter =
1904 ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema, vec![1]), |batch| {
1905 Ok(batch.column(0).as_boolean().clone())
1906 });
1907
1908 let b_scalar = Int8Array::from(vec![2]);
1909 let b_filter = ArrowPredicateFn::new(
1910 ProjectionMask::leaves(&parquet_schema, vec![2]),
1911 move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1912 );
1913
1914 let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1915
1916 let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1917
1918 let options = ArrowReaderOptions::new().with_page_index(true);
1919 let stream = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1920 .await
1921 .unwrap()
1922 .with_projection(mask.clone())
1923 .with_batch_size(1024)
1924 .with_row_filter(filter)
1925 .build()
1926 .unwrap();
1927
1928 let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1929
1930 let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1931
1932 assert_eq!(total_rows, 730);
1933 }
1934
1935 #[tokio::test]
1936 #[allow(deprecated)]
1937 async fn test_in_memory_row_group_sparse() {
1938 let testdata = arrow::util::test_util::parquet_test_data();
1939 let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1940 let data = Bytes::from(std::fs::read(path).unwrap());
1941
1942 let metadata = ParquetMetaDataReader::new()
1943 .with_page_indexes(true)
1944 .parse_and_finish(&data)
1945 .unwrap();
1946
1947 let offset_index = metadata.offset_index().expect("reading offset index")[0].clone();
1948
1949 let mut metadata_builder = metadata.into_builder();
1950 let mut row_groups = metadata_builder.take_row_groups();
1951 row_groups.truncate(1);
1952 let row_group_meta = row_groups.pop().unwrap();
1953
1954 let metadata = metadata_builder
1955 .add_row_group(row_group_meta)
1956 .set_column_index(None)
1957 .set_offset_index(Some(vec![offset_index.clone()]))
1958 .build();
1959
1960 let metadata = Arc::new(metadata);
1961
1962 let num_rows = metadata.row_group(0).num_rows();
1963
1964 assert_eq!(metadata.num_row_groups(), 1);
1965
1966 let async_reader = TestReader::new(data.clone());
1967
1968 let requests = async_reader.requests.clone();
1969 let (_, fields) = parquet_to_arrow_schema_and_fields(
1970 metadata.file_metadata().schema_descr(),
1971 ProjectionMask::all(),
1972 None,
1973 )
1974 .unwrap();
1975
1976 let _schema_desc = metadata.file_metadata().schema_descr();
1977
1978 let projection = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), vec![0]);
1979
1980 let reader_factory = ReaderFactory {
1981 metadata,
1982 fields: fields.map(Arc::new),
1983 input: async_reader,
1984 filter: None,
1985 limit: None,
1986 offset: None,
1987 metrics: ArrowReaderMetrics::disabled(),
1988 max_predicate_cache_size: 0,
1989 };
1990
1991 let mut skip = true;
1992 let mut pages = offset_index[0].page_locations.iter().peekable();
1993
1994 let mut selectors = vec![];
1996 let mut expected_page_requests: Vec<Range<usize>> = vec![];
1997 while let Some(page) = pages.next() {
1998 let num_rows = if let Some(next_page) = pages.peek() {
1999 next_page.first_row_index - page.first_row_index
2000 } else {
2001 num_rows - page.first_row_index
2002 };
2003
2004 if skip {
2005 selectors.push(RowSelector::skip(num_rows as usize));
2006 } else {
2007 selectors.push(RowSelector::select(num_rows as usize));
2008 let start = page.offset as usize;
2009 let end = start + page.compressed_page_size as usize;
2010 expected_page_requests.push(start..end);
2011 }
2012 skip = !skip;
2013 }
2014
2015 let selection = RowSelection::from(selectors);
2016
2017 let (_factory, _reader) = reader_factory
2018 .read_row_group(0, Some(selection), projection.clone(), 48)
2019 .await
2020 .expect("reading row group");
2021
2022 let requests = requests.lock().unwrap();
2023
2024 assert_eq!(&requests[..], &expected_page_requests)
2025 }
2026
2027 #[tokio::test]
2028 async fn test_batch_size_overallocate() {
2029 let testdata = arrow::util::test_util::parquet_test_data();
2030 let path = format!("{testdata}/alltypes_plain.parquet");
2032 let data = Bytes::from(std::fs::read(path).unwrap());
2033
2034 let async_reader = TestReader::new(data.clone());
2035
2036 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2037 .await
2038 .unwrap();
2039
2040 let file_rows = builder.metadata().file_metadata().num_rows() as usize;
2041
2042 let stream = builder
2043 .with_projection(ProjectionMask::all())
2044 .with_batch_size(1024)
2045 .build()
2046 .unwrap();
2047 assert_ne!(1024, file_rows);
2048 assert_eq!(stream.batch_size, file_rows);
2049 }
2050
2051 #[tokio::test]
2052 async fn test_get_row_group_column_bloom_filter_without_length() {
2053 let testdata = arrow::util::test_util::parquet_test_data();
2054 let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
2055 let data = Bytes::from(std::fs::read(path).unwrap());
2056 test_get_row_group_column_bloom_filter(data, false).await;
2057 }
2058
2059 #[tokio::test]
2060 async fn test_parquet_record_batch_stream_schema() {
2061 fn get_all_field_names(schema: &Schema) -> Vec<&String> {
2062 schema.flattened_fields().iter().map(|f| f.name()).collect()
2063 }
2064
2065 let mut metadata = HashMap::with_capacity(1);
2074 metadata.insert("key".to_string(), "value".to_string());
2075
2076 let nested_struct_array = StructArray::from(vec![
2077 (
2078 Arc::new(Field::new("d", DataType::Utf8, true)),
2079 Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
2080 ),
2081 (
2082 Arc::new(Field::new("e", DataType::Utf8, true)),
2083 Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
2084 ),
2085 ]);
2086 let struct_array = StructArray::from(vec![
2087 (
2088 Arc::new(Field::new("a", DataType::Int32, true)),
2089 Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
2090 ),
2091 (
2092 Arc::new(Field::new("b", DataType::UInt64, true)),
2093 Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
2094 ),
2095 (
2096 Arc::new(Field::new(
2097 "c",
2098 nested_struct_array.data_type().clone(),
2099 true,
2100 )),
2101 Arc::new(nested_struct_array) as ArrayRef,
2102 ),
2103 ]);
2104
2105 let schema =
2106 Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
2107 let record_batch = RecordBatch::from(struct_array)
2108 .with_schema(schema.clone())
2109 .unwrap();
2110
2111 let mut file = tempfile().unwrap();
2113 let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
2114 writer.write(&record_batch).unwrap();
2115 writer.close().unwrap();
2116
2117 let all_fields = ["a", "b", "c", "d", "e"];
2118 let projections = [
2120 (vec![], vec![]),
2121 (vec![0], vec!["a"]),
2122 (vec![0, 1], vec!["a", "b"]),
2123 (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
2124 (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
2125 ];
2126
2127 for (indices, expected_projected_names) in projections {
2129 let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
2130 assert_eq!(get_all_field_names(&builder), all_fields);
2132 assert_eq!(builder.metadata, metadata);
2133 assert_eq!(get_all_field_names(&reader), expected_projected_names);
2135 assert_eq!(reader.metadata, HashMap::default());
2136 assert_eq!(get_all_field_names(&batch), expected_projected_names);
2137 assert_eq!(batch.metadata, HashMap::default());
2138 };
2139
2140 let builder =
2141 ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
2142 let sync_builder_schema = builder.schema().clone();
2143 let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
2144 let mut reader = builder.with_projection(mask).build().unwrap();
2145 let sync_reader_schema = reader.schema();
2146 let batch = reader.next().unwrap().unwrap();
2147 let sync_batch_schema = batch.schema();
2148 assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);
2149
2150 let file = tokio::fs::File::from(file.try_clone().unwrap());
2152 let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
2153 let async_builder_schema = builder.schema().clone();
2154 let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
2155 let mut reader = builder.with_projection(mask).build().unwrap();
2156 let async_reader_schema = reader.schema().clone();
2157 let batch = reader.next().await.unwrap().unwrap();
2158 let async_batch_schema = batch.schema();
2159 assert_schemas(
2160 async_builder_schema,
2161 async_reader_schema,
2162 async_batch_schema,
2163 );
2164 }
2165 }
2166
2167 #[tokio::test]
2168 async fn test_get_row_group_column_bloom_filter_with_length() {
2169 let testdata = arrow::util::test_util::parquet_test_data();
2171 let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
2172 let data = Bytes::from(std::fs::read(path).unwrap());
2173 let async_reader = TestReader::new(data.clone());
2174 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2175 .await
2176 .unwrap();
2177 let schema = builder.schema().clone();
2178 let stream = builder.build().unwrap();
2179 let batches = stream.try_collect::<Vec<_>>().await.unwrap();
2180
2181 let mut parquet_data = Vec::new();
2182 let props = WriterProperties::builder()
2183 .set_bloom_filter_enabled(true)
2184 .build();
2185 let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
2186 for batch in batches {
2187 writer.write(&batch).unwrap();
2188 }
2189 writer.close().unwrap();
2190
2191 test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
2193 }
2194
2195 async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
2196 let async_reader = TestReader::new(data.clone());
2197
2198 let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2199 .await
2200 .unwrap();
2201
2202 let metadata = builder.metadata();
2203 assert_eq!(metadata.num_row_groups(), 1);
2204 let row_group = metadata.row_group(0);
2205 let column = row_group.column(0);
2206 assert_eq!(column.bloom_filter_length().is_some(), with_length);
2207
2208 let sbbf = builder
2209 .get_row_group_column_bloom_filter(0, 0)
2210 .await
2211 .unwrap()
2212 .unwrap();
2213 assert!(sbbf.check(&"Hello"));
2214 assert!(!sbbf.check(&"Hello_Not_Exists"));
2215 }
2216
2217 #[tokio::test]
2218 async fn test_nested_skip() {
2219 let schema = Arc::new(Schema::new(vec![
2220 Field::new("col_1", DataType::UInt64, false),
2221 Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
2222 ]));
2223
        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(256)
            .set_write_batch_size(256)
            .set_max_row_group_size(1024);

        let mut file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();

        let mut builder = ListBuilder::new(StringBuilder::new());
        for id in 0..1024 {
            match id % 3 {
                0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
                1 => builder.append_value([Some(format!("id_{id}"))]),
                _ => builder.append_null(),
            }
        }
        let refs = vec![
            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
            Arc::new(builder.finish()) as ArrayRef,
        ];

        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let selections = [
            RowSelection::from(vec![
                RowSelector::skip(313),
                RowSelector::select(1),
                RowSelector::skip(709),
                RowSelector::select(1),
            ]),
            RowSelection::from(vec![
                RowSelector::skip(255),
                RowSelector::select(1),
                RowSelector::skip(767),
                RowSelector::select(1),
            ]),
            RowSelection::from(vec![
                RowSelector::select(255),
                RowSelector::skip(1),
                RowSelector::select(767),
                RowSelector::skip(1),
            ]),
            RowSelection::from(vec![
                RowSelector::skip(254),
                RowSelector::select(1),
                RowSelector::select(1),
                RowSelector::skip(767),
                RowSelector::select(1),
            ]),
        ];

        for selection in selections {
            let expected = selection.row_count();
            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
                tokio::fs::File::from_std(file.try_clone().unwrap()),
                ArrowReaderOptions::new().with_page_index(true),
            )
            .await
            .unwrap();

            reader = reader.with_row_selection(selection);

            let mut stream = reader.build().unwrap();

            let mut total_rows = 0;
            while let Some(rb) = stream.next().await {
                let rb = rb.unwrap();
                total_rows += rb.num_rows();
            }
            assert_eq!(total_rows, expected);
        }
    }

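    // Applies two chained predicates (one on a top-level column, one on a nested struct field)
    // and checks both the filtered output and the number of I/O requests issued.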
    #[tokio::test]
    async fn test_row_filter_nested() {
        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
        let b = StructArray::from(vec![
            (
                Arc::new(Field::new("aa", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["a", "b", "b", "b", "c", "c"])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("bb", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5", "6"])) as ArrayRef,
            ),
        ]);
        let c = Int32Array::from_iter(0..6);
        let data = RecordBatch::try_from_iter([
            ("a", Arc::new(a) as ArrayRef),
            ("b", Arc::new(b) as ArrayRef),
            ("c", Arc::new(c) as ArrayRef),
        ])
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
        writer.write(&data).unwrap();
        writer.close().unwrap();

        let data: Bytes = buf.into();
        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let parquet_schema = metadata.file_metadata().schema_descr_ptr();

        let test = TestReader::new(data);
        let requests = test.requests.clone();

        let a_scalar = StringArray::from_iter_values(["b"]);
        let a_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![0]),
            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
        );

        let b_scalar = StringArray::from_iter_values(["4"]);
        let b_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![2]),
            move |batch| {
                let struct_array = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<StructArray>()
                    .unwrap();
                eq(struct_array.column(0), &Scalar::new(&b_scalar))
            },
        );

        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);

        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 3]);
        let stream = ParquetRecordBatchStreamBuilder::new(test)
            .await
            .unwrap()
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .with_row_filter(filter)
            .build()
            .unwrap();

        let batches: Vec<_> = stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 1);

        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 2);

        let col = batch.column(0);
        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
        assert_eq!(val, "b");

        let col = batch.column(1);
        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
        assert_eq!(val, 3);

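        // Expect one fetch per predicate (column "a", then "b.bb") plus one for the final projection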
        assert_eq!(requests.lock().unwrap().len(), 3);
    }

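    // Nested (struct) columns are not eligible for the predicate cache, so a predicate that
    // projects only a nested leaf should produce no cache projection at all.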
    #[tokio::test]
    async fn test_cache_projection_excludes_nested_columns() {
        use arrow_array::{ArrayRef, StringArray};

        let a = StringArray::from_iter_values(["r1", "r2"]);
        let b = StructArray::from(vec![
            (
                Arc::new(Field::new("aa", DataType::Utf8, true)),
                Arc::new(StringArray::from_iter_values(["v1", "v2"])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("bb", DataType::Utf8, true)),
                Arc::new(StringArray::from_iter_values(["w1", "w2"])) as ArrayRef,
            ),
        ]);

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", b.data_type().clone(), true),
        ]));

        let mut buf = Vec::new();
        let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap();
        let batch = RecordBatch::try_from_iter([
            ("a", Arc::new(a) as ArrayRef),
            ("b", Arc::new(b) as ArrayRef),
        ])
        .unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let data: Bytes = buf.into();
        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        let parquet_schema = metadata.file_metadata().schema_descr();
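        // Leaf index 1 is the nested field `b.aa` (leaf 0 is `a`, leaf 2 is `b.bb`)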
        let nested_leaf_mask = ProjectionMask::leaves(parquet_schema, vec![1]);

        let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
            Ok(arrow_array::BooleanArray::from(vec![
                true;
                batch.num_rows()
            ]))
        });
        let filter = RowFilter::new(vec![Box::new(always_true)]);

        let reader_factory = ReaderFactory {
            metadata: Arc::clone(&metadata),
            fields: None,
            input: TestReader::new(data),
            filter: Some(filter),
            limit: None,
            offset: None,
            metrics: ArrowReaderMetrics::disabled(),
            max_predicate_cache_size: 0,
        };

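        // The predicate projects only a nested leaf, so there is nothing cacheable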
        let cache_projection = reader_factory.compute_cache_projection(&nested_leaf_mask);

        assert!(cache_projection.is_none());
    }

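    // Regression test: metadata with an explicitly empty offset index must not panic when
    // reading row groups.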
    #[tokio::test]
    #[allow(deprecated)]
    async fn empty_offset_index_doesnt_panic_in_read_row_group() {
        use tokio::fs::File;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let mut file = File::open(&path).await.unwrap();
        let file_size = file.metadata().await.unwrap().len();
        let mut metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut file, file_size)
            .await
            .unwrap();

        metadata.set_offset_index(Some(vec![]));
        let options = ArrowReaderOptions::new().with_page_index(true);
        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
        let reader =
            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
                .build()
                .unwrap();

        let result = reader.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(result.len(), 1);
    }

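    // Companion to the test above: a populated offset index loaded via the page index must
    // also read back without panicking.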
    #[tokio::test]
    #[allow(deprecated)]
    async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
        use tokio::fs::File;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let mut file = File::open(&path).await.unwrap();
        let file_size = file.metadata().await.unwrap().len();
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut file, file_size)
            .await
            .unwrap();

        let options = ArrowReaderOptions::new().with_page_index(true);
        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
        let reader =
            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
                .build()
                .unwrap();

        let result = reader.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(result.len(), 8);
    }

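    // Writes the metadata to a standalone file with ParquetMetaDataWriter, reads it back, and
    // checks that reading the data file with that metadata does not panic in the column chunk path.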
    #[tokio::test]
    #[allow(deprecated)]
    async fn empty_offset_index_doesnt_panic_in_column_chunks() {
        use tempfile::TempDir;
        use tokio::fs::File;
        fn write_metadata_to_local_file(
            metadata: ParquetMetaData,
            file: impl AsRef<std::path::Path>,
        ) {
            use crate::file::metadata::ParquetMetaDataWriter;
            use std::fs::File;
            let file = File::create(file).unwrap();
            ParquetMetaDataWriter::new(file, &metadata)
                .finish()
                .unwrap()
        }

        fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
            use std::fs::File;
            let file = File::open(file).unwrap();
            ParquetMetaDataReader::new()
                .with_page_indexes(true)
                .parse_and_finish(&file)
                .unwrap()
        }

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let mut file = File::open(&path).await.unwrap();
        let file_size = file.metadata().await.unwrap().len();
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut file, file_size)
            .await
            .unwrap();

        let tempdir = TempDir::new().unwrap();
        let metadata_path = tempdir.path().join("thrift_metadata.dat");
        write_metadata_to_local_file(metadata, &metadata_path);
        let metadata = read_metadata_from_local_file(&metadata_path);

        let options = ArrowReaderOptions::new().with_page_index(true);
        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
        let reader =
            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
                .build()
                .unwrap();

        let result = reader.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(result.len(), 1);
    }

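    // Exercises the cached array reader path with a sparse row selection, a row filter, and a
    // small batch size; collecting the stream at the end must succeed without an offset error.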
    #[tokio::test]
    async fn test_cached_array_reader_sparse_offset_error() {
        use futures::TryStreamExt;

        use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
        use arrow_array::{BooleanArray, RecordBatch};

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let async_reader = TestReader::new(data);

        let options = ArrowReaderOptions::new().with_page_index(true);
        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
            .await
            .unwrap();

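        // Skip 22 rows and select only 3, producing a sparse selection relative to the cache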
        let selection = RowSelection::from(vec![RowSelector::skip(22), RowSelector::select(3)]);

        let parquet_schema = builder.parquet_schema();
        let proj = ProjectionMask::leaves(parquet_schema, vec![0]);
        let always_true = ArrowPredicateFn::new(proj.clone(), |batch: RecordBatch| {
            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
        });
        let filter = RowFilter::new(vec![Box::new(always_true)]);

        let stream = builder
            .with_batch_size(8)
            .with_projection(proj)
            .with_row_selection(selection)
            .with_row_filter(filter)
            .build()
            .unwrap();

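        // Collecting should succeed; the cached reader must handle the sparse offsets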
        let _result: Vec<_> = stream.try_collect().await.unwrap();
    }
}