1use std::fmt::Formatter;
25use std::io::SeekFrom;
26use std::ops::Range;
27use std::pin::Pin;
28use std::sync::Arc;
29use std::task::{Context, Poll};
30
31use bytes::Bytes;
32use futures::future::{BoxFuture, FutureExt};
33use futures::stream::Stream;
34use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
35
36use arrow_array::RecordBatch;
37use arrow_schema::{Schema, SchemaRef};
38
39use crate::arrow::arrow_reader::{
40 ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
41};
42
43use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
44use crate::bloom_filter::{
45 SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
46};
47use crate::errors::{ParquetError, Result};
48use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
49
50mod metadata;
51pub use metadata::*;
52
53#[cfg(feature = "object_store")]
54mod store;
55
56use crate::DecodeResult;
57use crate::arrow::push_decoder::{NoInput, ParquetPushDecoder, ParquetPushDecoderBuilder};
58#[cfg(feature = "object_store")]
59pub use store::*;
60
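/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files.
///
/// A blanket implementation is provided below for any type that implements
/// `AsyncRead + AsyncSeek + Unpin + Send` (for example `tokio::fs::File`).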
61pub trait AsyncFileReader: Send {
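    /// Retrieve the bytes in `range` from the underlying file.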
75 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
77
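    /// Retrieve multiple byte ranges in a single call.
    ///
    /// The default implementation fetches each range sequentially with
    /// [`Self::get_bytes`]; implementations may override it to coalesce or
    /// parallelize requests.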
78 fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
80 async move {
81 let mut result = Vec::with_capacity(ranges.len());
82
83 for range in ranges.into_iter() {
84 let data = self.get_bytes(range).await?;
85 result.push(data);
86 }
87
88 Ok(result)
89 }
90 .boxed()
91 }
92
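    /// Return a future that resolves to the [`ParquetMetaData`] of the underlying file,
    /// honoring the supplied [`ArrowReaderOptions`] (for example, whether to also load
    /// the page index) when provided.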
93 fn get_metadata<'a>(
110 &'a mut self,
111 options: Option<&'a ArrowReaderOptions>,
112 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
113}
114
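/// Allows `Box<dyn AsyncFileReader + '_>` to be used as an [`AsyncFileReader`] by
/// forwarding each method to the boxed reader.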
115impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
117 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
118 self.as_mut().get_bytes(range)
119 }
120
121 fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
122 self.as_mut().get_byte_ranges(ranges)
123 }
124
125 fn get_metadata<'a>(
126 &'a mut self,
127 options: Option<&'a ArrowReaderOptions>,
128 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
129 self.as_mut().get_metadata(options)
130 }
131}
132
133impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
134 fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
135 async move {
136 self.seek(SeekFrom::End(-(suffix as i64))).await?;
137 let mut buf = Vec::with_capacity(suffix);
138 self.take(suffix as _).read_to_end(&mut buf).await?;
139 Ok(buf.into())
140 }
141 .boxed()
142 }
143}
144
145impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
146 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
147 async move {
148 self.seek(SeekFrom::Start(range.start)).await?;
149
150 let to_read = range.end - range.start;
151 let mut buffer = Vec::with_capacity(to_read.try_into()?);
152 let read = self.take(to_read).read_to_end(&mut buffer).await?;
153 if read as u64 != to_read {
154 return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
155 }
156
157 Ok(buffer.into())
158 }
159 .boxed()
160 }
161
162 fn get_metadata<'a>(
163 &'a mut self,
164 options: Option<&'a ArrowReaderOptions>,
165 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
166 async move {
167 let metadata_opts = options.map(|o| o.metadata_options().clone());
168 let metadata_reader = ParquetMetaDataReader::new()
169 .with_page_index_policy(PageIndexPolicy::from(
170 options.is_some_and(|o| o.page_index()),
171 ))
172 .with_metadata_options(metadata_opts);
173
174 #[cfg(feature = "encryption")]
175 let metadata_reader = metadata_reader.with_decryption_properties(
176 options.and_then(|o| o.file_decryption_properties.as_ref().map(Arc::clone)),
177 );
178
179 let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
180 Ok(Arc::new(parquet_metadata))
181 }
182 .boxed()
183 }
184}
185
186impl ArrowReaderMetadata {
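    /// Load [`ArrowReaderMetadata`] from the provided asynchronous input, fetching the
    /// [`ParquetMetaData`] via [`AsyncFileReader::get_metadata`].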
187 pub async fn load_async<T: AsyncFileReader>(
191 input: &mut T,
192 options: ArrowReaderOptions,
193 ) -> Result<Self> {
194 let metadata = input.get_metadata(Some(&options)).await?;
195 Self::try_new(metadata, options)
196 }
197}
198
199#[doc(hidden)]
200pub struct AsyncReader<T>(T);
205
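/// A builder for constructing a [`ParquetRecordBatchStream`] that reads parquet data
/// asynchronously from an [`AsyncFileReader`].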
206pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
219
220impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
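    /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from `input` with the
    /// default [`ArrowReaderOptions`], fetching and parsing the file metadata in the process.
    ///
    /// A minimal usage sketch (error handling elided; assumes a local `data.parquet` file):
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    /// use tokio::fs::File;
    ///
    /// let file = File::open("data.parquet").await?;
    /// let stream = ParquetRecordBatchStreamBuilder::new(file)
    ///     .await?
    ///     .with_batch_size(1024)
    ///     .build()?;
    /// let batches: Vec<_> = stream.try_collect().await?;
    /// ```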
221 pub async fn new(input: T) -> Result<Self> {
352 Self::new_with_options(input, Default::default()).await
353 }
354
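    /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from `input` using the
    /// provided [`ArrowReaderOptions`].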
355 pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
358 let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
359 Ok(Self::new_with_metadata(input, metadata))
360 }
361
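    /// Create a [`ParquetRecordBatchStreamBuilder`] from previously loaded
    /// [`ArrowReaderMetadata`], avoiding a second metadata fetch when the metadata is
    /// already available.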
362 pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
408 Self::new_builder(AsyncReader(input), metadata)
409 }
410
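    /// Read the bloom filter for the column at `column_idx` in the row group at
    /// `row_group_idx`, returning `Ok(None)` if the column has no bloom filter.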
411 pub async fn get_row_group_column_bloom_filter(
417 &mut self,
418 row_group_idx: usize,
419 column_idx: usize,
420 ) -> Result<Option<Sbbf>> {
421 let metadata = self.metadata.row_group(row_group_idx);
422 let column_metadata = metadata.column(column_idx);
423
424 let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
425 offset
426 .try_into()
427 .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
428 } else {
429 return Ok(None);
430 };
431
432 let buffer = match column_metadata.bloom_filter_length() {
433 Some(length) => self.input.0.get_bytes(offset..offset + length as u64),
434 None => self
435 .input
436 .0
437 .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64),
438 }
439 .await?;
440
441 let (header, bitset_offset) =
442 chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
443
444 match header.algorithm {
445 BloomFilterAlgorithm::BLOCK => {
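                // this match exists to future-proof the singleton algorithm enum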
446 }
448 }
449 match header.compression {
450 BloomFilterCompression::UNCOMPRESSED => {
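                // this match exists to future-proof the singleton compression enum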
451 }
453 }
454 match header.hash {
455 BloomFilterHash::XXHASH => {
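                // this match exists to future-proof the singleton hash enum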
456 }
458 }
459
460 let bitset = match column_metadata.bloom_filter_length() {
461 Some(_) => buffer.slice(
462 (TryInto::<usize>::try_into(bitset_offset).unwrap()
463 - TryInto::<usize>::try_into(offset).unwrap())..,
464 ),
465 None => {
466 let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
467 ParquetError::General("Bloom filter length is invalid".to_string())
468 })?;
469 self.input
470 .0
471 .get_bytes(bitset_offset..bitset_offset + bitset_length)
472 .await?
473 }
474 };
475 Ok(Some(Sbbf::new(&bitset)))
476 }
477
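    /// Build a [`ParquetRecordBatchStream`] from this builder's configuration.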
478 pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
482 let Self {
483 input,
484 metadata,
485 schema,
486 fields,
487 batch_size,
488 row_groups,
489 projection,
490 filter,
491 selection,
492 row_selection_policy: selection_strategy,
493 limit,
494 offset,
495 metrics,
496 max_predicate_cache_size,
497 } = self;
498
499 let projection_len = projection.mask.as_ref().map_or(usize::MAX, |m| m.len());
502 let projected_fields = schema
503 .fields
504 .filter_leaves(|idx, _| idx < projection_len && projection.leaf_included(idx));
505 let projected_schema = Arc::new(Schema::new(projected_fields));
506
507 let decoder = ParquetPushDecoderBuilder {
508 input: NoInput,
509 metadata,
510 schema,
511 fields,
512 projection,
513 filter,
514 selection,
515 row_selection_policy: selection_strategy,
516 batch_size,
517 row_groups,
518 limit,
519 offset,
520 metrics,
521 max_predicate_cache_size,
522 }
523 .build()?;
524
525 let request_state = RequestState::None { input: input.0 };
526
527 Ok(ParquetRecordBatchStream {
528 schema: projected_schema,
529 decoder,
530 request_state,
531 })
532 }
533}
534
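/// Tracks whether an I/O request to the underlying [`AsyncFileReader`] is in flight,
/// threading ownership of the reader through any outstanding request.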
535enum RequestState<T> {
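    /// No request is outstanding; the reader is available for the next request.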
539 None {
541 input: T,
542 },
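    /// A request for `ranges` is in flight; the future resolves to the reader and the
    /// fetched bytes once the request completes.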
543 Outstanding {
545 ranges: Vec<Range<u64>>,
547 future: BoxFuture<'static, Result<(T, Vec<Bytes>)>>,
552 },
553 Done,
554}
555
556impl<T> RequestState<T>
557where
558 T: AsyncFileReader + Unpin + Send + 'static,
559{
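    /// Begin fetching `ranges` from `input`, returning the `Outstanding` state that holds
    /// both the requested ranges and the in-flight future.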
560 fn begin_request(mut input: T, ranges: Vec<Range<u64>>) -> Self {
562 let ranges_captured = ranges.clone();
563
564 let future = async move {
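            // The future takes ownership of the reader while the request is outstanding and
            // hands it back together with the fetched data once the request completes.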
569 let data = input.get_byte_ranges(ranges_captured).await?;
570 Ok((input, data))
571 }
572 .boxed();
573 RequestState::Outstanding { ranges, future }
574 }
575}
576
577impl<T> std::fmt::Debug for RequestState<T> {
578 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
579 match self {
580 RequestState::None { input: _ } => f
581 .debug_struct("RequestState::None")
582 .field("input", &"...")
583 .finish(),
584 RequestState::Outstanding { ranges, .. } => f
585 .debug_struct("RequestState::Outstanding")
586 .field("ranges", &ranges)
587 .finish(),
588 RequestState::Done => {
589 write!(f, "RequestState::Done")
590 }
591 }
592 }
593}
594
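/// An asynchronous [`Stream`] of [`RecordBatch`] decoded from a parquet file, constructed
/// using a [`ParquetRecordBatchStreamBuilder`].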
595pub struct ParquetRecordBatchStream<T> {
615 schema: SchemaRef,
617 request_state: RequestState<T>,
619 decoder: ParquetPushDecoder,
621}
622
623impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
624 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
625 f.debug_struct("ParquetRecordBatchStream")
626 .field("request_state", &self.request_state)
627 .finish()
628 }
629}
630
631impl<T> ParquetRecordBatchStream<T> {
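    /// Returns the projected [`SchemaRef`] describing the [`RecordBatch`]es produced by
    /// this stream.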
632 pub fn schema(&self) -> &SchemaRef {
637 &self.schema
638 }
639}
640
641impl<T> ParquetRecordBatchStream<T>
642where
643 T: AsyncFileReader + Unpin + Send + 'static,
644{
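    /// Fetch the data for the next row group and return a [`ParquetRecordBatchReader`] over
    /// it, or `Ok(None)` once all row groups have been read, issuing I/O requests through
    /// the [`AsyncFileReader`] as needed.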
645 pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
659 loop {
660 let request_state = std::mem::replace(&mut self.request_state, RequestState::Done);
663 match request_state {
664 RequestState::None { input } => {
666 match self.decoder.try_next_reader()? {
667 DecodeResult::NeedsData(ranges) => {
668 self.request_state = RequestState::begin_request(input, ranges);
                            continue;
                        }
671 DecodeResult::Data(reader) => {
672 self.request_state = RequestState::None { input };
673 return Ok(Some(reader));
674 }
675 DecodeResult::Finished => return Ok(None),
676 }
677 }
678 RequestState::Outstanding { ranges, future } => {
679 let (input, data) = future.await?;
680 self.decoder.push_ranges(ranges, data)?;
682 self.request_state = RequestState::None { input };
                    continue;
                }
685 RequestState::Done => {
686 self.request_state = RequestState::Done;
687 return Ok(None);
688 }
689 }
690 }
691 }
692}
693
694impl<T> Stream for ParquetRecordBatchStream<T>
695where
696 T: AsyncFileReader + Unpin + Send + 'static,
697{
698 type Item = Result<RecordBatch>;
699 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
700 match self.poll_next_inner(cx) {
701 Ok(res) => {
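                // convert Poll<Option<RecordBatch>> into Poll<Option<Result<RecordBatch>>>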
702 res.map(|res| Ok(res).transpose())
705 }
706 Err(e) => {
707 self.request_state = RequestState::Done;
708 Poll::Ready(Some(Err(e)))
709 }
710 }
711 }
712}
713
714impl<T> ParquetRecordBatchStream<T>
715where
716 T: AsyncFileReader + Unpin + Send + 'static,
717{
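    /// Inner poll loop used by the [`Stream`] implementation: drives the push decoder,
    /// starting and polling I/O requests until a batch is ready, the stream is exhausted,
    /// or the outstanding request returns [`Poll::Pending`].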
718 fn poll_next_inner(&mut self, cx: &mut Context<'_>) -> Result<Poll<Option<RecordBatch>>> {
723 loop {
724 let request_state = std::mem::replace(&mut self.request_state, RequestState::Done);
725 match request_state {
726 RequestState::None { input } => {
727 match self.decoder.try_decode()? {
729 DecodeResult::NeedsData(ranges) => {
730 self.request_state = RequestState::begin_request(input, ranges);
                            continue;
                        }
733 DecodeResult::Data(batch) => {
734 self.request_state = RequestState::None { input };
735 return Ok(Poll::Ready(Some(batch)));
736 }
737 DecodeResult::Finished => {
738 self.request_state = RequestState::Done;
739 return Ok(Poll::Ready(None));
740 }
741 }
742 }
743 RequestState::Outstanding { ranges, mut future } => match future.poll_unpin(cx) {
744 Poll::Ready(result) => {
746 let (input, data) = result?;
747 self.decoder.push_ranges(ranges, data)?;
749 self.request_state = RequestState::None { input };
                        continue;
                    }
752 Poll::Pending => {
753 self.request_state = RequestState::Outstanding { ranges, future };
754 return Ok(Poll::Pending);
755 }
756 },
757 RequestState::Done => {
758 self.request_state = RequestState::Done;
760 return Ok(Poll::Ready(None));
761 }
762 }
763 }
764 }
765}
766
767#[cfg(test)]
768mod tests {
769 use super::*;
770 use crate::arrow::arrow_reader::RowSelectionPolicy;
771 use crate::arrow::arrow_reader::tests::test_row_numbers_with_multiple_row_groups_helper;
772 use crate::arrow::arrow_reader::{
773 ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
774 };
775 use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
776 use crate::arrow::schema::virtual_type::RowNumber;
777 use crate::arrow::{ArrowWriter, ProjectionMask};
778 use crate::file::metadata::ParquetMetaDataReader;
779 use crate::file::properties::WriterProperties;
780 use arrow::compute::kernels::cmp::eq;
781 use arrow::compute::or;
782 use arrow::error::Result as ArrowResult;
783 use arrow_array::builder::{ListBuilder, StringBuilder};
784 use arrow_array::cast::AsArray;
785 use arrow_array::types::Int32Type;
786 use arrow_array::{
787 Array, ArrayRef, Int8Array, Int32Array, Int64Array, RecordBatchReader, Scalar, StringArray,
788 StructArray, UInt64Array,
789 };
790 use arrow_schema::{DataType, Field, Schema};
791 use arrow_select::concat::concat_batches;
792 use futures::{StreamExt, TryStreamExt};
793 use rand::{Rng, rng};
794 use std::collections::HashMap;
795 use std::sync::{Arc, Mutex};
796 use tempfile::tempfile;
797
798 #[derive(Clone)]
799 struct TestReader {
800 data: Bytes,
801 metadata: Option<Arc<ParquetMetaData>>,
802 requests: Arc<Mutex<Vec<Range<usize>>>>,
803 }
804
805 impl TestReader {
806 fn new(data: Bytes) -> Self {
807 Self {
808 data,
809 metadata: Default::default(),
810 requests: Default::default(),
811 }
812 }
813 }
814
815 impl AsyncFileReader for TestReader {
816 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
817 let range = range.clone();
818 self.requests
819 .lock()
820 .unwrap()
821 .push(range.start as usize..range.end as usize);
822 futures::future::ready(Ok(self
823 .data
824 .slice(range.start as usize..range.end as usize)))
825 .boxed()
826 }
827
828 fn get_metadata<'a>(
829 &'a mut self,
830 options: Option<&'a ArrowReaderOptions>,
831 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
832 let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy(
833 PageIndexPolicy::from(options.is_some_and(|o| o.page_index())),
834 );
835 self.metadata = Some(Arc::new(
836 metadata_reader.parse_and_finish(&self.data).unwrap(),
837 ));
838 futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
839 }
840 }
841
842 #[tokio::test]
843 async fn test_async_reader() {
844 let testdata = arrow::util::test_util::parquet_test_data();
845 let path = format!("{testdata}/alltypes_plain.parquet");
846 let data = Bytes::from(std::fs::read(path).unwrap());
847
848 let async_reader = TestReader::new(data.clone());
849
850 let requests = async_reader.requests.clone();
851 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
852 .await
853 .unwrap();
854
855 let metadata = builder.metadata().clone();
856 assert_eq!(metadata.num_row_groups(), 1);
857
858 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
859 let stream = builder
860 .with_projection(mask.clone())
861 .with_batch_size(1024)
862 .build()
863 .unwrap();
864
865 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
866
867 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
868 .unwrap()
869 .with_projection(mask)
870 .with_batch_size(104)
871 .build()
872 .unwrap()
873 .collect::<ArrowResult<Vec<_>>>()
874 .unwrap();
875
876 assert_eq!(async_batches, sync_batches);
877
878 let requests = requests.lock().unwrap();
879 let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
880 let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
881
882 assert_eq!(
883 &requests[..],
884 &[
885 offset_1 as usize..(offset_1 + length_1) as usize,
886 offset_2 as usize..(offset_2 + length_2) as usize
887 ]
888 );
889 }
890
891 #[tokio::test]
892 async fn test_async_reader_with_next_row_group() {
893 let testdata = arrow::util::test_util::parquet_test_data();
894 let path = format!("{testdata}/alltypes_plain.parquet");
895 let data = Bytes::from(std::fs::read(path).unwrap());
896
897 let async_reader = TestReader::new(data.clone());
898
899 let requests = async_reader.requests.clone();
900 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
901 .await
902 .unwrap();
903
904 let metadata = builder.metadata().clone();
905 assert_eq!(metadata.num_row_groups(), 1);
906
907 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
908 let mut stream = builder
909 .with_projection(mask.clone())
910 .with_batch_size(1024)
911 .build()
912 .unwrap();
913
914 let mut readers = vec![];
915 while let Some(reader) = stream.next_row_group().await.unwrap() {
916 readers.push(reader);
917 }
918
919 let async_batches: Vec<_> = readers
920 .into_iter()
921 .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
922 .collect();
923
924 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
925 .unwrap()
926 .with_projection(mask)
927 .with_batch_size(104)
928 .build()
929 .unwrap()
930 .collect::<ArrowResult<Vec<_>>>()
931 .unwrap();
932
933 assert_eq!(async_batches, sync_batches);
934
935 let requests = requests.lock().unwrap();
936 let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
937 let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
938
939 assert_eq!(
940 &requests[..],
941 &[
942 offset_1 as usize..(offset_1 + length_1) as usize,
943 offset_2 as usize..(offset_2 + length_2) as usize
944 ]
945 );
946 }
947
948 #[tokio::test]
949 async fn test_async_reader_with_index() {
950 let testdata = arrow::util::test_util::parquet_test_data();
951 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
952 let data = Bytes::from(std::fs::read(path).unwrap());
953
954 let async_reader = TestReader::new(data.clone());
955
956 let options = ArrowReaderOptions::new().with_page_index(true);
957 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
958 .await
959 .unwrap();
960
961 let metadata_with_index = builder.metadata();
963 assert_eq!(metadata_with_index.num_row_groups(), 1);
964
965 let offset_index = metadata_with_index.offset_index().unwrap();
967 let column_index = metadata_with_index.column_index().unwrap();
968
969 assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
970 assert_eq!(column_index.len(), metadata_with_index.num_row_groups());
971
972 let num_columns = metadata_with_index
973 .file_metadata()
974 .schema_descr()
975 .num_columns();
976
977 offset_index
979 .iter()
980 .for_each(|x| assert_eq!(x.len(), num_columns));
981 column_index
982 .iter()
983 .for_each(|x| assert_eq!(x.len(), num_columns));
984
985 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
986 let stream = builder
987 .with_projection(mask.clone())
988 .with_batch_size(1024)
989 .build()
990 .unwrap();
991
992 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
993
994 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
995 .unwrap()
996 .with_projection(mask)
997 .with_batch_size(1024)
998 .build()
999 .unwrap()
1000 .collect::<ArrowResult<Vec<_>>>()
1001 .unwrap();
1002
1003 assert_eq!(async_batches, sync_batches);
1004 }
1005
1006 #[tokio::test]
1007 async fn test_async_reader_with_limit() {
1008 let testdata = arrow::util::test_util::parquet_test_data();
1009 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1010 let data = Bytes::from(std::fs::read(path).unwrap());
1011
1012 let metadata = ParquetMetaDataReader::new()
1013 .parse_and_finish(&data)
1014 .unwrap();
1015 let metadata = Arc::new(metadata);
1016
1017 assert_eq!(metadata.num_row_groups(), 1);
1018
1019 let async_reader = TestReader::new(data.clone());
1020
1021 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1022 .await
1023 .unwrap();
1024
1025 assert_eq!(builder.metadata().num_row_groups(), 1);
1026
1027 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1028 let stream = builder
1029 .with_projection(mask.clone())
1030 .with_batch_size(1024)
1031 .with_limit(1)
1032 .build()
1033 .unwrap();
1034
1035 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1036
1037 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1038 .unwrap()
1039 .with_projection(mask)
1040 .with_batch_size(1024)
1041 .with_limit(1)
1042 .build()
1043 .unwrap()
1044 .collect::<ArrowResult<Vec<_>>>()
1045 .unwrap();
1046
1047 assert_eq!(async_batches, sync_batches);
1048 }
1049
1050 #[tokio::test]
1051 async fn test_async_reader_skip_pages() {
1052 let testdata = arrow::util::test_util::parquet_test_data();
1053 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1054 let data = Bytes::from(std::fs::read(path).unwrap());
1055
1056 let async_reader = TestReader::new(data.clone());
1057
1058 let options = ArrowReaderOptions::new().with_page_index(true);
1059 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1060 .await
1061 .unwrap();
1062
1063 assert_eq!(builder.metadata().num_row_groups(), 1);
1064
1065 let selection = RowSelection::from(vec![
            RowSelector::skip(21),
            RowSelector::select(21),
            RowSelector::skip(41),
            RowSelector::select(41),
            RowSelector::skip(25),
            RowSelector::select(25),
            RowSelector::skip(7116),
            RowSelector::select(10),
        ]);
1075
1076 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);
1077
1078 let stream = builder
1079 .with_projection(mask.clone())
1080 .with_row_selection(selection.clone())
1081 .build()
1082 .expect("building stream");
1083
1084 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1085
1086 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1087 .unwrap()
1088 .with_projection(mask)
1089 .with_batch_size(1024)
1090 .with_row_selection(selection)
1091 .build()
1092 .unwrap()
1093 .collect::<ArrowResult<Vec<_>>>()
1094 .unwrap();
1095
1096 assert_eq!(async_batches, sync_batches);
1097 }
1098
1099 #[tokio::test]
1100 async fn test_fuzz_async_reader_selection() {
1101 let testdata = arrow::util::test_util::parquet_test_data();
1102 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1103 let data = Bytes::from(std::fs::read(path).unwrap());
1104
1105 let mut rand = rng();
1106
1107 for _ in 0..100 {
1108 let mut expected_rows = 0;
1109 let mut total_rows = 0;
1110 let mut skip = false;
1111 let mut selectors = vec![];
1112
1113 while total_rows < 7300 {
1114 let row_count: usize = rand.random_range(1..100);
1115
1116 let row_count = row_count.min(7300 - total_rows);
1117
1118 selectors.push(RowSelector { row_count, skip });
1119
1120 total_rows += row_count;
1121 if !skip {
1122 expected_rows += row_count;
1123 }
1124
1125 skip = !skip;
1126 }
1127
1128 let selection = RowSelection::from(selectors);
1129
1130 let async_reader = TestReader::new(data.clone());
1131
1132 let options = ArrowReaderOptions::new().with_page_index(true);
1133 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1134 .await
1135 .unwrap();
1136
1137 assert_eq!(builder.metadata().num_row_groups(), 1);
1138
1139 let col_idx: usize = rand.random_range(0..13);
1140 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1141
1142 let stream = builder
1143 .with_projection(mask.clone())
1144 .with_row_selection(selection.clone())
1145 .build()
1146 .expect("building stream");
1147
1148 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1149
1150 let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1151
1152 assert_eq!(actual_rows, expected_rows);
1153 }
1154 }
1155
1156 #[tokio::test]
1157 async fn test_async_reader_zero_row_selector() {
1158 let testdata = arrow::util::test_util::parquet_test_data();
1160 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1161 let data = Bytes::from(std::fs::read(path).unwrap());
1162
1163 let mut rand = rng();
1164
1165 let mut expected_rows = 0;
1166 let mut total_rows = 0;
1167 let mut skip = false;
1168 let mut selectors = vec![];
1169
1170 selectors.push(RowSelector {
1171 row_count: 0,
1172 skip: false,
1173 });
1174
1175 while total_rows < 7300 {
1176 let row_count: usize = rand.random_range(1..100);
1177
1178 let row_count = row_count.min(7300 - total_rows);
1179
1180 selectors.push(RowSelector { row_count, skip });
1181
1182 total_rows += row_count;
1183 if !skip {
1184 expected_rows += row_count;
1185 }
1186
1187 skip = !skip;
1188 }
1189
1190 let selection = RowSelection::from(selectors);
1191
1192 let async_reader = TestReader::new(data.clone());
1193
1194 let options = ArrowReaderOptions::new().with_page_index(true);
1195 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1196 .await
1197 .unwrap();
1198
1199 assert_eq!(builder.metadata().num_row_groups(), 1);
1200
1201 let col_idx: usize = rand.random_range(0..13);
1202 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1203
1204 let stream = builder
1205 .with_projection(mask.clone())
1206 .with_row_selection(selection.clone())
1207 .build()
1208 .expect("building stream");
1209
1210 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1211
1212 let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1213
1214 assert_eq!(actual_rows, expected_rows);
1215 }
1216
1217 #[tokio::test]
1218 async fn test_row_filter_full_page_skip_is_handled_async() {
1219 let first_value: i64 = 1111;
1220 let last_value: i64 = 9999;
1221 let num_rows: usize = 12;
1222
1223 let schema = Arc::new(Schema::new(vec![
1227 Field::new("key", DataType::Int64, false),
1228 Field::new("value", DataType::Int64, false),
1229 ]));
1230
1231 let mut int_values: Vec<i64> = (0..num_rows as i64).collect();
1232 int_values[0] = first_value;
1233 int_values[num_rows - 1] = last_value;
1234 let keys = Int64Array::from(int_values.clone());
1235 let values = Int64Array::from(int_values.clone());
1236 let batch = RecordBatch::try_new(
1237 Arc::clone(&schema),
1238 vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
1239 )
1240 .unwrap();
1241
1242 let props = WriterProperties::builder()
1243 .set_write_batch_size(2)
1244 .set_data_page_row_count_limit(2)
1245 .build();
1246
1247 let mut buffer = Vec::new();
1248 let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap();
1249 writer.write(&batch).unwrap();
1250 writer.close().unwrap();
1251 let data = Bytes::from(buffer);
1252
1253 let builder = ParquetRecordBatchStreamBuilder::new_with_options(
1254 TestReader::new(data.clone()),
1255 ArrowReaderOptions::new().with_page_index(true),
1256 )
1257 .await
1258 .unwrap();
1259 let schema = builder.parquet_schema().clone();
1260 let filter_mask = ProjectionMask::leaves(&schema, [0]);
1261
1262 let make_predicate = |mask: ProjectionMask| {
1263 ArrowPredicateFn::new(mask, move |batch: RecordBatch| {
1264 let column = batch.column(0);
1265 let match_first = eq(column, &Int64Array::new_scalar(first_value))?;
1266 let match_second = eq(column, &Int64Array::new_scalar(last_value))?;
1267 or(&match_first, &match_second)
1268 })
1269 };
1270
1271 let predicate = make_predicate(filter_mask.clone());
1272
1273 let stream = ParquetRecordBatchStreamBuilder::new_with_options(
1276 TestReader::new(data.clone()),
1277 ArrowReaderOptions::new().with_page_index(true),
1278 )
1279 .await
1280 .unwrap()
1281 .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
1282 .with_batch_size(12)
1283 .with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 32 })
1284 .build()
1285 .unwrap();
1286
1287 let schema = stream.schema().clone();
1288 let batches: Vec<_> = stream.try_collect().await.unwrap();
1289 let result = concat_batches(&schema, &batches).unwrap();
1290 assert_eq!(result.num_rows(), 2);
1291 }
1292
1293 #[tokio::test]
1294 async fn test_row_filter() {
1295 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1296 let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1297 let data = RecordBatch::try_from_iter([
1298 ("a", Arc::new(a) as ArrayRef),
1299 ("b", Arc::new(b) as ArrayRef),
1300 ])
1301 .unwrap();
1302
1303 let mut buf = Vec::with_capacity(1024);
1304 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1305 writer.write(&data).unwrap();
1306 writer.close().unwrap();
1307
1308 let data: Bytes = buf.into();
1309 let metadata = ParquetMetaDataReader::new()
1310 .parse_and_finish(&data)
1311 .unwrap();
1312 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1313
1314 let test = TestReader::new(data);
1315 let requests = test.requests.clone();
1316
1317 let a_scalar = StringArray::from_iter_values(["b"]);
1318 let a_filter = ArrowPredicateFn::new(
1319 ProjectionMask::leaves(&parquet_schema, vec![0]),
1320 move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1321 );
1322
1323 let filter = RowFilter::new(vec![Box::new(a_filter)]);
1324
1325 let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 1]);
1326 let stream = ParquetRecordBatchStreamBuilder::new(test)
1327 .await
1328 .unwrap()
1329 .with_projection(mask.clone())
1330 .with_batch_size(1024)
1331 .with_row_filter(filter)
1332 .build()
1333 .unwrap();
1334
1335 let batches: Vec<_> = stream.try_collect().await.unwrap();
1336 assert_eq!(batches.len(), 1);
1337
1338 let batch = &batches[0];
1339 assert_eq!(batch.num_columns(), 2);
1340
1341 assert_eq!(
1343 batch.column(0).as_ref(),
1344 &StringArray::from_iter_values(["b", "b", "b"])
1345 );
1346 assert_eq!(
1347 batch.column(1).as_ref(),
1348 &StringArray::from_iter_values(["2", "3", "4"])
1349 );
1350
1351 assert_eq!(requests.lock().unwrap().len(), 2);
1355 }
1356
1357 #[tokio::test]
1358 async fn test_two_row_filters() {
1359 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1360 let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1361 let c = Int32Array::from_iter(0..6);
1362 let data = RecordBatch::try_from_iter([
1363 ("a", Arc::new(a) as ArrayRef),
1364 ("b", Arc::new(b) as ArrayRef),
1365 ("c", Arc::new(c) as ArrayRef),
1366 ])
1367 .unwrap();
1368
1369 let mut buf = Vec::with_capacity(1024);
1370 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1371 writer.write(&data).unwrap();
1372 writer.close().unwrap();
1373
1374 let data: Bytes = buf.into();
1375 let metadata = ParquetMetaDataReader::new()
1376 .parse_and_finish(&data)
1377 .unwrap();
1378 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1379
1380 let test = TestReader::new(data);
1381 let requests = test.requests.clone();
1382
1383 let a_scalar = StringArray::from_iter_values(["b"]);
1384 let a_filter = ArrowPredicateFn::new(
1385 ProjectionMask::leaves(&parquet_schema, vec![0]),
1386 move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1387 );
1388
1389 let b_scalar = StringArray::from_iter_values(["4"]);
1390 let b_filter = ArrowPredicateFn::new(
1391 ProjectionMask::leaves(&parquet_schema, vec![1]),
1392 move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1393 );
1394
1395 let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1396
1397 let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1398 let stream = ParquetRecordBatchStreamBuilder::new(test)
1399 .await
1400 .unwrap()
1401 .with_projection(mask.clone())
1402 .with_batch_size(1024)
1403 .with_row_filter(filter)
1404 .build()
1405 .unwrap();
1406
1407 let batches: Vec<_> = stream.try_collect().await.unwrap();
1408 assert_eq!(batches.len(), 1);
1409
1410 let batch = &batches[0];
1411 assert_eq!(batch.num_rows(), 1);
1412 assert_eq!(batch.num_columns(), 2);
1413
1414 let col = batch.column(0);
1415 let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
1416 assert_eq!(val, "b");
1417
1418 let col = batch.column(1);
1419 let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
1420 assert_eq!(val, 3);
1421
1422 assert_eq!(requests.lock().unwrap().len(), 3);
1427 }
1428
1429 #[tokio::test]
1430 async fn test_limit_multiple_row_groups() {
1431 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1432 let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1433 let c = Int32Array::from_iter(0..6);
1434 let data = RecordBatch::try_from_iter([
1435 ("a", Arc::new(a) as ArrayRef),
1436 ("b", Arc::new(b) as ArrayRef),
1437 ("c", Arc::new(c) as ArrayRef),
1438 ])
1439 .unwrap();
1440
1441 let mut buf = Vec::with_capacity(1024);
1442 let props = WriterProperties::builder()
1443 .set_max_row_group_size(3)
1444 .build();
1445 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1446 writer.write(&data).unwrap();
1447 writer.close().unwrap();
1448
1449 let data: Bytes = buf.into();
1450 let metadata = ParquetMetaDataReader::new()
1451 .parse_and_finish(&data)
1452 .unwrap();
1453
1454 assert_eq!(metadata.num_row_groups(), 2);
1455
1456 let test = TestReader::new(data);
1457
1458 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1459 .await
1460 .unwrap()
1461 .with_batch_size(1024)
1462 .with_limit(4)
1463 .build()
1464 .unwrap();
1465
1466 let batches: Vec<_> = stream.try_collect().await.unwrap();
1467 assert_eq!(batches.len(), 2);
1469
1470 let batch = &batches[0];
1471 assert_eq!(batch.num_rows(), 3);
1473 assert_eq!(batch.num_columns(), 3);
1474 let col2 = batch.column(2).as_primitive::<Int32Type>();
1475 assert_eq!(col2.values(), &[0, 1, 2]);
1476
1477 let batch = &batches[1];
1478 assert_eq!(batch.num_rows(), 1);
1480 assert_eq!(batch.num_columns(), 3);
1481 let col2 = batch.column(2).as_primitive::<Int32Type>();
1482 assert_eq!(col2.values(), &[3]);
1483
1484 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1485 .await
1486 .unwrap()
1487 .with_offset(2)
1488 .with_limit(3)
1489 .build()
1490 .unwrap();
1491
1492 let batches: Vec<_> = stream.try_collect().await.unwrap();
1493 assert_eq!(batches.len(), 2);
1495
1496 let batch = &batches[0];
1497 assert_eq!(batch.num_rows(), 1);
1499 assert_eq!(batch.num_columns(), 3);
1500 let col2 = batch.column(2).as_primitive::<Int32Type>();
1501 assert_eq!(col2.values(), &[2]);
1502
1503 let batch = &batches[1];
1504 assert_eq!(batch.num_rows(), 2);
1506 assert_eq!(batch.num_columns(), 3);
1507 let col2 = batch.column(2).as_primitive::<Int32Type>();
1508 assert_eq!(col2.values(), &[3, 4]);
1509
1510 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1511 .await
1512 .unwrap()
1513 .with_offset(4)
1514 .with_limit(20)
1515 .build()
1516 .unwrap();
1517
1518 let batches: Vec<_> = stream.try_collect().await.unwrap();
1519 assert_eq!(batches.len(), 1);
1521
1522 let batch = &batches[0];
1523 assert_eq!(batch.num_rows(), 2);
1525 assert_eq!(batch.num_columns(), 3);
1526 let col2 = batch.column(2).as_primitive::<Int32Type>();
1527 assert_eq!(col2.values(), &[4, 5]);
1528 }
1529
1530 #[tokio::test]
1531 async fn test_row_filter_with_index() {
1532 let testdata = arrow::util::test_util::parquet_test_data();
1533 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1534 let data = Bytes::from(std::fs::read(path).unwrap());
1535
1536 let metadata = ParquetMetaDataReader::new()
1537 .parse_and_finish(&data)
1538 .unwrap();
1539 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1540
1541 assert_eq!(metadata.num_row_groups(), 1);
1542
1543 let async_reader = TestReader::new(data.clone());
1544
1545 let a_filter =
1546 ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema, vec![1]), |batch| {
1547 Ok(batch.column(0).as_boolean().clone())
1548 });
1549
1550 let b_scalar = Int8Array::from(vec![2]);
1551 let b_filter = ArrowPredicateFn::new(
1552 ProjectionMask::leaves(&parquet_schema, vec![2]),
1553 move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1554 );
1555
1556 let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1557
1558 let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1559
1560 let options = ArrowReaderOptions::new().with_page_index(true);
1561 let stream = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1562 .await
1563 .unwrap()
1564 .with_projection(mask.clone())
1565 .with_batch_size(1024)
1566 .with_row_filter(filter)
1567 .build()
1568 .unwrap();
1569
1570 let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1571
1572 let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1573
1574 assert_eq!(total_rows, 730);
1575 }
1576
1577 #[tokio::test]
1578 async fn test_batch_size_overallocate() {
1579 let testdata = arrow::util::test_util::parquet_test_data();
1580 let path = format!("{testdata}/alltypes_plain.parquet");
1582 let data = Bytes::from(std::fs::read(path).unwrap());
1583
1584 let async_reader = TestReader::new(data.clone());
1585
1586 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1587 .await
1588 .unwrap();
1589
1590 let file_rows = builder.metadata().file_metadata().num_rows() as usize;
1591
1592 let builder = builder
1593 .with_projection(ProjectionMask::all())
1594 .with_batch_size(1024);
1595
1596 assert_ne!(1024, file_rows);
1599 assert_eq!(builder.batch_size, file_rows);
1600
1601 let _stream = builder.build().unwrap();
1602 }
1603
1604 #[tokio::test]
1605 async fn test_get_row_group_column_bloom_filter_without_length() {
1606 let testdata = arrow::util::test_util::parquet_test_data();
1607 let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
1608 let data = Bytes::from(std::fs::read(path).unwrap());
1609 test_get_row_group_column_bloom_filter(data, false).await;
1610 }
1611
1612 #[tokio::test]
1613 async fn test_parquet_record_batch_stream_schema() {
1614 fn get_all_field_names(schema: &Schema) -> Vec<&String> {
1615 schema.flattened_fields().iter().map(|f| f.name()).collect()
1616 }
1617
1618 let mut metadata = HashMap::with_capacity(1);
1627 metadata.insert("key".to_string(), "value".to_string());
1628
1629 let nested_struct_array = StructArray::from(vec![
1630 (
1631 Arc::new(Field::new("d", DataType::Utf8, true)),
1632 Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
1633 ),
1634 (
1635 Arc::new(Field::new("e", DataType::Utf8, true)),
1636 Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
1637 ),
1638 ]);
1639 let struct_array = StructArray::from(vec![
1640 (
1641 Arc::new(Field::new("a", DataType::Int32, true)),
1642 Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
1643 ),
1644 (
1645 Arc::new(Field::new("b", DataType::UInt64, true)),
1646 Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
1647 ),
1648 (
1649 Arc::new(Field::new(
1650 "c",
1651 nested_struct_array.data_type().clone(),
1652 true,
1653 )),
1654 Arc::new(nested_struct_array) as ArrayRef,
1655 ),
1656 ]);
1657
1658 let schema =
1659 Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
1660 let record_batch = RecordBatch::from(struct_array)
1661 .with_schema(schema.clone())
1662 .unwrap();
1663
1664 let mut file = tempfile().unwrap();
1666 let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
1667 writer.write(&record_batch).unwrap();
1668 writer.close().unwrap();
1669
1670 let all_fields = ["a", "b", "c", "d", "e"];
1671 let projections = [
1673 (vec![], vec![]),
1674 (vec![0], vec!["a"]),
1675 (vec![0, 1], vec!["a", "b"]),
1676 (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
1677 (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
1678 ];
1679
1680 for (indices, expected_projected_names) in projections {
1682 let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
1683 assert_eq!(get_all_field_names(&builder), all_fields);
1685 assert_eq!(builder.metadata, metadata);
1686 assert_eq!(get_all_field_names(&reader), expected_projected_names);
1688 assert_eq!(reader.metadata, HashMap::default());
1689 assert_eq!(get_all_field_names(&batch), expected_projected_names);
1690 assert_eq!(batch.metadata, HashMap::default());
1691 };
1692
1693 let builder =
1694 ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
1695 let sync_builder_schema = builder.schema().clone();
1696 let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
1697 let mut reader = builder.with_projection(mask).build().unwrap();
1698 let sync_reader_schema = reader.schema();
1699 let batch = reader.next().unwrap().unwrap();
1700 let sync_batch_schema = batch.schema();
1701 assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);
1702
1703 let file = tokio::fs::File::from(file.try_clone().unwrap());
1705 let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
1706 let async_builder_schema = builder.schema().clone();
1707 let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
1708 let mut reader = builder.with_projection(mask).build().unwrap();
1709 let async_reader_schema = reader.schema().clone();
1710 let batch = reader.next().await.unwrap().unwrap();
1711 let async_batch_schema = batch.schema();
1712 assert_schemas(
1713 async_builder_schema,
1714 async_reader_schema,
1715 async_batch_schema,
1716 );
1717 }
1718 }
1719
1720 #[tokio::test]
1721 async fn test_get_row_group_column_bloom_filter_with_length() {
1722 let testdata = arrow::util::test_util::parquet_test_data();
1724 let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
1725 let data = Bytes::from(std::fs::read(path).unwrap());
1726 let async_reader = TestReader::new(data.clone());
1727 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1728 .await
1729 .unwrap();
1730 let schema = builder.schema().clone();
1731 let stream = builder.build().unwrap();
1732 let batches = stream.try_collect::<Vec<_>>().await.unwrap();
1733
1734 let mut parquet_data = Vec::new();
1735 let props = WriterProperties::builder()
1736 .set_bloom_filter_enabled(true)
1737 .build();
1738 let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
1739 for batch in batches {
1740 writer.write(&batch).unwrap();
1741 }
1742 writer.close().unwrap();
1743
1744 test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
1746 }
1747
1748 async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
1749 let async_reader = TestReader::new(data.clone());
1750
1751 let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1752 .await
1753 .unwrap();
1754
1755 let metadata = builder.metadata();
1756 assert_eq!(metadata.num_row_groups(), 1);
1757 let row_group = metadata.row_group(0);
1758 let column = row_group.column(0);
1759 assert_eq!(column.bloom_filter_length().is_some(), with_length);
1760
1761 let sbbf = builder
1762 .get_row_group_column_bloom_filter(0, 0)
1763 .await
1764 .unwrap()
1765 .unwrap();
1766 assert!(sbbf.check(&"Hello"));
1767 assert!(!sbbf.check(&"Hello_Not_Exists"));
1768 }
1769
1770 #[tokio::test]
1771 async fn test_nested_skip() {
1772 let schema = Arc::new(Schema::new(vec![
1773 Field::new("col_1", DataType::UInt64, false),
1774 Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
1775 ]));
1776
1777 let props = WriterProperties::builder()
1779 .set_data_page_row_count_limit(256)
1780 .set_write_batch_size(256)
1781 .set_max_row_group_size(1024);
1782
1783 let mut file = tempfile().unwrap();
1785 let mut writer =
1786 ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();
1787
1788 let mut builder = ListBuilder::new(StringBuilder::new());
1789 for id in 0..1024 {
1790 match id % 3 {
1791 0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
1792 1 => builder.append_value([Some(format!("id_{id}"))]),
1793 _ => builder.append_null(),
1794 }
1795 }
1796 let refs = vec![
1797 Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
1798 Arc::new(builder.finish()) as ArrayRef,
1799 ];
1800
1801 let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
1802 writer.write(&batch).unwrap();
1803 writer.close().unwrap();
1804
1805 let selections = [
1806 RowSelection::from(vec![
1807 RowSelector::skip(313),
1808 RowSelector::select(1),
1809 RowSelector::skip(709),
1810 RowSelector::select(1),
1811 ]),
1812 RowSelection::from(vec![
1813 RowSelector::skip(255),
1814 RowSelector::select(1),
1815 RowSelector::skip(767),
1816 RowSelector::select(1),
1817 ]),
1818 RowSelection::from(vec![
1819 RowSelector::select(255),
1820 RowSelector::skip(1),
1821 RowSelector::select(767),
1822 RowSelector::skip(1),
1823 ]),
1824 RowSelection::from(vec![
1825 RowSelector::skip(254),
1826 RowSelector::select(1),
1827 RowSelector::select(1),
1828 RowSelector::skip(767),
1829 RowSelector::select(1),
1830 ]),
1831 ];
1832
1833 for selection in selections {
1834 let expected = selection.row_count();
1835 let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
1837 tokio::fs::File::from_std(file.try_clone().unwrap()),
1838 ArrowReaderOptions::new().with_page_index(true),
1839 )
1840 .await
1841 .unwrap();
1842
1843 reader = reader.with_row_selection(selection);
1844
1845 let mut stream = reader.build().unwrap();
1846
1847 let mut total_rows = 0;
1848 while let Some(rb) = stream.next().await {
1849 let rb = rb.unwrap();
1850 total_rows += rb.num_rows();
1851 }
1852 assert_eq!(total_rows, expected);
1853 }
1854 }
1855
1856 #[tokio::test]
1857 async fn test_row_filter_nested() {
1858 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1859 let b = StructArray::from(vec![
1860 (
1861 Arc::new(Field::new("aa", DataType::Utf8, true)),
1862 Arc::new(StringArray::from(vec!["a", "b", "b", "b", "c", "c"])) as ArrayRef,
1863 ),
1864 (
1865 Arc::new(Field::new("bb", DataType::Utf8, true)),
1866 Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5", "6"])) as ArrayRef,
1867 ),
1868 ]);
1869 let c = Int32Array::from_iter(0..6);
1870 let data = RecordBatch::try_from_iter([
1871 ("a", Arc::new(a) as ArrayRef),
1872 ("b", Arc::new(b) as ArrayRef),
1873 ("c", Arc::new(c) as ArrayRef),
1874 ])
1875 .unwrap();
1876
1877 let mut buf = Vec::with_capacity(1024);
1878 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1879 writer.write(&data).unwrap();
1880 writer.close().unwrap();
1881
1882 let data: Bytes = buf.into();
1883 let metadata = ParquetMetaDataReader::new()
1884 .parse_and_finish(&data)
1885 .unwrap();
1886 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1887
1888 let test = TestReader::new(data);
1889 let requests = test.requests.clone();
1890
1891 let a_scalar = StringArray::from_iter_values(["b"]);
1892 let a_filter = ArrowPredicateFn::new(
1893 ProjectionMask::leaves(&parquet_schema, vec![0]),
1894 move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1895 );
1896
1897 let b_scalar = StringArray::from_iter_values(["4"]);
1898 let b_filter = ArrowPredicateFn::new(
1899 ProjectionMask::leaves(&parquet_schema, vec![2]),
1900 move |batch| {
1901 let struct_array = batch
1903 .column(0)
1904 .as_any()
1905 .downcast_ref::<StructArray>()
1906 .unwrap();
1907 eq(struct_array.column(0), &Scalar::new(&b_scalar))
1908 },
1909 );
1910
1911 let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1912
1913 let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 3]);
1914 let stream = ParquetRecordBatchStreamBuilder::new(test)
1915 .await
1916 .unwrap()
1917 .with_projection(mask.clone())
1918 .with_batch_size(1024)
1919 .with_row_filter(filter)
1920 .build()
1921 .unwrap();
1922
1923 let batches: Vec<_> = stream.try_collect().await.unwrap();
1924 assert_eq!(batches.len(), 1);
1925
1926 let batch = &batches[0];
1927 assert_eq!(batch.num_rows(), 1);
1928 assert_eq!(batch.num_columns(), 2);
1929
1930 let col = batch.column(0);
1931 let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
1932 assert_eq!(val, "b");
1933
1934 let col = batch.column(1);
1935 let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
1936 assert_eq!(val, 3);
1937
1938 assert_eq!(requests.lock().unwrap().len(), 3);
1943 }
1944
1945 #[tokio::test]
1946 #[allow(deprecated)]
1947 async fn empty_offset_index_doesnt_panic_in_read_row_group() {
1948 use tokio::fs::File;
1949 let testdata = arrow::util::test_util::parquet_test_data();
1950 let path = format!("{testdata}/alltypes_plain.parquet");
1951 let mut file = File::open(&path).await.unwrap();
1952 let file_size = file.metadata().await.unwrap().len();
1953 let mut metadata = ParquetMetaDataReader::new()
1954 .with_page_indexes(true)
1955 .load_and_finish(&mut file, file_size)
1956 .await
1957 .unwrap();
1958
1959 metadata.set_offset_index(Some(vec![]));
1960 let options = ArrowReaderOptions::new().with_page_index(true);
1961 let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
1962 let reader =
1963 ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
1964 .build()
1965 .unwrap();
1966
1967 let result = reader.try_collect::<Vec<_>>().await.unwrap();
1968 assert_eq!(result.len(), 1);
1969 }
1970
1971 #[tokio::test]
1972 #[allow(deprecated)]
1973 async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
1974 use tokio::fs::File;
1975 let testdata = arrow::util::test_util::parquet_test_data();
1976 let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1977 let mut file = File::open(&path).await.unwrap();
1978 let file_size = file.metadata().await.unwrap().len();
1979 let metadata = ParquetMetaDataReader::new()
1980 .with_page_indexes(true)
1981 .load_and_finish(&mut file, file_size)
1982 .await
1983 .unwrap();
1984
1985 let options = ArrowReaderOptions::new().with_page_index(true);
1986 let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
1987 let reader =
1988 ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
1989 .build()
1990 .unwrap();
1991
1992 let result = reader.try_collect::<Vec<_>>().await.unwrap();
1993 assert_eq!(result.len(), 8);
1994 }
1995
1996 #[tokio::test]
1997 #[allow(deprecated)]
1998 async fn empty_offset_index_doesnt_panic_in_column_chunks() {
1999 use tempfile::TempDir;
2000 use tokio::fs::File;
2001 fn write_metadata_to_local_file(
2002 metadata: ParquetMetaData,
2003 file: impl AsRef<std::path::Path>,
2004 ) {
2005 use crate::file::metadata::ParquetMetaDataWriter;
2006 use std::fs::File;
2007 let file = File::create(file).unwrap();
2008 ParquetMetaDataWriter::new(file, &metadata)
2009 .finish()
2010 .unwrap()
2011 }
2012
2013 fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
2014 use std::fs::File;
2015 let file = File::open(file).unwrap();
2016 ParquetMetaDataReader::new()
2017 .with_page_indexes(true)
2018 .parse_and_finish(&file)
2019 .unwrap()
2020 }
2021
2022 let testdata = arrow::util::test_util::parquet_test_data();
2023 let path = format!("{testdata}/alltypes_plain.parquet");
2024 let mut file = File::open(&path).await.unwrap();
2025 let file_size = file.metadata().await.unwrap().len();
2026 let metadata = ParquetMetaDataReader::new()
2027 .with_page_indexes(true)
2028 .load_and_finish(&mut file, file_size)
2029 .await
2030 .unwrap();
2031
2032 let tempdir = TempDir::new().unwrap();
2033 let metadata_path = tempdir.path().join("thrift_metadata.dat");
2034 write_metadata_to_local_file(metadata, &metadata_path);
2035 let metadata = read_metadata_from_local_file(&metadata_path);
2036
2037 let options = ArrowReaderOptions::new().with_page_index(true);
2038 let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2039 let reader =
2040 ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2041 .build()
2042 .unwrap();
2043
2044 let result = reader.try_collect::<Vec<_>>().await.unwrap();
2046 assert_eq!(result.len(), 1);
2047 }
2048
2049 #[tokio::test]
2050 async fn test_cached_array_reader_sparse_offset_error() {
2051 use futures::TryStreamExt;
2052
2053 use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
2054 use arrow_array::{BooleanArray, RecordBatch};
2055
2056 let testdata = arrow::util::test_util::parquet_test_data();
2057 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
2058 let data = Bytes::from(std::fs::read(path).unwrap());
2059
2060 let async_reader = TestReader::new(data);
2061
2062 let options = ArrowReaderOptions::new().with_page_index(true);
2064 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
2065 .await
2066 .unwrap();
2067
2068 let selection = RowSelection::from(vec![RowSelector::skip(22), RowSelector::select(3)]);
2072
2073 let parquet_schema = builder.parquet_schema();
2077 let proj = ProjectionMask::leaves(parquet_schema, vec![0]);
2078 let always_true = ArrowPredicateFn::new(proj.clone(), |batch: RecordBatch| {
2079 Ok(BooleanArray::from(vec![true; batch.num_rows()]))
2080 });
2081 let filter = RowFilter::new(vec![Box::new(always_true)]);
2082
2083 let stream = builder
2086 .with_batch_size(8)
2087 .with_projection(proj)
2088 .with_row_selection(selection)
2089 .with_row_filter(filter)
2090 .build()
2091 .unwrap();
2092
2093 let _result: Vec<_> = stream.try_collect().await.unwrap();
2096 }
2097
2098 #[tokio::test]
2099 async fn test_predicate_cache_disabled() {
2100 let k = Int32Array::from_iter_values(0..10);
2101 let data = RecordBatch::try_from_iter([("k", Arc::new(k) as ArrayRef)]).unwrap();
2102
2103 let mut buf = Vec::new();
2104 let props = WriterProperties::builder()
2106 .set_data_page_row_count_limit(1)
2107 .set_write_batch_size(1)
2108 .set_max_row_group_size(10)
2109 .set_write_page_header_statistics(true)
2110 .build();
2111 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
2112 writer.write(&data).unwrap();
2113 writer.close().unwrap();
2114
2115 let data = Bytes::from(buf);
2116 let metadata = ParquetMetaDataReader::new()
2117 .with_page_index_policy(PageIndexPolicy::Required)
2118 .parse_and_finish(&data)
2119 .unwrap();
2120 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
2121
2122 let build_filter = || {
2124 let scalar = Int32Array::from_iter_values([5]);
2125 let predicate = ArrowPredicateFn::new(
2126 ProjectionMask::leaves(&parquet_schema, vec![0]),
2127 move |batch| eq(batch.column(0), &Scalar::new(&scalar)),
2128 );
2129 RowFilter::new(vec![Box::new(predicate)])
2130 };
2131
2132 let selection = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(1)]);
2134
2135 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
2136 let reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2137
2138 let reader_with_cache = TestReader::new(data.clone());
2140 let requests_with_cache = reader_with_cache.requests.clone();
2141 let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
2142 reader_with_cache,
2143 reader_metadata.clone(),
2144 )
2145 .with_batch_size(1000)
2146 .with_row_selection(selection.clone())
2147 .with_row_filter(build_filter())
2148 .build()
2149 .unwrap();
2150 let batches_with_cache: Vec<_> = stream.try_collect().await.unwrap();
2151
2152 let reader_without_cache = TestReader::new(data);
2154 let requests_without_cache = reader_without_cache.requests.clone();
2155 let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
2156 reader_without_cache,
2157 reader_metadata,
2158 )
2159 .with_batch_size(1000)
2160 .with_row_selection(selection)
2161 .with_row_filter(build_filter())
        .with_max_predicate_cache_size(0) // disable the predicate cache
        .build()
2164 .unwrap();
2165 let batches_without_cache: Vec<_> = stream.try_collect().await.unwrap();
2166
2167 assert_eq!(batches_with_cache, batches_without_cache);
2168
2169 let requests_with_cache = requests_with_cache.lock().unwrap();
2170 let requests_without_cache = requests_without_cache.lock().unwrap();
2171
2172 assert_eq!(requests_with_cache.len(), 11);
2174 assert_eq!(requests_without_cache.len(), 2);
2175
2176 assert_eq!(
2178 requests_with_cache.iter().map(|r| r.len()).sum::<usize>(),
2179 433
2180 );
2181 assert_eq!(
2182 requests_without_cache
2183 .iter()
2184 .map(|r| r.len())
2185 .sum::<usize>(),
2186 92
2187 );
2188 }
2189
2190 #[test]
2191 fn test_row_numbers_with_multiple_row_groups() {
2192 test_row_numbers_with_multiple_row_groups_helper(
2193 false,
2194 |path, selection, _row_filter, batch_size| {
2195 let runtime = tokio::runtime::Builder::new_current_thread()
2196 .enable_all()
2197 .build()
2198 .expect("Could not create runtime");
2199 runtime.block_on(async move {
2200 let file = tokio::fs::File::open(path).await.unwrap();
2201 let row_number_field = Arc::new(
2202 Field::new("row_number", DataType::Int64, false)
2203 .with_extension_type(RowNumber),
2204 );
2205 let options =
2206 ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field]);
2207 let reader = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
2208 .await
2209 .unwrap()
2210 .with_row_selection(selection)
2211 .with_batch_size(batch_size)
2212 .build()
2213 .expect("Could not create reader");
2214 reader.try_collect::<Vec<_>>().await.unwrap()
2215 })
2216 },
2217 );
2218 }
2219
2220 #[test]
2221 fn test_row_numbers_with_multiple_row_groups_and_filter() {
2222 test_row_numbers_with_multiple_row_groups_helper(
2223 true,
2224 |path, selection, row_filter, batch_size| {
2225 let runtime = tokio::runtime::Builder::new_current_thread()
2226 .enable_all()
2227 .build()
2228 .expect("Could not create runtime");
2229 runtime.block_on(async move {
2230 let file = tokio::fs::File::open(path).await.unwrap();
2231 let row_number_field = Arc::new(
2232 Field::new("row_number", DataType::Int64, false)
2233 .with_extension_type(RowNumber),
2234 );
2235 let options =
2236 ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field]);
2237 let reader = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
2238 .await
2239 .unwrap()
2240 .with_row_selection(selection)
2241 .with_row_filter(row_filter.expect("No row filter"))
2242 .with_batch_size(batch_size)
2243 .build()
2244 .expect("Could not create reader");
2245 reader.try_collect::<Vec<_>>().await.unwrap()
2246 })
2247 },
2248 );
2249 }
2250}