1use std::collections::VecDeque;
25use std::fmt::Formatter;
26use std::io::SeekFrom;
27use std::ops::Range;
28use std::pin::Pin;
29use std::sync::{Arc, Mutex};
30use std::task::{Context, Poll};
31
32use bytes::{Buf, Bytes};
33use futures::future::{BoxFuture, FutureExt};
34use futures::ready;
35use futures::stream::Stream;
36use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
37
38use arrow_array::RecordBatch;
39use arrow_schema::{DataType, Fields, Schema, SchemaRef};
40
41use crate::arrow::array_reader::{
42 ArrayReaderBuilder, CacheOptionsBuilder, RowGroupCache, RowGroups,
43};
44use crate::arrow::arrow_reader::{
45 ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
46 RowFilter, RowSelection,
47};
48use crate::arrow::ProjectionMask;
49
50use crate::bloom_filter::{
51 chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
52};
53use crate::column::page::{PageIterator, PageReader};
54use crate::errors::{ParquetError, Result};
55use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
56use crate::file::page_index::offset_index::OffsetIndexMetaData;
57use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
58use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
59
60mod metadata;
61pub use metadata::*;
62
63#[cfg(feature = "object_store")]
64mod store;
65
66use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
67use crate::arrow::arrow_reader::ReadPlanBuilder;
68use crate::arrow::schema::ParquetField;
69#[cfg(feature = "object_store")]
70pub use store::*;
71
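/// Example (an illustrative sketch, assuming a Tokio runtime; the file name is a
/// placeholder): any `AsyncRead + AsyncSeek + Unpin + Send` type, such as
/// `tokio::fs::File`, can be used through the blanket implementation below.
///
/// ```no_run
/// # use parquet::arrow::async_reader::AsyncFileReader;
/// # async fn demo() -> parquet::errors::Result<()> {
/// let mut reader = tokio::fs::File::open("data.parquet").await?;
/// // Fetch the footer metadata, then an arbitrary byte range
/// let metadata = reader.get_metadata(None).await?;
/// let first_bytes = reader.get_bytes(0..1024).await?;
/// # drop((metadata, first_bytes));
/// # Ok(())
/// # }
/// ```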
72pub trait AsyncFileReader: Send {
86 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
88
89 fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
91 async move {
92 let mut result = Vec::with_capacity(ranges.len());
93
94 for range in ranges.into_iter() {
95 let data = self.get_bytes(range).await?;
96 result.push(data);
97 }
98
99 Ok(result)
100 }
101 .boxed()
102 }
103
104 fn get_metadata<'a>(
121 &'a mut self,
122 options: Option<&'a ArrowReaderOptions>,
123 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
124}
125
126impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
128 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
129 self.as_mut().get_bytes(range)
130 }
131
132 fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
133 self.as_mut().get_byte_ranges(ranges)
134 }
135
136 fn get_metadata<'a>(
137 &'a mut self,
138 options: Option<&'a ArrowReaderOptions>,
139 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
140 self.as_mut().get_metadata(options)
141 }
142}
143
144impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
145 fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
146 async move {
147 self.seek(SeekFrom::End(-(suffix as i64))).await?;
148 let mut buf = Vec::with_capacity(suffix);
149 self.take(suffix as _).read_to_end(&mut buf).await?;
150 Ok(buf.into())
151 }
152 .boxed()
153 }
154}
155
156impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
157 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
158 async move {
159 self.seek(SeekFrom::Start(range.start)).await?;
160
161 let to_read = range.end - range.start;
162 let mut buffer = Vec::with_capacity(to_read.try_into()?);
163 let read = self.take(to_read).read_to_end(&mut buffer).await?;
164 if read as u64 != to_read {
165 return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
166 }
167
168 Ok(buffer.into())
169 }
170 .boxed()
171 }
172
173 fn get_metadata<'a>(
174 &'a mut self,
175 options: Option<&'a ArrowReaderOptions>,
176 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
177 async move {
178 let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy(
179 PageIndexPolicy::from(options.is_some_and(|o| o.page_index())),
180 );
181
182 #[cfg(feature = "encryption")]
183 let metadata_reader = metadata_reader.with_decryption_properties(
184 options.and_then(|o| o.file_decryption_properties.as_ref()),
185 );
186
187 let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
188 Ok(Arc::new(parquet_metadata))
189 }
190 .boxed()
191 }
192}
193
194impl ArrowReaderMetadata {
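    /// Load [`ArrowReaderMetadata`] from the provided asynchronous input.
    ///
    /// Example (an illustrative sketch, assuming a Tokio runtime and a local
    /// `data.parquet`; the file name is a placeholder): load the metadata once and
    /// reuse it via [`ParquetRecordBatchStreamBuilder::new_with_metadata`].
    ///
    /// ```no_run
    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # async fn demo() -> parquet::errors::Result<()> {
    /// let mut file = tokio::fs::File::open("data.parquet").await?;
    /// let metadata = ArrowReaderMetadata::load_async(&mut file, ArrowReaderOptions::new()).await?;
    /// // The loaded metadata is cheap to clone and can be shared with other readers
    /// let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(file, metadata.clone())
    ///     .build()?;
    /// # drop(stream);
    /// # Ok(())
    /// # }
    /// ```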
195 pub async fn load_async<T: AsyncFileReader>(
199 input: &mut T,
200 options: ArrowReaderOptions,
201 ) -> Result<Self> {
202 let metadata = input.get_metadata(Some(&options)).await?;
203 Self::try_new(metadata, options)
204 }
205}
206
207#[doc(hidden)]
208pub struct AsyncReader<T>(T);
213
214pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
227
228impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
229 pub async fn new(input: T) -> Result<Self> {
360 Self::new_with_options(input, Default::default()).await
361 }
362
363 pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
366 let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
367 Ok(Self::new_with_metadata(input, metadata))
368 }
369
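    /// Create a new builder from the provided input and previously loaded
    /// [`ArrowReaderMetadata`], allowing the potentially expensive metadata fetch and
    /// parse to be performed once and reused across readers (see
    /// [`ArrowReaderMetadata::load_async`]).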
370 pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
416 Self::new_builder(AsyncReader(input), metadata)
417 }
418
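    /// Read the bloom filter (SBBF) for the given row group and leaf column, returning
    /// `Ok(None)` if the column has no bloom filter.
    ///
    /// Example (an illustrative sketch; assumes the file was written with bloom
    /// filters enabled for column 0, and the file name is a placeholder):
    ///
    /// ```no_run
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # async fn demo() -> parquet::errors::Result<()> {
    /// let file = tokio::fs::File::open("data.parquet").await?;
    /// let mut builder = ParquetRecordBatchStreamBuilder::new(file).await?;
    /// if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0).await? {
    ///     // A negative result proves absence; a positive result may be a false positive
    ///     let _maybe_present = sbbf.check(&"some_value");
    /// }
    /// # Ok(())
    /// # }
    /// ```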
419 pub async fn get_row_group_column_bloom_filter(
425 &mut self,
426 row_group_idx: usize,
427 column_idx: usize,
428 ) -> Result<Option<Sbbf>> {
429 let metadata = self.metadata.row_group(row_group_idx);
430 let column_metadata = metadata.column(column_idx);
431
432 let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
433 offset
434 .try_into()
435 .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
436 } else {
437 return Ok(None);
438 };
439
440 let buffer = match column_metadata.bloom_filter_length() {
441 Some(length) => self.input.0.get_bytes(offset..offset + length as u64),
442 None => self
443 .input
444 .0
445 .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64),
446 }
447 .await?;
448
449 let (header, bitset_offset) =
450 chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
451
        match header.algorithm {
            BloomFilterAlgorithm::BLOCK(_) => {
                // BLOCK is currently the only algorithm defined by the format;
                // the match future-proofs against new variants
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED(_) => {
                // UNCOMPRESSED is currently the only variant
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH(_) => {
                // XXHASH is currently the only variant
            }
        }
467
468 let bitset = match column_metadata.bloom_filter_length() {
469 Some(_) => buffer.slice(
470 (TryInto::<usize>::try_into(bitset_offset).unwrap()
471 - TryInto::<usize>::try_into(offset).unwrap())..,
472 ),
473 None => {
474 let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
475 ParquetError::General("Bloom filter length is invalid".to_string())
476 })?;
477 self.input
478 .0
479 .get_bytes(bitset_offset..bitset_offset + bitset_length)
480 .await?
481 }
482 };
483 Ok(Some(Sbbf::new(&bitset)))
484 }
485
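    /// Build a [`ParquetRecordBatchStream`], consuming this builder.
    ///
    /// Returns an error if any explicitly requested row group index is out of bounds.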
486 pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
490 let num_row_groups = self.metadata.row_groups().len();
491
492 let row_groups = match self.row_groups {
493 Some(row_groups) => {
494 if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
495 return Err(general_err!(
496 "row group {} out of bounds 0..{}",
497 col,
498 num_row_groups
499 ));
500 }
501 row_groups.into()
502 }
503 None => (0..self.metadata.row_groups().len()).collect(),
504 };
505
506 let batch_size = self
508 .batch_size
509 .min(self.metadata.file_metadata().num_rows() as usize);
510 let reader_factory = ReaderFactory {
511 input: self.input.0,
512 filter: self.filter,
513 metadata: self.metadata.clone(),
514 fields: self.fields,
515 limit: self.limit,
516 offset: self.offset,
517 metrics: self.metrics,
518 max_predicate_cache_size: self.max_predicate_cache_size,
519 };
520
521 let projected_fields = match reader_factory.fields.as_deref().map(|pf| &pf.arrow_type) {
524 Some(DataType::Struct(fields)) => {
525 fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
526 }
527 None => Fields::empty(),
528 _ => unreachable!("Must be Struct for root type"),
529 };
530 let schema = Arc::new(Schema::new(projected_fields));
531
532 Ok(ParquetRecordBatchStream {
533 metadata: self.metadata,
534 batch_size,
535 row_groups,
536 projection: self.projection,
537 selection: self.selection,
538 schema,
539 reader_factory: Some(reader_factory),
540 state: StreamState::Init,
541 })
542 }
543}
544
545type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;
550
/// Creates a [`ParquetRecordBatchReader`] for each row group, carrying the shared
/// reader state (input, filter, remaining limit and offset) from one row group to the next
struct ReaderFactory<T> {
    metadata: Arc<ParquetMetaData>,

    /// Decoded Arrow field mapping for the Parquet schema, if any
    fields: Option<Arc<ParquetField>>,

    /// The asynchronous input to read column data from
    input: T,

    /// Optional row filter evaluated while reading each row group
    filter: Option<RowFilter>,

    /// Number of rows remaining before the requested limit is reached, if any
    limit: Option<usize>,

    /// Number of rows still to skip for the requested offset, if any
    offset: Option<usize>,

    metrics: ArrowReaderMetrics,

    /// Maximum size of the cache used to reuse decoded predicate results
    /// (see [`RowGroupCache`])
    max_predicate_cache_size: usize,
}
576
577impl<T> ReaderFactory<T>
578where
579 T: AsyncFileReader + Send,
580{
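    /// Reads the row group at `row_group_idx`, fetching the projected column chunks,
    /// evaluating any [`RowFilter`] predicates, and applying the remaining offset and
    /// limit. Returns the factory (with offset/limit updated) together with a reader
    /// for the surviving rows, or `None` if nothing in this row group is selected.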
581 async fn read_row_group(
587 mut self,
588 row_group_idx: usize,
589 selection: Option<RowSelection>,
590 projection: ProjectionMask,
591 batch_size: usize,
592 ) -> ReadResult<T> {
593 let meta = self.metadata.row_group(row_group_idx);
596 let offset_index = self
597 .metadata
598 .offset_index()
599 .filter(|index| !index.is_empty())
601 .map(|x| x[row_group_idx].as_slice());
602
603 let cache_projection = match self.compute_cache_projection(&projection) {
605 Some(projection) => projection,
606 None => ProjectionMask::none(meta.columns().len()),
607 };
608 let row_group_cache = Arc::new(Mutex::new(RowGroupCache::new(
609 batch_size,
610 self.max_predicate_cache_size,
611 )));
612
613 let mut row_group = InMemoryRowGroup {
614 row_count: meta.num_rows() as usize,
616 column_chunks: vec![None; meta.columns().len()],
617 offset_index,
618 row_group_idx,
619 metadata: self.metadata.as_ref(),
620 };
621
622 let cache_options_builder = CacheOptionsBuilder::new(&cache_projection, &row_group_cache);
623
624 let filter = self.filter.as_mut();
625 let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);
626
627 if let Some(filter) = filter {
629 let cache_options = cache_options_builder.clone().producer();
630
631 for predicate in filter.predicates.iter_mut() {
                if !plan_builder.selects_any() {
                    return Ok((self, None));
                }
635
636 let selection = plan_builder.selection();
638 let cache_mask = Some(&cache_projection);
640 row_group
641 .fetch(
642 &mut self.input,
643 predicate.projection(),
644 selection,
645 batch_size,
646 cache_mask,
647 )
648 .await?;
649
650 let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
651 .with_cache_options(Some(&cache_options))
652 .build_array_reader(self.fields.as_deref(), predicate.projection())?;
653
654 plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
655 }
656 }
657
658 let rows_before = plan_builder
660 .num_rows_selected()
661 .unwrap_or(row_group.row_count);
662
        if rows_before == 0 {
            return Ok((self, None));
        }
666
667 let plan_builder = plan_builder
669 .limited(row_group.row_count)
670 .with_offset(self.offset)
671 .with_limit(self.limit)
672 .build_limited();
673
674 let rows_after = plan_builder
675 .num_rows_selected()
676 .unwrap_or(row_group.row_count);
677
678 if let Some(offset) = &mut self.offset {
680 *offset = offset.saturating_sub(rows_before - rows_after)
683 }
684
        if rows_after == 0 {
            return Ok((self, None));
        }
688
689 if let Some(limit) = &mut self.limit {
690 *limit -= rows_after;
691 }
692 row_group
694 .fetch(
696 &mut self.input,
697 &projection,
698 plan_builder.selection(),
699 batch_size,
700 None,
701 )
702 .await?;
703
704 let plan = plan_builder.build();
705
706 let cache_options = cache_options_builder.consumer();
707 let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
708 .with_cache_options(Some(&cache_options))
709 .build_array_reader(self.fields.as_deref(), &projection)?;
710
711 let reader = ParquetRecordBatchReader::new(array_reader, plan);
712
713 Ok((self, Some(reader)))
714 }
715
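    /// Compute which columns to cache between the predicate and output phases: the
    /// union of all predicate projections, intersected with the output `projection`,
    /// excluding nested (multi-leaf) root columns.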
716 fn compute_cache_projection(&self, projection: &ProjectionMask) -> Option<ProjectionMask> {
718 let filters = self.filter.as_ref()?;
719 let mut cache_projection = filters.predicates.first()?.projection().clone();
720 for predicate in filters.predicates.iter() {
721 cache_projection.union(predicate.projection());
722 }
723 cache_projection.intersect(projection);
724 self.exclude_nested_columns_from_cache(&cache_projection)
725 }
726
727 fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
729 let schema = self.metadata.file_metadata().schema_descr();
730 let num_leaves = schema.num_columns();
731
732 let num_roots = schema.root_schema().get_fields().len();
734 let mut root_leaf_counts = vec![0usize; num_roots];
735 for leaf_idx in 0..num_leaves {
736 let root_idx = schema.get_column_root_idx(leaf_idx);
737 root_leaf_counts[root_idx] += 1;
738 }
739
740 let mut included_leaves = Vec::new();
742 for leaf_idx in 0..num_leaves {
743 if mask.leaf_included(leaf_idx) {
744 let root_idx = schema.get_column_root_idx(leaf_idx);
745 if root_leaf_counts[root_idx] == 1 {
746 included_leaves.push(leaf_idx);
747 }
748 }
749 }
750
751 if included_leaves.is_empty() {
752 None
753 } else {
754 Some(ProjectionMask::leaves(schema, included_leaves))
755 }
756 }
757}
758
enum StreamState<T> {
    /// At the start of the next row group, or the end of the stream
    Init,
    /// Decoding batches from the reader for the current row group
    Decoding(ParquetRecordBatchReader),
    /// Reading (fetching and preparing) the data for the next row group
    Reading(BoxFuture<'static, ReadResult<T>>),
    /// A fatal error was encountered; the stream yields no further values
    Error,
}
769
770impl<T> std::fmt::Debug for StreamState<T> {
771 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
772 match self {
773 StreamState::Init => write!(f, "StreamState::Init"),
774 StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
775 StreamState::Reading(_) => write!(f, "StreamState::Reading"),
776 StreamState::Error => write!(f, "StreamState::Error"),
777 }
778 }
779}
780
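/// An asynchronous [`Stream`] of [`RecordBatch`] constructed by a
/// [`ParquetRecordBatchStreamBuilder`].
///
/// Example (an illustrative sketch, assuming a Tokio runtime and a local
/// `data.parquet`) of consuming the stream batch by batch:
///
/// ```no_run
/// # use futures::StreamExt;
/// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
/// # async fn demo() -> parquet::errors::Result<()> {
/// let file = tokio::fs::File::open("data.parquet").await?;
/// let mut stream = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;
/// while let Some(batch) = stream.next().await {
///     let batch = batch?;
///     println!("read {} rows", batch.num_rows());
/// }
/// # Ok(())
/// # }
/// ```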
pub struct ParquetRecordBatchStream<T> {
    metadata: Arc<ParquetMetaData>,

    /// Projected Arrow schema of the emitted [`RecordBatch`]es
    schema: SchemaRef,

    /// Indices of the row groups still to be read
    row_groups: VecDeque<usize>,

    projection: ProjectionMask,

    batch_size: usize,

    /// Remaining row selection, split off one row group at a time
    selection: Option<RowSelection>,

    /// `None` while a row group is being read (the factory is lent to the in-flight future)
    reader_factory: Option<ReaderFactory<T>>,

    state: StreamState<T>,
}
816
817impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
818 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
819 f.debug_struct("ParquetRecordBatchStream")
820 .field("metadata", &self.metadata)
821 .field("schema", &self.schema)
822 .field("batch_size", &self.batch_size)
823 .field("projection", &self.projection)
824 .field("state", &self.state)
825 .finish()
826 }
827}
828
829impl<T> ParquetRecordBatchStream<T> {
830 pub fn schema(&self) -> &SchemaRef {
835 &self.schema
836 }
837}
838
839impl<T> ParquetRecordBatchStream<T>
840where
841 T: AsyncFileReader + Unpin + Send + 'static,
842{
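    /// Fetch the next row group and return a synchronous [`ParquetRecordBatchReader`]
    /// over it, or `None` once all row groups have been read. This cannot be combined
    /// with polling the [`Stream`] implementation on the same instance.
    ///
    /// Example (an illustrative sketch, assuming a Tokio runtime and a local
    /// `data.parquet`):
    ///
    /// ```no_run
    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
    /// # async fn demo() -> parquet::errors::Result<()> {
    /// let file = tokio::fs::File::open("data.parquet").await?;
    /// let mut stream = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;
    /// while let Some(reader) = stream.next_row_group().await? {
    ///     // Decode this row group on the current task, or hand the reader to a worker
    ///     for batch in reader {
    ///         let batch = batch?;
    ///         println!("read {} rows", batch.num_rows());
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```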
843 pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
857 loop {
858 match &mut self.state {
859 StreamState::Decoding(_) | StreamState::Reading(_) => {
860 return Err(ParquetError::General(
861 "Cannot combine the use of next_row_group with the Stream API".to_string(),
862 ))
863 }
864 StreamState::Init => {
865 let row_group_idx = match self.row_groups.pop_front() {
866 Some(idx) => idx,
867 None => return Ok(None),
868 };
869
870 let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
871
872 let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
873
874 let reader_factory = self.reader_factory.take().expect("lost reader factory");
875
876 let (reader_factory, maybe_reader) = reader_factory
877 .read_row_group(
878 row_group_idx,
879 selection,
880 self.projection.clone(),
881 self.batch_size,
882 )
883 .await
884 .inspect_err(|_| {
885 self.state = StreamState::Error;
886 })?;
887 self.reader_factory = Some(reader_factory);
888
889 if let Some(reader) = maybe_reader {
890 return Ok(Some(reader));
891 } else {
892 continue;
894 }
895 }
                StreamState::Error => return Ok(None),
            }
898 }
899 }
900}
901
902impl<T> Stream for ParquetRecordBatchStream<T>
903where
904 T: AsyncFileReader + Unpin + Send + 'static,
905{
906 type Item = Result<RecordBatch>;
907
908 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
909 loop {
910 match &mut self.state {
911 StreamState::Decoding(batch_reader) => match batch_reader.next() {
912 Some(Ok(batch)) => {
913 return Poll::Ready(Some(Ok(batch)));
914 }
915 Some(Err(e)) => {
916 self.state = StreamState::Error;
917 return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
918 }
919 None => self.state = StreamState::Init,
920 },
921 StreamState::Init => {
922 let row_group_idx = match self.row_groups.pop_front() {
923 Some(idx) => idx,
924 None => return Poll::Ready(None),
925 };
926
927 let reader = self.reader_factory.take().expect("lost reader factory");
928
929 let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
930
931 let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
932
933 let fut = reader
934 .read_row_group(
935 row_group_idx,
936 selection,
937 self.projection.clone(),
938 self.batch_size,
939 )
940 .boxed();
941
942 self.state = StreamState::Reading(fut)
943 }
944 StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
945 Ok((reader_factory, maybe_reader)) => {
946 self.reader_factory = Some(reader_factory);
947 match maybe_reader {
948 Some(reader) => self.state = StreamState::Decoding(reader),
950 None => self.state = StreamState::Init,
952 }
953 }
954 Err(e) => {
955 self.state = StreamState::Error;
956 return Poll::Ready(Some(Err(e)));
957 }
958 },
                StreamState::Error => return Poll::Ready(None),
            }
961 }
962 }
963}
964
/// An in-memory collection of column chunks for a single row group
struct InMemoryRowGroup<'a> {
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    /// Lazily fetched column chunk data, one entry per leaf column
    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
    row_count: usize,
    row_group_idx: usize,
    metadata: &'a ParquetMetaData,
}
974
975impl InMemoryRowGroup<'_> {
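    /// Fetches the byte ranges needed for the columns selected by `projection` that
    /// have not already been fetched. When a [`RowSelection`] and an offset index are
    /// both available, only the required pages are fetched and stored as
    /// [`ColumnChunkData::Sparse`]; otherwise whole column chunks are fetched as
    /// [`ColumnChunkData::Dense`]. Columns included in `cache_mask` have their
    /// selection expanded to `batch_size` boundaries so that cached decoded results
    /// can be reused.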
976 async fn fetch<T: AsyncFileReader + Send>(
982 &mut self,
983 input: &mut T,
984 projection: &ProjectionMask,
985 selection: Option<&RowSelection>,
986 batch_size: usize,
987 cache_mask: Option<&ProjectionMask>,
988 ) -> Result<()> {
989 let metadata = self.metadata.row_group(self.row_group_idx);
990 if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
991 let expanded_selection =
992 selection.expand_to_batch_boundaries(batch_size, self.row_count);
993 let mut page_start_offsets: Vec<Vec<u64>> = vec![];
996
997 let fetch_ranges = self
998 .column_chunks
999 .iter()
1000 .zip(metadata.columns())
1001 .enumerate()
1002 .filter(|&(idx, (chunk, _chunk_meta))| {
1003 chunk.is_none() && projection.leaf_included(idx)
1004 })
1005 .flat_map(|(idx, (_chunk, chunk_meta))| {
1006 let mut ranges: Vec<Range<u64>> = vec![];
1009 let (start, _len) = chunk_meta.byte_range();
1010 match offset_index[idx].page_locations.first() {
1011 Some(first) if first.offset as u64 != start => {
1012 ranges.push(start..first.offset as u64);
1013 }
1014 _ => (),
1015 }
1016
1017 let use_expanded = cache_mask.map(|m| m.leaf_included(idx)).unwrap_or(false);
1019 if use_expanded {
1020 ranges.extend(
1021 expanded_selection.scan_ranges(&offset_index[idx].page_locations),
1022 );
1023 } else {
1024 ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
1025 }
1026 page_start_offsets.push(ranges.iter().map(|range| range.start).collect());
1027
1028 ranges
1029 })
1030 .collect();
1031
1032 let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
1033 let mut page_start_offsets = page_start_offsets.into_iter();
1034
1035 for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
1036 if chunk.is_some() || !projection.leaf_included(idx) {
1037 continue;
1038 }
1039
1040 if let Some(offsets) = page_start_offsets.next() {
1041 let mut chunks = Vec::with_capacity(offsets.len());
1042 for _ in 0..offsets.len() {
1043 chunks.push(chunk_data.next().unwrap());
1044 }
1045
1046 *chunk = Some(Arc::new(ColumnChunkData::Sparse {
1047 length: metadata.column(idx).byte_range().1 as usize,
1048 data: offsets
1049 .into_iter()
1050 .map(|x| x as usize)
1051 .zip(chunks.into_iter())
1052 .collect(),
1053 }))
1054 }
1055 }
1056 } else {
1057 let fetch_ranges = self
1058 .column_chunks
1059 .iter()
1060 .enumerate()
1061 .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
1062 .map(|(idx, _chunk)| {
1063 let column = metadata.column(idx);
1064 let (start, length) = column.byte_range();
1065 start..(start + length)
1066 })
1067 .collect();
1068
1069 let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
1070
1071 for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
1072 if chunk.is_some() || !projection.leaf_included(idx) {
1073 continue;
1074 }
1075
1076 if let Some(data) = chunk_data.next() {
1077 *chunk = Some(Arc::new(ColumnChunkData::Dense {
1078 offset: metadata.column(idx).byte_range().0 as usize,
1079 data,
1080 }));
1081 }
1082 }
1083 }
1084
1085 Ok(())
1086 }
1087}
1088
1089impl RowGroups for InMemoryRowGroup<'_> {
1090 fn num_rows(&self) -> usize {
1091 self.row_count
1092 }
1093
1094 fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
1096 match &self.column_chunks[i] {
1097 None => Err(ParquetError::General(format!(
1098 "Invalid column index {i}, column was not fetched"
1099 ))),
1100 Some(data) => {
1101 let page_locations = self
1102 .offset_index
1103 .filter(|index| !index.is_empty())
1105 .map(|index| index[i].page_locations.clone());
1106 let column_chunk_metadata = self.metadata.row_group(self.row_group_idx).column(i);
1107 let page_reader = SerializedPageReader::new(
1108 data.clone(),
1109 column_chunk_metadata,
1110 self.row_count,
1111 page_locations,
1112 )?;
1113 let page_reader = page_reader.add_crypto_context(
1114 self.row_group_idx,
1115 i,
1116 self.metadata,
1117 column_chunk_metadata,
1118 )?;
1119
1120 let page_reader: Box<dyn PageReader> = Box::new(page_reader);
1121
1122 Ok(Box::new(ColumnChunkIterator {
1123 reader: Some(Ok(page_reader)),
1124 }))
1125 }
1126 }
1127 }
1128}
1129
/// The raw bytes of a column chunk, either as one contiguous chunk or a sparse set of pages
#[derive(Clone)]
enum ColumnChunkData {
    /// Only the data pages selected by a [`RowSelection`] were fetched
    Sparse {
        /// Length of the full column chunk, in bytes
        length: usize,
        /// Fetched pages as `(file offset, bytes)`, sorted by offset
        data: Vec<(usize, Bytes)>,
    },
    /// The entire column chunk, starting at `offset` within the file
    Dense { offset: usize, data: Bytes },
}
1146
1147impl ColumnChunkData {
1148 fn get(&self, start: u64) -> Result<Bytes> {
1150 match &self {
1151 ColumnChunkData::Sparse { data, .. } => data
1152 .binary_search_by_key(&start, |(offset, _)| *offset as u64)
1153 .map(|idx| data[idx].1.clone())
1154 .map_err(|_| {
1155 ParquetError::General(format!(
1156 "Invalid offset in sparse column chunk data: {start}"
1157 ))
1158 }),
1159 ColumnChunkData::Dense { offset, data } => {
1160 let start = start as usize - *offset;
1161 Ok(data.slice(start..))
1162 }
1163 }
1164 }
1165}
1166
1167impl Length for ColumnChunkData {
1168 fn len(&self) -> u64 {
1170 match &self {
1171 ColumnChunkData::Sparse { length, .. } => *length as u64,
1172 ColumnChunkData::Dense { data, .. } => data.len() as u64,
1173 }
1174 }
1175}
1176
1177impl ChunkReader for ColumnChunkData {
1178 type T = bytes::buf::Reader<Bytes>;
1179
1180 fn get_read(&self, start: u64) -> Result<Self::T> {
1181 Ok(self.get(start)?.reader())
1182 }
1183
1184 fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
1185 Ok(self.get(start)?.slice(..length))
1186 }
1187}
1188
/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
struct ColumnChunkIterator {
    reader: Option<Result<Box<dyn PageReader>>>,
}
1193
1194impl Iterator for ColumnChunkIterator {
1195 type Item = Result<Box<dyn PageReader>>;
1196
1197 fn next(&mut self) -> Option<Self::Item> {
1198 self.reader.take()
1199 }
1200}
1201
1202impl PageIterator for ColumnChunkIterator {}
1203
1204#[cfg(test)]
1205mod tests {
1206 use super::*;
1207 use crate::arrow::arrow_reader::{
1208 ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
1209 };
1210 use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
1211 use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
1212 use crate::arrow::ArrowWriter;
1213 use crate::file::metadata::ParquetMetaDataReader;
1214 use crate::file::properties::WriterProperties;
1215 use arrow::compute::kernels::cmp::eq;
1216 use arrow::error::Result as ArrowResult;
1217 use arrow_array::builder::{ListBuilder, StringBuilder};
1218 use arrow_array::cast::AsArray;
1219 use arrow_array::types::Int32Type;
1220 use arrow_array::{
1221 Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray,
1222 StructArray, UInt64Array,
1223 };
1224 use arrow_schema::{DataType, Field, Schema};
1225 use futures::{StreamExt, TryStreamExt};
1226 use rand::{rng, Rng};
1227 use std::collections::HashMap;
1228 use std::sync::{Arc, Mutex};
1229 use tempfile::tempfile;
1230
1231 #[derive(Clone)]
1232 struct TestReader {
1233 data: Bytes,
1234 metadata: Option<Arc<ParquetMetaData>>,
1235 requests: Arc<Mutex<Vec<Range<usize>>>>,
1236 }
1237
1238 impl TestReader {
1239 fn new(data: Bytes) -> Self {
1240 Self {
1241 data,
1242 metadata: Default::default(),
1243 requests: Default::default(),
1244 }
1245 }
1246 }
1247
1248 impl AsyncFileReader for TestReader {
1249 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
1250 let range = range.clone();
1251 self.requests
1252 .lock()
1253 .unwrap()
1254 .push(range.start as usize..range.end as usize);
1255 futures::future::ready(Ok(self
1256 .data
1257 .slice(range.start as usize..range.end as usize)))
1258 .boxed()
1259 }
1260
1261 fn get_metadata<'a>(
1262 &'a mut self,
1263 options: Option<&'a ArrowReaderOptions>,
1264 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
1265 let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy(
1266 PageIndexPolicy::from(options.is_some_and(|o| o.page_index())),
1267 );
1268 self.metadata = Some(Arc::new(
1269 metadata_reader.parse_and_finish(&self.data).unwrap(),
1270 ));
1271 futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
1272 }
1273 }
1274
1275 #[tokio::test]
1276 async fn test_async_reader() {
1277 let testdata = arrow::util::test_util::parquet_test_data();
1278 let path = format!("{testdata}/alltypes_plain.parquet");
1279 let data = Bytes::from(std::fs::read(path).unwrap());
1280
1281 let async_reader = TestReader::new(data.clone());
1282
1283 let requests = async_reader.requests.clone();
1284 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1285 .await
1286 .unwrap();
1287
1288 let metadata = builder.metadata().clone();
1289 assert_eq!(metadata.num_row_groups(), 1);
1290
1291 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1292 let stream = builder
1293 .with_projection(mask.clone())
1294 .with_batch_size(1024)
1295 .build()
1296 .unwrap();
1297
1298 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1299
1300 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1301 .unwrap()
1302 .with_projection(mask)
1303 .with_batch_size(104)
1304 .build()
1305 .unwrap()
1306 .collect::<ArrowResult<Vec<_>>>()
1307 .unwrap();
1308
1309 assert_eq!(async_batches, sync_batches);
1310
1311 let requests = requests.lock().unwrap();
1312 let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1313 let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1314
1315 assert_eq!(
1316 &requests[..],
1317 &[
1318 offset_1 as usize..(offset_1 + length_1) as usize,
1319 offset_2 as usize..(offset_2 + length_2) as usize
1320 ]
1321 );
1322 }
1323
1324 #[tokio::test]
1325 async fn test_async_reader_with_next_row_group() {
1326 let testdata = arrow::util::test_util::parquet_test_data();
1327 let path = format!("{testdata}/alltypes_plain.parquet");
1328 let data = Bytes::from(std::fs::read(path).unwrap());
1329
1330 let async_reader = TestReader::new(data.clone());
1331
1332 let requests = async_reader.requests.clone();
1333 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1334 .await
1335 .unwrap();
1336
1337 let metadata = builder.metadata().clone();
1338 assert_eq!(metadata.num_row_groups(), 1);
1339
1340 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1341 let mut stream = builder
1342 .with_projection(mask.clone())
1343 .with_batch_size(1024)
1344 .build()
1345 .unwrap();
1346
1347 let mut readers = vec![];
1348 while let Some(reader) = stream.next_row_group().await.unwrap() {
1349 readers.push(reader);
1350 }
1351
1352 let async_batches: Vec<_> = readers
1353 .into_iter()
1354 .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
1355 .collect();
1356
1357 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1358 .unwrap()
1359 .with_projection(mask)
1360 .with_batch_size(104)
1361 .build()
1362 .unwrap()
1363 .collect::<ArrowResult<Vec<_>>>()
1364 .unwrap();
1365
1366 assert_eq!(async_batches, sync_batches);
1367
1368 let requests = requests.lock().unwrap();
1369 let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1370 let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1371
1372 assert_eq!(
1373 &requests[..],
1374 &[
1375 offset_1 as usize..(offset_1 + length_1) as usize,
1376 offset_2 as usize..(offset_2 + length_2) as usize
1377 ]
1378 );
1379 }
1380
1381 #[tokio::test]
1382 async fn test_async_reader_with_index() {
1383 let testdata = arrow::util::test_util::parquet_test_data();
1384 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1385 let data = Bytes::from(std::fs::read(path).unwrap());
1386
1387 let async_reader = TestReader::new(data.clone());
1388
1389 let options = ArrowReaderOptions::new().with_page_index(true);
1390 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1391 .await
1392 .unwrap();
1393
1394 let metadata_with_index = builder.metadata();
1396 assert_eq!(metadata_with_index.num_row_groups(), 1);
1397
1398 let offset_index = metadata_with_index.offset_index().unwrap();
1400 let column_index = metadata_with_index.column_index().unwrap();
1401
1402 assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
1403 assert_eq!(column_index.len(), metadata_with_index.num_row_groups());
1404
1405 let num_columns = metadata_with_index
1406 .file_metadata()
1407 .schema_descr()
1408 .num_columns();
1409
1410 offset_index
1412 .iter()
1413 .for_each(|x| assert_eq!(x.len(), num_columns));
1414 column_index
1415 .iter()
1416 .for_each(|x| assert_eq!(x.len(), num_columns));
1417
1418 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1419 let stream = builder
1420 .with_projection(mask.clone())
1421 .with_batch_size(1024)
1422 .build()
1423 .unwrap();
1424
1425 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1426
1427 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1428 .unwrap()
1429 .with_projection(mask)
1430 .with_batch_size(1024)
1431 .build()
1432 .unwrap()
1433 .collect::<ArrowResult<Vec<_>>>()
1434 .unwrap();
1435
1436 assert_eq!(async_batches, sync_batches);
1437 }
1438
1439 #[tokio::test]
1440 async fn test_async_reader_with_limit() {
1441 let testdata = arrow::util::test_util::parquet_test_data();
1442 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1443 let data = Bytes::from(std::fs::read(path).unwrap());
1444
1445 let metadata = ParquetMetaDataReader::new()
1446 .parse_and_finish(&data)
1447 .unwrap();
1448 let metadata = Arc::new(metadata);
1449
1450 assert_eq!(metadata.num_row_groups(), 1);
1451
1452 let async_reader = TestReader::new(data.clone());
1453
1454 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1455 .await
1456 .unwrap();
1457
1458 assert_eq!(builder.metadata().num_row_groups(), 1);
1459
1460 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1461 let stream = builder
1462 .with_projection(mask.clone())
1463 .with_batch_size(1024)
1464 .with_limit(1)
1465 .build()
1466 .unwrap();
1467
1468 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1469
1470 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1471 .unwrap()
1472 .with_projection(mask)
1473 .with_batch_size(1024)
1474 .with_limit(1)
1475 .build()
1476 .unwrap()
1477 .collect::<ArrowResult<Vec<_>>>()
1478 .unwrap();
1479
1480 assert_eq!(async_batches, sync_batches);
1481 }
1482
1483 #[tokio::test]
1484 async fn test_async_reader_skip_pages() {
1485 let testdata = arrow::util::test_util::parquet_test_data();
1486 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1487 let data = Bytes::from(std::fs::read(path).unwrap());
1488
1489 let async_reader = TestReader::new(data.clone());
1490
1491 let options = ArrowReaderOptions::new().with_page_index(true);
1492 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1493 .await
1494 .unwrap();
1495
1496 assert_eq!(builder.metadata().num_row_groups(), 1);
1497
        let selection = RowSelection::from(vec![
            RowSelector::skip(21),
            RowSelector::select(21),
            RowSelector::skip(41),
            RowSelector::select(41),
            RowSelector::skip(25),
            RowSelector::select(25),
            RowSelector::skip(7116),
            RowSelector::select(10),
        ]);
1508
1509 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);
1510
1511 let stream = builder
1512 .with_projection(mask.clone())
1513 .with_row_selection(selection.clone())
1514 .build()
1515 .expect("building stream");
1516
1517 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1518
1519 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1520 .unwrap()
1521 .with_projection(mask)
1522 .with_batch_size(1024)
1523 .with_row_selection(selection)
1524 .build()
1525 .unwrap()
1526 .collect::<ArrowResult<Vec<_>>>()
1527 .unwrap();
1528
1529 assert_eq!(async_batches, sync_batches);
1530 }
1531
1532 #[tokio::test]
1533 async fn test_fuzz_async_reader_selection() {
1534 let testdata = arrow::util::test_util::parquet_test_data();
1535 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1536 let data = Bytes::from(std::fs::read(path).unwrap());
1537
1538 let mut rand = rng();
1539
1540 for _ in 0..100 {
1541 let mut expected_rows = 0;
1542 let mut total_rows = 0;
1543 let mut skip = false;
1544 let mut selectors = vec![];
1545
1546 while total_rows < 7300 {
1547 let row_count: usize = rand.random_range(1..100);
1548
1549 let row_count = row_count.min(7300 - total_rows);
1550
1551 selectors.push(RowSelector { row_count, skip });
1552
1553 total_rows += row_count;
1554 if !skip {
1555 expected_rows += row_count;
1556 }
1557
1558 skip = !skip;
1559 }
1560
1561 let selection = RowSelection::from(selectors);
1562
1563 let async_reader = TestReader::new(data.clone());
1564
1565 let options = ArrowReaderOptions::new().with_page_index(true);
1566 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1567 .await
1568 .unwrap();
1569
1570 assert_eq!(builder.metadata().num_row_groups(), 1);
1571
1572 let col_idx: usize = rand.random_range(0..13);
1573 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1574
1575 let stream = builder
1576 .with_projection(mask.clone())
1577 .with_row_selection(selection.clone())
1578 .build()
1579 .expect("building stream");
1580
1581 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1582
1583 let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1584
1585 assert_eq!(actual_rows, expected_rows);
1586 }
1587 }
1588
1589 #[tokio::test]
1590 async fn test_async_reader_zero_row_selector() {
1591 let testdata = arrow::util::test_util::parquet_test_data();
1593 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1594 let data = Bytes::from(std::fs::read(path).unwrap());
1595
1596 let mut rand = rng();
1597
1598 let mut expected_rows = 0;
1599 let mut total_rows = 0;
1600 let mut skip = false;
1601 let mut selectors = vec![];
1602
1603 selectors.push(RowSelector {
1604 row_count: 0,
1605 skip: false,
1606 });
1607
1608 while total_rows < 7300 {
1609 let row_count: usize = rand.random_range(1..100);
1610
1611 let row_count = row_count.min(7300 - total_rows);
1612
1613 selectors.push(RowSelector { row_count, skip });
1614
1615 total_rows += row_count;
1616 if !skip {
1617 expected_rows += row_count;
1618 }
1619
1620 skip = !skip;
1621 }
1622
1623 let selection = RowSelection::from(selectors);
1624
1625 let async_reader = TestReader::new(data.clone());
1626
1627 let options = ArrowReaderOptions::new().with_page_index(true);
1628 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1629 .await
1630 .unwrap();
1631
1632 assert_eq!(builder.metadata().num_row_groups(), 1);
1633
1634 let col_idx: usize = rand.random_range(0..13);
1635 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1636
1637 let stream = builder
1638 .with_projection(mask.clone())
1639 .with_row_selection(selection.clone())
1640 .build()
1641 .expect("building stream");
1642
1643 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1644
1645 let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1646
1647 assert_eq!(actual_rows, expected_rows);
1648 }
1649
1650 #[tokio::test]
1651 async fn test_row_filter() {
1652 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1653 let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1654 let data = RecordBatch::try_from_iter([
1655 ("a", Arc::new(a) as ArrayRef),
1656 ("b", Arc::new(b) as ArrayRef),
1657 ])
1658 .unwrap();
1659
1660 let mut buf = Vec::with_capacity(1024);
1661 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1662 writer.write(&data).unwrap();
1663 writer.close().unwrap();
1664
1665 let data: Bytes = buf.into();
1666 let metadata = ParquetMetaDataReader::new()
1667 .parse_and_finish(&data)
1668 .unwrap();
1669 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1670
1671 let test = TestReader::new(data);
1672 let requests = test.requests.clone();
1673
1674 let a_scalar = StringArray::from_iter_values(["b"]);
1675 let a_filter = ArrowPredicateFn::new(
1676 ProjectionMask::leaves(&parquet_schema, vec![0]),
1677 move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1678 );
1679
1680 let filter = RowFilter::new(vec![Box::new(a_filter)]);
1681
1682 let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 1]);
1683 let stream = ParquetRecordBatchStreamBuilder::new(test)
1684 .await
1685 .unwrap()
1686 .with_projection(mask.clone())
1687 .with_batch_size(1024)
1688 .with_row_filter(filter)
1689 .build()
1690 .unwrap();
1691
1692 let batches: Vec<_> = stream.try_collect().await.unwrap();
1693 assert_eq!(batches.len(), 1);
1694
1695 let batch = &batches[0];
1696 assert_eq!(batch.num_columns(), 2);
1697
1698 assert_eq!(
1700 batch.column(0).as_ref(),
1701 &StringArray::from_iter_values(["b", "b", "b"])
1702 );
1703 assert_eq!(
1704 batch.column(1).as_ref(),
1705 &StringArray::from_iter_values(["2", "3", "4"])
1706 );
1707
1708 assert_eq!(requests.lock().unwrap().len(), 2);
1712 }
1713
1714 #[tokio::test]
1715 async fn test_two_row_filters() {
1716 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1717 let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1718 let c = Int32Array::from_iter(0..6);
1719 let data = RecordBatch::try_from_iter([
1720 ("a", Arc::new(a) as ArrayRef),
1721 ("b", Arc::new(b) as ArrayRef),
1722 ("c", Arc::new(c) as ArrayRef),
1723 ])
1724 .unwrap();
1725
1726 let mut buf = Vec::with_capacity(1024);
1727 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1728 writer.write(&data).unwrap();
1729 writer.close().unwrap();
1730
1731 let data: Bytes = buf.into();
1732 let metadata = ParquetMetaDataReader::new()
1733 .parse_and_finish(&data)
1734 .unwrap();
1735 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1736
1737 let test = TestReader::new(data);
1738 let requests = test.requests.clone();
1739
1740 let a_scalar = StringArray::from_iter_values(["b"]);
1741 let a_filter = ArrowPredicateFn::new(
1742 ProjectionMask::leaves(&parquet_schema, vec![0]),
1743 move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1744 );
1745
1746 let b_scalar = StringArray::from_iter_values(["4"]);
1747 let b_filter = ArrowPredicateFn::new(
1748 ProjectionMask::leaves(&parquet_schema, vec![1]),
1749 move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1750 );
1751
1752 let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1753
1754 let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1755 let stream = ParquetRecordBatchStreamBuilder::new(test)
1756 .await
1757 .unwrap()
1758 .with_projection(mask.clone())
1759 .with_batch_size(1024)
1760 .with_row_filter(filter)
1761 .build()
1762 .unwrap();
1763
1764 let batches: Vec<_> = stream.try_collect().await.unwrap();
1765 assert_eq!(batches.len(), 1);
1766
1767 let batch = &batches[0];
1768 assert_eq!(batch.num_rows(), 1);
1769 assert_eq!(batch.num_columns(), 2);
1770
1771 let col = batch.column(0);
1772 let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
1773 assert_eq!(val, "b");
1774
1775 let col = batch.column(1);
1776 let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
1777 assert_eq!(val, 3);
1778
1779 assert_eq!(requests.lock().unwrap().len(), 3);
1784 }
1785
1786 #[tokio::test]
1787 async fn test_limit_multiple_row_groups() {
1788 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1789 let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1790 let c = Int32Array::from_iter(0..6);
1791 let data = RecordBatch::try_from_iter([
1792 ("a", Arc::new(a) as ArrayRef),
1793 ("b", Arc::new(b) as ArrayRef),
1794 ("c", Arc::new(c) as ArrayRef),
1795 ])
1796 .unwrap();
1797
1798 let mut buf = Vec::with_capacity(1024);
1799 let props = WriterProperties::builder()
1800 .set_max_row_group_size(3)
1801 .build();
1802 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1803 writer.write(&data).unwrap();
1804 writer.close().unwrap();
1805
1806 let data: Bytes = buf.into();
1807 let metadata = ParquetMetaDataReader::new()
1808 .parse_and_finish(&data)
1809 .unwrap();
1810
1811 assert_eq!(metadata.num_row_groups(), 2);
1812
1813 let test = TestReader::new(data);
1814
1815 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1816 .await
1817 .unwrap()
1818 .with_batch_size(1024)
1819 .with_limit(4)
1820 .build()
1821 .unwrap();
1822
1823 let batches: Vec<_> = stream.try_collect().await.unwrap();
1824 assert_eq!(batches.len(), 2);
1826
1827 let batch = &batches[0];
1828 assert_eq!(batch.num_rows(), 3);
1830 assert_eq!(batch.num_columns(), 3);
1831 let col2 = batch.column(2).as_primitive::<Int32Type>();
1832 assert_eq!(col2.values(), &[0, 1, 2]);
1833
1834 let batch = &batches[1];
1835 assert_eq!(batch.num_rows(), 1);
1837 assert_eq!(batch.num_columns(), 3);
1838 let col2 = batch.column(2).as_primitive::<Int32Type>();
1839 assert_eq!(col2.values(), &[3]);
1840
1841 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1842 .await
1843 .unwrap()
1844 .with_offset(2)
1845 .with_limit(3)
1846 .build()
1847 .unwrap();
1848
1849 let batches: Vec<_> = stream.try_collect().await.unwrap();
1850 assert_eq!(batches.len(), 2);
1852
1853 let batch = &batches[0];
1854 assert_eq!(batch.num_rows(), 1);
1856 assert_eq!(batch.num_columns(), 3);
1857 let col2 = batch.column(2).as_primitive::<Int32Type>();
1858 assert_eq!(col2.values(), &[2]);
1859
1860 let batch = &batches[1];
1861 assert_eq!(batch.num_rows(), 2);
1863 assert_eq!(batch.num_columns(), 3);
1864 let col2 = batch.column(2).as_primitive::<Int32Type>();
1865 assert_eq!(col2.values(), &[3, 4]);
1866
1867 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1868 .await
1869 .unwrap()
1870 .with_offset(4)
1871 .with_limit(20)
1872 .build()
1873 .unwrap();
1874
1875 let batches: Vec<_> = stream.try_collect().await.unwrap();
1876 assert_eq!(batches.len(), 1);
1878
1879 let batch = &batches[0];
1880 assert_eq!(batch.num_rows(), 2);
1882 assert_eq!(batch.num_columns(), 3);
1883 let col2 = batch.column(2).as_primitive::<Int32Type>();
1884 assert_eq!(col2.values(), &[4, 5]);
1885 }
1886
1887 #[tokio::test]
1888 async fn test_row_filter_with_index() {
1889 let testdata = arrow::util::test_util::parquet_test_data();
1890 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1891 let data = Bytes::from(std::fs::read(path).unwrap());
1892
1893 let metadata = ParquetMetaDataReader::new()
1894 .parse_and_finish(&data)
1895 .unwrap();
1896 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1897
1898 assert_eq!(metadata.num_row_groups(), 1);
1899
1900 let async_reader = TestReader::new(data.clone());
1901
1902 let a_filter =
1903 ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema, vec![1]), |batch| {
1904 Ok(batch.column(0).as_boolean().clone())
1905 });
1906
1907 let b_scalar = Int8Array::from(vec![2]);
1908 let b_filter = ArrowPredicateFn::new(
1909 ProjectionMask::leaves(&parquet_schema, vec![2]),
1910 move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1911 );
1912
1913 let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1914
1915 let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1916
1917 let options = ArrowReaderOptions::new().with_page_index(true);
1918 let stream = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1919 .await
1920 .unwrap()
1921 .with_projection(mask.clone())
1922 .with_batch_size(1024)
1923 .with_row_filter(filter)
1924 .build()
1925 .unwrap();
1926
1927 let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1928
1929 let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1930
1931 assert_eq!(total_rows, 730);
1932 }
1933
1934 #[tokio::test]
1935 #[allow(deprecated)]
1936 async fn test_in_memory_row_group_sparse() {
1937 let testdata = arrow::util::test_util::parquet_test_data();
1938 let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1939 let data = Bytes::from(std::fs::read(path).unwrap());
1940
1941 let metadata = ParquetMetaDataReader::new()
1942 .with_page_indexes(true)
1943 .parse_and_finish(&data)
1944 .unwrap();
1945
1946 let offset_index = metadata.offset_index().expect("reading offset index")[0].clone();
1947
1948 let mut metadata_builder = metadata.into_builder();
1949 let mut row_groups = metadata_builder.take_row_groups();
1950 row_groups.truncate(1);
1951 let row_group_meta = row_groups.pop().unwrap();
1952
1953 let metadata = metadata_builder
1954 .add_row_group(row_group_meta)
1955 .set_column_index(None)
1956 .set_offset_index(Some(vec![offset_index.clone()]))
1957 .build();
1958
1959 let metadata = Arc::new(metadata);
1960
1961 let num_rows = metadata.row_group(0).num_rows();
1962
1963 assert_eq!(metadata.num_row_groups(), 1);
1964
1965 let async_reader = TestReader::new(data.clone());
1966
1967 let requests = async_reader.requests.clone();
1968 let (_, fields) = parquet_to_arrow_schema_and_fields(
1969 metadata.file_metadata().schema_descr(),
1970 ProjectionMask::all(),
1971 None,
1972 )
1973 .unwrap();
1974
1975 let _schema_desc = metadata.file_metadata().schema_descr();
1976
1977 let projection = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), vec![0]);
1978
1979 let reader_factory = ReaderFactory {
1980 metadata,
1981 fields: fields.map(Arc::new),
1982 input: async_reader,
1983 filter: None,
1984 limit: None,
1985 offset: None,
1986 metrics: ArrowReaderMetrics::disabled(),
1987 max_predicate_cache_size: 0,
1988 };
1989
1990 let mut skip = true;
1991 let mut pages = offset_index[0].page_locations.iter().peekable();
1992
1993 let mut selectors = vec![];
1995 let mut expected_page_requests: Vec<Range<usize>> = vec![];
1996 while let Some(page) = pages.next() {
1997 let num_rows = if let Some(next_page) = pages.peek() {
1998 next_page.first_row_index - page.first_row_index
1999 } else {
2000 num_rows - page.first_row_index
2001 };
2002
2003 if skip {
2004 selectors.push(RowSelector::skip(num_rows as usize));
2005 } else {
2006 selectors.push(RowSelector::select(num_rows as usize));
2007 let start = page.offset as usize;
2008 let end = start + page.compressed_page_size as usize;
2009 expected_page_requests.push(start..end);
2010 }
2011 skip = !skip;
2012 }
2013
2014 let selection = RowSelection::from(selectors);
2015
2016 let (_factory, _reader) = reader_factory
2017 .read_row_group(0, Some(selection), projection.clone(), 48)
2018 .await
2019 .expect("reading row group");
2020
2021 let requests = requests.lock().unwrap();
2022
2023 assert_eq!(&requests[..], &expected_page_requests)
2024 }
2025
2026 #[tokio::test]
2027 async fn test_batch_size_overallocate() {
2028 let testdata = arrow::util::test_util::parquet_test_data();
2029 let path = format!("{testdata}/alltypes_plain.parquet");
2031 let data = Bytes::from(std::fs::read(path).unwrap());
2032
2033 let async_reader = TestReader::new(data.clone());
2034
2035 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2036 .await
2037 .unwrap();
2038
2039 let file_rows = builder.metadata().file_metadata().num_rows() as usize;
2040
2041 let stream = builder
2042 .with_projection(ProjectionMask::all())
2043 .with_batch_size(1024)
2044 .build()
2045 .unwrap();
2046 assert_ne!(1024, file_rows);
2047 assert_eq!(stream.batch_size, file_rows);
2048 }
2049
2050 #[tokio::test]
2051 async fn test_get_row_group_column_bloom_filter_without_length() {
2052 let testdata = arrow::util::test_util::parquet_test_data();
2053 let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
2054 let data = Bytes::from(std::fs::read(path).unwrap());
2055 test_get_row_group_column_bloom_filter(data, false).await;
2056 }
2057
2058 #[tokio::test]
2059 async fn test_parquet_record_batch_stream_schema() {
2060 fn get_all_field_names(schema: &Schema) -> Vec<&String> {
2061 schema.flattened_fields().iter().map(|f| f.name()).collect()
2062 }
2063
2064 let mut metadata = HashMap::with_capacity(1);
2073 metadata.insert("key".to_string(), "value".to_string());
2074
2075 let nested_struct_array = StructArray::from(vec![
2076 (
2077 Arc::new(Field::new("d", DataType::Utf8, true)),
2078 Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
2079 ),
2080 (
2081 Arc::new(Field::new("e", DataType::Utf8, true)),
2082 Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
2083 ),
2084 ]);
2085 let struct_array = StructArray::from(vec![
2086 (
2087 Arc::new(Field::new("a", DataType::Int32, true)),
2088 Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
2089 ),
2090 (
2091 Arc::new(Field::new("b", DataType::UInt64, true)),
2092 Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
2093 ),
2094 (
2095 Arc::new(Field::new(
2096 "c",
2097 nested_struct_array.data_type().clone(),
2098 true,
2099 )),
2100 Arc::new(nested_struct_array) as ArrayRef,
2101 ),
2102 ]);
2103
2104 let schema =
2105 Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
2106 let record_batch = RecordBatch::from(struct_array)
2107 .with_schema(schema.clone())
2108 .unwrap();
2109
2110 let mut file = tempfile().unwrap();
2112 let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
2113 writer.write(&record_batch).unwrap();
2114 writer.close().unwrap();
2115
2116 let all_fields = ["a", "b", "c", "d", "e"];
2117 let projections = [
2119 (vec![], vec![]),
2120 (vec![0], vec!["a"]),
2121 (vec![0, 1], vec!["a", "b"]),
2122 (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
2123 (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
2124 ];
2125
2126 for (indices, expected_projected_names) in projections {
2128 let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
2129 assert_eq!(get_all_field_names(&builder), all_fields);
2131 assert_eq!(builder.metadata, metadata);
2132 assert_eq!(get_all_field_names(&reader), expected_projected_names);
2134 assert_eq!(reader.metadata, HashMap::default());
2135 assert_eq!(get_all_field_names(&batch), expected_projected_names);
2136 assert_eq!(batch.metadata, HashMap::default());
2137 };
2138
2139 let builder =
2140 ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
2141 let sync_builder_schema = builder.schema().clone();
2142 let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
2143 let mut reader = builder.with_projection(mask).build().unwrap();
2144 let sync_reader_schema = reader.schema();
2145 let batch = reader.next().unwrap().unwrap();
2146 let sync_batch_schema = batch.schema();
2147 assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);
2148
2149 let file = tokio::fs::File::from(file.try_clone().unwrap());
2151 let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
2152 let async_builder_schema = builder.schema().clone();
2153 let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
2154 let mut reader = builder.with_projection(mask).build().unwrap();
2155 let async_reader_schema = reader.schema().clone();
2156 let batch = reader.next().await.unwrap().unwrap();
2157 let async_batch_schema = batch.schema();
2158 assert_schemas(
2159 async_builder_schema,
2160 async_reader_schema,
2161 async_batch_schema,
2162 );
2163 }
2164 }
2165
2166 #[tokio::test]
2167 async fn test_get_row_group_column_bloom_filter_with_length() {
2168 let testdata = arrow::util::test_util::parquet_test_data();
2170 let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
2171 let data = Bytes::from(std::fs::read(path).unwrap());
2172 let async_reader = TestReader::new(data.clone());
2173 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2174 .await
2175 .unwrap();
2176 let schema = builder.schema().clone();
2177 let stream = builder.build().unwrap();
2178 let batches = stream.try_collect::<Vec<_>>().await.unwrap();
2179
2180 let mut parquet_data = Vec::new();
2181 let props = WriterProperties::builder()
2182 .set_bloom_filter_enabled(true)
2183 .build();
2184 let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
2185 for batch in batches {
2186 writer.write(&batch).unwrap();
2187 }
2188 writer.close().unwrap();
2189
2190 test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
2192 }
2193
2194 async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
2195 let async_reader = TestReader::new(data.clone());
2196
2197 let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
2198 .await
2199 .unwrap();
2200
2201 let metadata = builder.metadata();
2202 assert_eq!(metadata.num_row_groups(), 1);
2203 let row_group = metadata.row_group(0);
2204 let column = row_group.column(0);
2205 assert_eq!(column.bloom_filter_length().is_some(), with_length);
2206
2207 let sbbf = builder
2208 .get_row_group_column_bloom_filter(0, 0)
2209 .await
2210 .unwrap()
2211 .unwrap();
2212 assert!(sbbf.check(&"Hello"));
2213 assert!(!sbbf.check(&"Hello_Not_Exists"));
2214 }
2215
    #[tokio::test]
    async fn test_nested_skip() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("col_1", DataType::UInt64, false),
            Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
        ]));

        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(256)
            .set_write_batch_size(256)
            .set_max_row_group_size(1024);

        let mut file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();

        let mut builder = ListBuilder::new(StringBuilder::new());
        for id in 0..1024 {
            match id % 3 {
                0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
                1 => builder.append_value([Some(format!("id_{id}"))]),
                _ => builder.append_null(),
            }
        }
        let refs = vec![
            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
            Arc::new(builder.finish()) as ArrayRef,
        ];

        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let selections = [
            RowSelection::from(vec![
                RowSelector::skip(313),
                RowSelector::select(1),
                RowSelector::skip(709),
                RowSelector::select(1),
            ]),
            RowSelection::from(vec![
                RowSelector::skip(255),
                RowSelector::select(1),
                RowSelector::skip(767),
                RowSelector::select(1),
            ]),
            RowSelection::from(vec![
                RowSelector::select(255),
                RowSelector::skip(1),
                RowSelector::select(767),
                RowSelector::skip(1),
            ]),
            RowSelection::from(vec![
                RowSelector::skip(254),
                RowSelector::select(1),
                RowSelector::select(1),
                RowSelector::skip(767),
                RowSelector::select(1),
            ]),
        ];

        for selection in selections {
            let expected = selection.row_count();
            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
                tokio::fs::File::from_std(file.try_clone().unwrap()),
                ArrowReaderOptions::new().with_page_index(true),
            )
            .await
            .unwrap();

            reader = reader.with_row_selection(selection);

            let mut stream = reader.build().unwrap();

            let mut total_rows = 0;
            while let Some(rb) = stream.next().await {
                let rb = rb.unwrap();
                total_rows += rb.num_rows();
            }
            assert_eq!(total_rows, expected);
        }
    }

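    // Apply two `ArrowPredicateFn` filters, one on a top-level column and one on a
    // field nested inside a struct, and check both the filtered result and the
    // number of byte-range requests issued against the `TestReader`.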
    #[tokio::test]
    async fn test_row_filter_nested() {
        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
        let b = StructArray::from(vec![
            (
                Arc::new(Field::new("aa", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["a", "b", "b", "b", "c", "c"])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("bb", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5", "6"])) as ArrayRef,
            ),
        ]);
        let c = Int32Array::from_iter(0..6);
        let data = RecordBatch::try_from_iter([
            ("a", Arc::new(a) as ArrayRef),
            ("b", Arc::new(b) as ArrayRef),
            ("c", Arc::new(c) as ArrayRef),
        ])
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
        writer.write(&data).unwrap();
        writer.close().unwrap();

        let data: Bytes = buf.into();
        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let parquet_schema = metadata.file_metadata().schema_descr_ptr();

        let test = TestReader::new(data);
        let requests = test.requests.clone();

        let a_scalar = StringArray::from_iter_values(["b"]);
        let a_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![0]),
            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
        );

        let b_scalar = StringArray::from_iter_values(["4"]);
        let b_filter = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![2]),
            move |batch| {
                let struct_array = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<StructArray>()
                    .unwrap();
                eq(struct_array.column(0), &Scalar::new(&b_scalar))
            },
        );

        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);

        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 3]);
        let stream = ParquetRecordBatchStreamBuilder::new(test)
            .await
            .unwrap()
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .with_row_filter(filter)
            .build()
            .unwrap();

        let batches: Vec<_> = stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 1);

        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 2);

        let col = batch.column(0);
        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
        assert_eq!(val, "b");

        let col = batch.column(1);
        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
        assert_eq!(val, 3);

        // Only three byte ranges should have been fetched: "a" and the nested
        // "b.bb" for the two predicates, plus "c" for the output projection
        // (the data fetched for "a" is reused for the final projection).
        assert_eq!(requests.lock().unwrap().len(), 3);
    }

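    // When the only predicate column is a leaf nested inside a struct, no cache
    // projection should be computed: `compute_cache_projection` returns `None`.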
    #[tokio::test]
    async fn test_cache_projection_excludes_nested_columns() {
        use arrow_array::{ArrayRef, StringArray};

        let a = StringArray::from_iter_values(["r1", "r2"]);
        let b = StructArray::from(vec![
            (
                Arc::new(Field::new("aa", DataType::Utf8, true)),
                Arc::new(StringArray::from_iter_values(["v1", "v2"])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("bb", DataType::Utf8, true)),
                Arc::new(StringArray::from_iter_values(["w1", "w2"])) as ArrayRef,
            ),
        ]);

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", b.data_type().clone(), true),
        ]));

        let mut buf = Vec::new();
        let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap();
        let batch = RecordBatch::try_from_iter([
            ("a", Arc::new(a) as ArrayRef),
            ("b", Arc::new(b) as ArrayRef),
        ])
        .unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let data: Bytes = buf.into();
        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        let metadata = Arc::new(metadata);

        let parquet_schema = metadata.file_metadata().schema_descr();
        let nested_leaf_mask = ProjectionMask::leaves(parquet_schema, vec![1]);

        let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
            Ok(arrow_array::BooleanArray::from(vec![true; batch.num_rows()]))
        });
        let filter = RowFilter::new(vec![Box::new(always_true)]);

        let reader_factory = ReaderFactory {
            metadata: Arc::clone(&metadata),
            fields: None,
            input: TestReader::new(data),
            filter: Some(filter),
            limit: None,
            offset: None,
            metrics: ArrowReaderMetrics::disabled(),
            max_predicate_cache_size: 0,
        };

        let cache_projection = reader_factory.compute_cache_projection(&nested_leaf_mask);

        assert!(cache_projection.is_none());
    }

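    // Regression test: forcing an empty offset index onto otherwise valid metadata
    // must not panic when reading the row group.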
    #[tokio::test]
    #[allow(deprecated)]
    async fn empty_offset_index_doesnt_panic_in_read_row_group() {
        use tokio::fs::File;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let mut file = File::open(&path).await.unwrap();
        let file_size = file.metadata().await.unwrap().len();
        let mut metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut file, file_size)
            .await
            .unwrap();

        metadata.set_offset_index(Some(vec![]));
        let options = ArrowReaderOptions::new().with_page_index(true);
        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
        let reader =
            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
                .build()
                .unwrap();

        let result = reader.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(result.len(), 1);
    }

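    // Companion to the test above: reading with a populated offset index and the
    // page index enabled should also complete without panicking.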
    #[tokio::test]
    #[allow(deprecated)]
    async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
        use tokio::fs::File;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let mut file = File::open(&path).await.unwrap();
        let file_size = file.metadata().await.unwrap().len();
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut file, file_size)
            .await
            .unwrap();

        let options = ArrowReaderOptions::new().with_page_index(true);
        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
        let reader =
            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
                .build()
                .unwrap();

        let result = reader.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(result.len(), 8);
    }

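    // Write the metadata to a standalone file and read it back before building the
    // stream; column-chunk reads should not panic even if the recovered offset
    // index is empty.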
    #[tokio::test]
    #[allow(deprecated)]
    async fn empty_offset_index_doesnt_panic_in_column_chunks() {
        use tempfile::TempDir;
        use tokio::fs::File;
        fn write_metadata_to_local_file(
            metadata: ParquetMetaData,
            file: impl AsRef<std::path::Path>,
        ) {
            use crate::file::metadata::ParquetMetaDataWriter;
            use std::fs::File;
            let file = File::create(file).unwrap();
            ParquetMetaDataWriter::new(file, &metadata)
                .finish()
                .unwrap()
        }

        fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
            use std::fs::File;
            let file = File::open(file).unwrap();
            ParquetMetaDataReader::new()
                .with_page_indexes(true)
                .parse_and_finish(&file)
                .unwrap()
        }

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let mut file = File::open(&path).await.unwrap();
        let file_size = file.metadata().await.unwrap().len();
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut file, file_size)
            .await
            .unwrap();

        let tempdir = TempDir::new().unwrap();
        let metadata_path = tempdir.path().join("thrift_metadata.dat");
        write_metadata_to_local_file(metadata, &metadata_path);
        let metadata = read_metadata_from_local_file(&metadata_path);

        let options = ArrowReaderOptions::new().with_page_index(true);
        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
        let reader =
            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
                .build()
                .unwrap();

        let result = reader.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(result.len(), 1);
    }

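    // Exercises a sparse row selection together with a row filter and a small
    // batch size; collecting the stream should succeed without an offset error
    // from the cached array reader.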
    #[tokio::test]
    async fn test_cached_array_reader_sparse_offset_error() {
        use futures::TryStreamExt;

        use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
        use arrow_array::{BooleanArray, RecordBatch};

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let async_reader = TestReader::new(data);

        let options = ArrowReaderOptions::new().with_page_index(true);
        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
            .await
            .unwrap();

        let selection = RowSelection::from(vec![RowSelector::skip(22), RowSelector::select(3)]);

        let parquet_schema = builder.parquet_schema();
        let proj = ProjectionMask::leaves(parquet_schema, vec![0]);
        let always_true = ArrowPredicateFn::new(proj.clone(), |batch: RecordBatch| {
            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
        });
        let filter = RowFilter::new(vec![Box::new(always_true)]);

        let stream = builder
            .with_batch_size(8)
            .with_projection(proj)
            .with_row_selection(selection)
            .with_row_filter(filter)
            .build()
            .unwrap();

        let _result: Vec<_> = stream.try_collect().await.unwrap();
    }
}