use std::fmt::Formatter;
use std::io::SeekFrom;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
use futures::stream::Stream;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

use arrow_array::RecordBatch;
use arrow_schema::{Schema, SchemaRef};

use crate::arrow::arrow_reader::{
    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
};

use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
use crate::bloom_filter::{
    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};

mod metadata;
pub use metadata::*;

#[cfg(feature = "object_store")]
mod store;

use crate::DecodeResult;
use crate::arrow::push_decoder::{NoInput, ParquetPushDecoder, ParquetPushDecoderBuilder};
#[cfg(feature = "object_store")]
pub use store::*;

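/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files.
///
/// An implementation is provided below for any type implementing
/// `AsyncRead + AsyncSeek + Unpin + Send` (for example `tokio::fs::File`), and, when the
/// `object_store` feature is enabled, for `object_store::ObjectStore` via `ParquetObjectReader`.
///
/// A minimal in-memory implementation might look like the sketch below; the `InMemoryReader`
/// type is illustrative only and not part of this crate:
///
/// ```ignore
/// use std::ops::Range;
/// use std::sync::Arc;
///
/// use bytes::Bytes;
/// use futures::future::{BoxFuture, FutureExt};
/// use parquet::arrow::arrow_reader::ArrowReaderOptions;
/// use parquet::arrow::async_reader::AsyncFileReader;
/// use parquet::errors::Result;
/// use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
///
/// /// Serves byte ranges from a parquet file already buffered in memory.
/// struct InMemoryReader(Bytes);
///
/// impl AsyncFileReader for InMemoryReader {
///     fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
///         // Slicing `Bytes` is cheap; wrap the result in an already-resolved future
///         let data = self.0.slice(range.start as usize..range.end as usize);
///         async move { Ok(data) }.boxed()
///     }
///
///     fn get_metadata<'a>(
///         &'a mut self,
///         _options: Option<&'a ArrowReaderOptions>,
///     ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
///         // Parse the footer from the in-memory buffer
///         let data = self.0.clone();
///         async move {
///             let metadata = ParquetMetaDataReader::new().parse_and_finish(&data)?;
///             Ok(Arc::new(metadata))
///         }
///         .boxed()
///     }
/// }
/// ```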
pub trait AsyncFileReader: Send {
    /// Retrieve the bytes in `range` as a [`Bytes`] buffer.
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;

    /// Retrieve multiple byte ranges. The default implementation fetches them sequentially
    /// with [`Self::get_bytes`]; implementations may override this to coalesce or parallelize
    /// requests.
    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
        async move {
            let mut result = Vec::with_capacity(ranges.len());

            for range in ranges.into_iter() {
                let data = self.get_bytes(range).await?;
                result.push(data);
            }

            Ok(result)
        }
        .boxed()
    }

    /// Return the [`ParquetMetaData`] for this file, honoring any relevant settings in the
    /// optionally provided [`ArrowReaderOptions`] (for example the column and offset index
    /// policies).
    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
}

impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
        self.as_mut().get_bytes(range)
    }

    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
        self.as_mut().get_byte_ranges(ranges)
    }

    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
        self.as_mut().get_metadata(options)
    }
}

impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
        async move {
            self.seek(SeekFrom::End(-(suffix as i64))).await?;
            let mut buf = Vec::with_capacity(suffix);
            self.take(suffix as _).read_to_end(&mut buf).await?;
            Ok(buf.into())
        }
        .boxed()
    }
}

impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
        async move {
            self.seek(SeekFrom::Start(range.start)).await?;

            let to_read = range.end - range.start;
            let mut buffer = Vec::with_capacity(to_read.try_into()?);
            let read = self.take(to_read).read_to_end(&mut buffer).await?;
            if read as u64 != to_read {
                return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
            }

            Ok(buffer.into())
        }
        .boxed()
    }

    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
        async move {
            let metadata_opts = options.map(|o| o.metadata_options().clone());
            let mut metadata_reader =
                ParquetMetaDataReader::new().with_metadata_options(metadata_opts);

            if let Some(opts) = options {
                metadata_reader = metadata_reader
                    .with_column_index_policy(opts.column_index_policy())
                    .with_offset_index_policy(opts.offset_index_policy());
            }

            #[cfg(feature = "encryption")]
            let metadata_reader = metadata_reader.with_decryption_properties(
                options.and_then(|o| o.file_decryption_properties.as_ref().map(Arc::clone)),
            );

            let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
            Ok(Arc::new(parquet_metadata))
        }
        .boxed()
    }
}

impl ArrowReaderMetadata {
    /// Load [`ArrowReaderMetadata`] from the provided [`AsyncFileReader`], applying the
    /// supplied [`ArrowReaderOptions`].
    ///
    /// See [`ParquetRecordBatchStreamBuilder::new_with_metadata`] for how the loaded metadata
    /// can be reused when creating multiple streams over the same file.
    pub async fn load_async<T: AsyncFileReader>(
        input: &mut T,
        options: ArrowReaderOptions,
    ) -> Result<Self> {
        let metadata = input.get_metadata(Some(&options)).await?;
        Self::try_new(metadata, options)
    }
}

/// Newtype wrapper around the [`AsyncFileReader`] supplied to
/// [`ParquetRecordBatchStreamBuilder`]; it exists only to parameterize [`ArrowReaderBuilder`].
#[doc(hidden)]
pub struct AsyncReader<T>(T);

/// A builder for reading parquet files from an [`AsyncFileReader`] as a
/// [`ParquetRecordBatchStream`].
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;

impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
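    /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the provided
    /// [`AsyncFileReader`], fetching and parsing the file metadata.
    ///
    /// For example (a sketch assuming a tokio runtime and a local file; error handling elided):
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    /// use parquet::arrow::ParquetRecordBatchStreamBuilder;
    ///
    /// # async fn example() -> parquet::errors::Result<()> {
    /// let file = tokio::fs::File::open("data.parquet").await?;
    ///
    /// // Fetch the metadata and configure the stream
    /// let stream = ParquetRecordBatchStreamBuilder::new(file)
    ///     .await?
    ///     .with_batch_size(8192)
    ///     .build()?;
    ///
    /// // Collect all record batches
    /// let batches: Vec<_> = stream.try_collect().await?;
    /// # Ok(())
    /// # }
    /// ```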
    pub async fn new(input: T) -> Result<Self> {
        Self::new_with_options(input, Default::default()).await
    }

    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided [`ArrowReaderOptions`].
    pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
        Ok(Self::new_with_metadata(input, metadata))
    }

    /// Create a new [`ParquetRecordBatchStreamBuilder`] from previously loaded
    /// [`ArrowReaderMetadata`], allowing the metadata to be fetched once and reused across
    /// multiple readers.
    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(AsyncReader(input), metadata)
    }

    /// Read the bloom filter for the column at `column_idx` in row group `row_group_idx`,
    /// returning `Ok(None)` if no bloom filter was written for that column.
    pub async fn get_row_group_column_bloom_filter(
        &mut self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> Result<Option<Sbbf>> {
        let metadata = self.metadata.row_group(row_group_idx);
        let column_metadata = metadata.column(column_idx);

        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => self.input.0.get_bytes(offset..offset + length as u64),
            None => self
                .input
                .0
                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64),
        }
        .await?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        // These matches are exhaustive over the single-variant enums; they exist so that adding
        // a new algorithm, compression, or hash variant forces this code to be revisited.
        match header.algorithm {
            BloomFilterAlgorithm::BLOCK => {
                // only the block-based algorithm is currently supported
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED => {
                // only uncompressed bitsets are currently supported
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH => {
                // only xxHash is currently supported
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            Some(_) => buffer.slice(
                (TryInto::<usize>::try_into(bitset_offset).unwrap()
                    - TryInto::<usize>::try_into(offset).unwrap())..,
            ),
            None => {
                let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                self.input
                    .0
                    .get_bytes(bitset_offset..bitset_offset + bitset_length)
                    .await?
            }
        };
        Ok(Some(Sbbf::new(&bitset)))
    }

    /// Build a new [`ParquetRecordBatchStream`] from the configured builder.
    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
        let Self {
            input,
            metadata,
            schema,
            fields,
            batch_size,
            row_groups,
            projection,
            filter,
            selection,
            row_selection_policy: selection_strategy,
            limit,
            offset,
            metrics,
            max_predicate_cache_size,
        } = self;

        // Compute the projected output schema up front so `ParquetRecordBatchStream::schema`
        // can return it without decoding any data
        let projection_len = projection.mask.as_ref().map_or(usize::MAX, |m| m.len());
        let projected_fields = schema
            .fields
            .filter_leaves(|idx, _| idx < projection_len && projection.leaf_included(idx));
        let projected_schema = Arc::new(Schema::new(projected_fields));

        // The stream drives a push decoder, feeding it bytes fetched via the AsyncFileReader
        let decoder = ParquetPushDecoderBuilder {
            input: NoInput,
            metadata,
            schema,
            fields,
            projection,
            filter,
            selection,
            row_selection_policy: selection_strategy,
            batch_size,
            row_groups,
            limit,
            offset,
            metrics,
            max_predicate_cache_size,
        }
        .build()?;

        let request_state = RequestState::None { input: input.0 };

        Ok(ParquetRecordBatchStream {
            schema: projected_schema,
            decoder,
            request_state,
        })
    }
}

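/// Tracks any outstanding byte-range request made on behalf of the [`ParquetPushDecoder`],
/// carrying the [`AsyncFileReader`] between requests.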
enum RequestState<T> {
    /// No outstanding request; the reader is idle and available for the next fetch
    None {
        input: T,
    },
    /// A request for `ranges` has been issued and is awaiting completion
    Outstanding {
        ranges: Vec<Range<u64>>,
        /// Future that resolves to the reader and the requested bytes, in the same order as
        /// `ranges`
        future: BoxFuture<'static, Result<(T, Vec<Bytes>)>>,
    },
    /// The stream has finished or errored; no further requests will be made
    Done,
}

impl<T> RequestState<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
    /// Issue a request for `ranges` on `input`, returning [`RequestState::Outstanding`]
    fn begin_request(mut input: T, ranges: Vec<Range<u64>>) -> Self {
        // Keep a copy of the requested ranges so the fetched bytes can be pushed to the
        // decoder when the future completes
        let ranges_captured = ranges.clone();

        let future = async move {
            let data = input.get_byte_ranges(ranges_captured).await?;
            Ok((input, data))
        }
        .boxed();
        RequestState::Outstanding { ranges, future }
    }
}

impl<T> std::fmt::Debug for RequestState<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            RequestState::None { input: _ } => f
                .debug_struct("RequestState::None")
                .field("input", &"...")
                .finish(),
            RequestState::Outstanding { ranges, .. } => f
                .debug_struct("RequestState::Outstanding")
                .field("ranges", &ranges)
                .finish(),
            RequestState::Done => {
                write!(f, "RequestState::Done")
            }
        }
    }
}

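/// An asynchronous [`Stream`] of [`RecordBatch`] constructed with
/// [`ParquetRecordBatchStreamBuilder`].
///
/// Internally this drives a [`ParquetPushDecoder`], issuing byte-range requests against the
/// wrapped [`AsyncFileReader`] whenever the decoder reports that it needs more data.
///
/// For example (a sketch; error handling elided):
///
/// ```ignore
/// use futures::StreamExt;
/// use parquet::arrow::ParquetRecordBatchStreamBuilder;
///
/// # async fn example() -> parquet::errors::Result<()> {
/// let file = tokio::fs::File::open("data.parquet").await?;
/// let mut stream = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;
/// while let Some(batch) = stream.next().await {
///     println!("read {} rows", batch?.num_rows());
/// }
/// # Ok(())
/// # }
/// ```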
pub struct ParquetRecordBatchStream<T> {
    /// Schema of the projected output batches
    schema: SchemaRef,
    /// Tracks the reader and any outstanding byte-range request
    request_state: RequestState<T>,
    /// The push decoder that turns fetched bytes into [`RecordBatch`]es
    decoder: ParquetPushDecoder,
}

impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ParquetRecordBatchStream")
            .field("request_state", &self.request_state)
            .finish()
    }
}

impl<T> ParquetRecordBatchStream<T> {
    /// Returns the projected [`SchemaRef`] of the [`RecordBatch`]es produced by this stream
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}

impl<T> ParquetRecordBatchStream<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
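    /// Fetch the next row group, returning a [`ParquetRecordBatchReader`] that decodes it
    /// synchronously, or `None` when all selected row groups have been read.
    ///
    /// All byte ranges needed by the row group are fetched before the reader is returned, so
    /// iterating the returned reader performs no further I/O.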
    pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
        loop {
            // Temporarily take ownership of the request state while deciding what to do next
            let request_state = std::mem::replace(&mut self.request_state, RequestState::Done);
            match request_state {
                RequestState::None { input } => {
                    match self.decoder.try_next_reader()? {
                        DecodeResult::NeedsData(ranges) => {
                            self.request_state = RequestState::begin_request(input, ranges);
                            continue;
                        }
                        DecodeResult::Data(reader) => {
                            self.request_state = RequestState::None { input };
                            return Ok(Some(reader));
                        }
                        DecodeResult::Finished => return Ok(None),
                    }
                }
                RequestState::Outstanding { ranges, future } => {
                    // Wait for the outstanding request and feed the bytes to the decoder
                    let (input, data) = future.await?;
                    self.decoder.push_ranges(ranges, data)?;
                    self.request_state = RequestState::None { input };
                    continue;
                }
                RequestState::Done => {
                    self.request_state = RequestState::Done;
                    return Ok(None);
                }
            }
        }
    }
}

impl<T> Stream for ParquetRecordBatchStream<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
    type Item = Result<RecordBatch>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.poll_next_inner(cx) {
            Ok(res) => {
                // Convert Poll<Option<RecordBatch>> into Poll<Option<Result<RecordBatch>>>
                res.map(|res| Ok(res).transpose())
            }
            Err(e) => {
                // On error, mark the stream as done so subsequent polls return None
                self.request_state = RequestState::Done;
                Poll::Ready(Some(Err(e)))
            }
        }
    }
}

impl<T> ParquetRecordBatchStream<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
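    /// Inner state machine for [`Stream::poll_next`]: drives the push decoder, starting and
    /// polling byte-range requests as needed, and surfaces errors separately so the caller can
    /// transition the stream to [`RequestState::Done`].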
    fn poll_next_inner(&mut self, cx: &mut Context<'_>) -> Result<Poll<Option<RecordBatch>>> {
        loop {
            let request_state = std::mem::replace(&mut self.request_state, RequestState::Done);
            match request_state {
                RequestState::None { input } => {
                    match self.decoder.try_decode()? {
                        DecodeResult::NeedsData(ranges) => {
                            // The decoder needs more bytes: kick off a request and poll it
                            self.request_state = RequestState::begin_request(input, ranges);
                            continue;
                        }
                        DecodeResult::Data(batch) => {
                            self.request_state = RequestState::None { input };
                            return Ok(Poll::Ready(Some(batch)));
                        }
                        DecodeResult::Finished => {
                            self.request_state = RequestState::Done;
                            return Ok(Poll::Ready(None));
                        }
                    }
                }
                RequestState::Outstanding { ranges, mut future } => match future.poll_unpin(cx) {
                    Poll::Ready(result) => {
                        // Request completed: push the fetched bytes into the decoder and loop
                        let (input, data) = result?;
                        self.decoder.push_ranges(ranges, data)?;
                        self.request_state = RequestState::None { input };
                        continue;
                    }
                    Poll::Pending => {
                        self.request_state = RequestState::Outstanding { ranges, future };
                        return Ok(Poll::Pending);
                    }
                },
                RequestState::Done => {
                    self.request_state = RequestState::Done;
                    return Ok(Poll::Ready(None));
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::arrow_reader::RowSelectionPolicy;
    use crate::arrow::arrow_reader::tests::test_row_numbers_with_multiple_row_groups_helper;
    use crate::arrow::arrow_reader::{
        ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
    };
    use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
    use crate::arrow::schema::virtual_type::RowNumber;
    use crate::arrow::{ArrowWriter, AsyncArrowWriter, ProjectionMask};
    use crate::file::metadata::PageIndexPolicy;
    use crate::file::metadata::ParquetMetaDataReader;
    use crate::file::properties::WriterProperties;
    use arrow::compute::kernels::cmp::eq;
    use arrow::compute::or;
    use arrow::error::Result as ArrowResult;
    use arrow_array::builder::{Float32Builder, ListBuilder, StringBuilder};
    use arrow_array::cast::AsArray;
    use arrow_array::types::Int32Type;
    use arrow_array::{
        Array, ArrayRef, BooleanArray, Int8Array, Int32Array, Int64Array, RecordBatchReader,
        Scalar, StringArray, StructArray, UInt64Array,
    };
    use arrow_schema::{DataType, Field, Schema};
    use arrow_select::concat::concat_batches;
    use futures::{StreamExt, TryStreamExt};
    use rand::{Rng, rng};
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};
    use tempfile::tempfile;

    /// In-memory [`AsyncFileReader`] used by the tests below; it records every byte range
    /// requested so tests can assert on the I/O pattern.
    #[derive(Clone)]
    struct TestReader {
        data: Bytes,
        metadata: Option<Arc<ParquetMetaData>>,
        requests: Arc<Mutex<Vec<Range<usize>>>>,
    }

    impl TestReader {
        fn new(data: Bytes) -> Self {
            Self {
                data,
                metadata: Default::default(),
                requests: Default::default(),
            }
        }
    }

    impl AsyncFileReader for TestReader {
        fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            let range = range.clone();
            self.requests
                .lock()
                .unwrap()
                .push(range.start as usize..range.end as usize);
            futures::future::ready(Ok(self
                .data
                .slice(range.start as usize..range.end as usize)))
            .boxed()
        }

        fn get_metadata<'a>(
            &'a mut self,
            options: Option<&'a ArrowReaderOptions>,
        ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
            let mut metadata_reader = ParquetMetaDataReader::new();
            if let Some(opts) = options {
                metadata_reader = metadata_reader
                    .with_column_index_policy(opts.column_index_policy())
                    .with_offset_index_policy(opts.offset_index_policy());
            }
            self.metadata = Some(Arc::new(
                metadata_reader.parse_and_finish(&self.data).unwrap(),
            ));
            futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
        }
    }

    #[tokio::test]
    async fn test_async_reader() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());

        let async_reader = TestReader::new(data.clone());

        let requests = async_reader.requests.clone();
        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();

        let metadata = builder.metadata().clone();
        assert_eq!(metadata.num_row_groups(), 1);

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
        let stream = builder
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .build()
            .unwrap();

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .with_projection(mask)
            .with_batch_size(104)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);

        let requests = requests.lock().unwrap();
        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();

        assert_eq!(
            &requests[..],
            &[
                offset_1 as usize..(offset_1 + length_1) as usize,
                offset_2 as usize..(offset_2 + length_2) as usize
            ]
        );
    }

898 #[tokio::test]
899 async fn test_async_reader_with_next_row_group() {
900 let testdata = arrow::util::test_util::parquet_test_data();
901 let path = format!("{testdata}/alltypes_plain.parquet");
902 let data = Bytes::from(std::fs::read(path).unwrap());
903
904 let async_reader = TestReader::new(data.clone());
905
906 let requests = async_reader.requests.clone();
907 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
908 .await
909 .unwrap();
910
911 let metadata = builder.metadata().clone();
912 assert_eq!(metadata.num_row_groups(), 1);
913
914 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
915 let mut stream = builder
916 .with_projection(mask.clone())
917 .with_batch_size(1024)
918 .build()
919 .unwrap();
920
921 let mut readers = vec![];
922 while let Some(reader) = stream.next_row_group().await.unwrap() {
923 readers.push(reader);
924 }
925
926 let async_batches: Vec<_> = readers
927 .into_iter()
928 .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
929 .collect();
930
931 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
932 .unwrap()
933 .with_projection(mask)
934 .with_batch_size(104)
935 .build()
936 .unwrap()
937 .collect::<ArrowResult<Vec<_>>>()
938 .unwrap();
939
940 assert_eq!(async_batches, sync_batches);
941
942 let requests = requests.lock().unwrap();
943 let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
944 let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
945
946 assert_eq!(
947 &requests[..],
948 &[
949 offset_1 as usize..(offset_1 + length_1) as usize,
950 offset_2 as usize..(offset_2 + length_2) as usize
951 ]
952 );
953 }
954
955 #[tokio::test]
956 async fn test_async_reader_with_index() {
957 let testdata = arrow::util::test_util::parquet_test_data();
958 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
959 let data = Bytes::from(std::fs::read(path).unwrap());
960
961 let async_reader = TestReader::new(data.clone());
962
963 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
964 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
965 .await
966 .unwrap();
967
968 let metadata_with_index = builder.metadata();
970 assert_eq!(metadata_with_index.num_row_groups(), 1);
971
972 let offset_index = metadata_with_index.offset_index().unwrap();
974 let column_index = metadata_with_index.column_index().unwrap();
975
976 assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
977 assert_eq!(column_index.len(), metadata_with_index.num_row_groups());
978
979 let num_columns = metadata_with_index
980 .file_metadata()
981 .schema_descr()
982 .num_columns();
983
984 offset_index
986 .iter()
987 .for_each(|x| assert_eq!(x.len(), num_columns));
988 column_index
989 .iter()
990 .for_each(|x| assert_eq!(x.len(), num_columns));
991
992 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
993 let stream = builder
994 .with_projection(mask.clone())
995 .with_batch_size(1024)
996 .build()
997 .unwrap();
998
999 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1000
1001 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1002 .unwrap()
1003 .with_projection(mask)
1004 .with_batch_size(1024)
1005 .build()
1006 .unwrap()
1007 .collect::<ArrowResult<Vec<_>>>()
1008 .unwrap();
1009
1010 assert_eq!(async_batches, sync_batches);
1011 }
1012
1013 #[tokio::test]
1014 async fn test_async_reader_with_limit() {
1015 let testdata = arrow::util::test_util::parquet_test_data();
1016 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1017 let data = Bytes::from(std::fs::read(path).unwrap());
1018
1019 let metadata = ParquetMetaDataReader::new()
1020 .parse_and_finish(&data)
1021 .unwrap();
1022 let metadata = Arc::new(metadata);
1023
1024 assert_eq!(metadata.num_row_groups(), 1);
1025
1026 let async_reader = TestReader::new(data.clone());
1027
1028 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1029 .await
1030 .unwrap();
1031
1032 assert_eq!(builder.metadata().num_row_groups(), 1);
1033
1034 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1035 let stream = builder
1036 .with_projection(mask.clone())
1037 .with_batch_size(1024)
1038 .with_limit(1)
1039 .build()
1040 .unwrap();
1041
1042 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1043
1044 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1045 .unwrap()
1046 .with_projection(mask)
1047 .with_batch_size(1024)
1048 .with_limit(1)
1049 .build()
1050 .unwrap()
1051 .collect::<ArrowResult<Vec<_>>>()
1052 .unwrap();
1053
1054 assert_eq!(async_batches, sync_batches);
1055 }
1056
1057 #[tokio::test]
1058 async fn test_async_reader_skip_pages() {
1059 let testdata = arrow::util::test_util::parquet_test_data();
1060 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1061 let data = Bytes::from(std::fs::read(path).unwrap());
1062
1063 let async_reader = TestReader::new(data.clone());
1064
1065 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1066 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1067 .await
1068 .unwrap();
1069
1070 assert_eq!(builder.metadata().num_row_groups(), 1);
1071
        let selection = RowSelection::from(vec![
            RowSelector::skip(21),
            RowSelector::select(21),
            RowSelector::skip(41),
            RowSelector::select(41),
            RowSelector::skip(25),
            RowSelector::select(25),
            RowSelector::skip(7116),
            RowSelector::select(10),
        ]);

1083 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);
1084
1085 let stream = builder
1086 .with_projection(mask.clone())
1087 .with_row_selection(selection.clone())
1088 .build()
1089 .expect("building stream");
1090
1091 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1092
1093 let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1094 .unwrap()
1095 .with_projection(mask)
1096 .with_batch_size(1024)
1097 .with_row_selection(selection)
1098 .build()
1099 .unwrap()
1100 .collect::<ArrowResult<Vec<_>>>()
1101 .unwrap();
1102
1103 assert_eq!(async_batches, sync_batches);
1104 }
1105
1106 #[tokio::test]
1107 async fn test_fuzz_async_reader_selection() {
1108 let testdata = arrow::util::test_util::parquet_test_data();
1109 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1110 let data = Bytes::from(std::fs::read(path).unwrap());
1111
1112 let mut rand = rng();
1113
1114 for _ in 0..100 {
1115 let mut expected_rows = 0;
1116 let mut total_rows = 0;
1117 let mut skip = false;
1118 let mut selectors = vec![];
1119
1120 while total_rows < 7300 {
1121 let row_count: usize = rand.random_range(1..100);
1122
1123 let row_count = row_count.min(7300 - total_rows);
1124
1125 selectors.push(RowSelector { row_count, skip });
1126
1127 total_rows += row_count;
1128 if !skip {
1129 expected_rows += row_count;
1130 }
1131
1132 skip = !skip;
1133 }
1134
1135 let selection = RowSelection::from(selectors);
1136
1137 let async_reader = TestReader::new(data.clone());
1138
1139 let options =
1140 ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1141 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1142 .await
1143 .unwrap();
1144
1145 assert_eq!(builder.metadata().num_row_groups(), 1);
1146
1147 let col_idx: usize = rand.random_range(0..13);
1148 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1149
1150 let stream = builder
1151 .with_projection(mask.clone())
1152 .with_row_selection(selection.clone())
1153 .build()
1154 .expect("building stream");
1155
1156 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1157
1158 let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1159
1160 assert_eq!(actual_rows, expected_rows);
1161 }
1162 }
1163
1164 #[tokio::test]
1165 async fn test_async_reader_zero_row_selector() {
1166 let testdata = arrow::util::test_util::parquet_test_data();
1168 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1169 let data = Bytes::from(std::fs::read(path).unwrap());
1170
1171 let mut rand = rng();
1172
1173 let mut expected_rows = 0;
1174 let mut total_rows = 0;
1175 let mut skip = false;
1176 let mut selectors = vec![];
1177
1178 selectors.push(RowSelector {
1179 row_count: 0,
1180 skip: false,
1181 });
1182
1183 while total_rows < 7300 {
1184 let row_count: usize = rand.random_range(1..100);
1185
1186 let row_count = row_count.min(7300 - total_rows);
1187
1188 selectors.push(RowSelector { row_count, skip });
1189
1190 total_rows += row_count;
1191 if !skip {
1192 expected_rows += row_count;
1193 }
1194
1195 skip = !skip;
1196 }
1197
1198 let selection = RowSelection::from(selectors);
1199
1200 let async_reader = TestReader::new(data.clone());
1201
1202 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1203 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1204 .await
1205 .unwrap();
1206
1207 assert_eq!(builder.metadata().num_row_groups(), 1);
1208
1209 let col_idx: usize = rand.random_range(0..13);
1210 let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1211
1212 let stream = builder
1213 .with_projection(mask.clone())
1214 .with_row_selection(selection.clone())
1215 .build()
1216 .expect("building stream");
1217
1218 let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1219
1220 let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1221
1222 assert_eq!(actual_rows, expected_rows);
1223 }
1224
1225 #[tokio::test]
1226 async fn test_row_filter_full_page_skip_is_handled_async() {
1227 let first_value: i64 = 1111;
1228 let last_value: i64 = 9999;
1229 let num_rows: usize = 12;
1230
1231 let schema = Arc::new(Schema::new(vec![
1235 Field::new("key", DataType::Int64, false),
1236 Field::new("value", DataType::Int64, false),
1237 ]));
1238
1239 let mut int_values: Vec<i64> = (0..num_rows as i64).collect();
1240 int_values[0] = first_value;
1241 int_values[num_rows - 1] = last_value;
1242 let keys = Int64Array::from(int_values.clone());
1243 let values = Int64Array::from(int_values.clone());
1244 let batch = RecordBatch::try_new(
1245 Arc::clone(&schema),
1246 vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
1247 )
1248 .unwrap();
1249
1250 let props = WriterProperties::builder()
1251 .set_write_batch_size(2)
1252 .set_data_page_row_count_limit(2)
1253 .build();
1254
1255 let mut buffer = Vec::new();
1256 let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap();
1257 writer.write(&batch).unwrap();
1258 writer.close().unwrap();
1259 let data = Bytes::from(buffer);
1260
1261 let builder = ParquetRecordBatchStreamBuilder::new_with_options(
1262 TestReader::new(data.clone()),
1263 ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
1264 )
1265 .await
1266 .unwrap();
1267 let schema = builder.parquet_schema().clone();
1268 let filter_mask = ProjectionMask::leaves(&schema, [0]);
1269
1270 let make_predicate = |mask: ProjectionMask| {
1271 ArrowPredicateFn::new(mask, move |batch: RecordBatch| {
1272 let column = batch.column(0);
1273 let match_first = eq(column, &Int64Array::new_scalar(first_value))?;
1274 let match_second = eq(column, &Int64Array::new_scalar(last_value))?;
1275 or(&match_first, &match_second)
1276 })
1277 };
1278
1279 let predicate = make_predicate(filter_mask.clone());
1280
1281 let stream = ParquetRecordBatchStreamBuilder::new_with_options(
1284 TestReader::new(data.clone()),
1285 ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
1286 )
1287 .await
1288 .unwrap()
1289 .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
1290 .with_batch_size(12)
1291 .with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 32 })
1292 .build()
1293 .unwrap();
1294
1295 let schema = stream.schema().clone();
1296 let batches: Vec<_> = stream.try_collect().await.unwrap();
1297 let result = concat_batches(&schema, &batches).unwrap();
1298 assert_eq!(result.num_rows(), 2);
1299 }
1300
1301 #[tokio::test]
1302 async fn test_row_filter() {
1303 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1304 let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1305 let data = RecordBatch::try_from_iter([
1306 ("a", Arc::new(a) as ArrayRef),
1307 ("b", Arc::new(b) as ArrayRef),
1308 ])
1309 .unwrap();
1310
1311 let mut buf = Vec::with_capacity(1024);
1312 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1313 writer.write(&data).unwrap();
1314 writer.close().unwrap();
1315
1316 let data: Bytes = buf.into();
1317 let metadata = ParquetMetaDataReader::new()
1318 .parse_and_finish(&data)
1319 .unwrap();
1320 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1321
1322 let test = TestReader::new(data);
1323 let requests = test.requests.clone();
1324
1325 let a_scalar = StringArray::from_iter_values(["b"]);
1326 let a_filter = ArrowPredicateFn::new(
1327 ProjectionMask::leaves(&parquet_schema, vec![0]),
1328 move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1329 );
1330
1331 let filter = RowFilter::new(vec![Box::new(a_filter)]);
1332
1333 let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 1]);
1334 let stream = ParquetRecordBatchStreamBuilder::new(test)
1335 .await
1336 .unwrap()
1337 .with_projection(mask.clone())
1338 .with_batch_size(1024)
1339 .with_row_filter(filter)
1340 .build()
1341 .unwrap();
1342
1343 let batches: Vec<_> = stream.try_collect().await.unwrap();
1344 assert_eq!(batches.len(), 1);
1345
1346 let batch = &batches[0];
1347 assert_eq!(batch.num_columns(), 2);
1348
1349 assert_eq!(
1351 batch.column(0).as_ref(),
1352 &StringArray::from_iter_values(["b", "b", "b"])
1353 );
1354 assert_eq!(
1355 batch.column(1).as_ref(),
1356 &StringArray::from_iter_values(["2", "3", "4"])
1357 );
1358
1359 assert_eq!(requests.lock().unwrap().len(), 2);
1363 }
1364
1365 #[tokio::test]
1366 async fn test_two_row_filters() {
1367 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1368 let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1369 let c = Int32Array::from_iter(0..6);
1370 let data = RecordBatch::try_from_iter([
1371 ("a", Arc::new(a) as ArrayRef),
1372 ("b", Arc::new(b) as ArrayRef),
1373 ("c", Arc::new(c) as ArrayRef),
1374 ])
1375 .unwrap();
1376
1377 let mut buf = Vec::with_capacity(1024);
1378 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1379 writer.write(&data).unwrap();
1380 writer.close().unwrap();
1381
1382 let data: Bytes = buf.into();
1383 let metadata = ParquetMetaDataReader::new()
1384 .parse_and_finish(&data)
1385 .unwrap();
1386 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1387
1388 let test = TestReader::new(data);
1389 let requests = test.requests.clone();
1390
1391 let a_scalar = StringArray::from_iter_values(["b"]);
1392 let a_filter = ArrowPredicateFn::new(
1393 ProjectionMask::leaves(&parquet_schema, vec![0]),
1394 move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1395 );
1396
1397 let b_scalar = StringArray::from_iter_values(["4"]);
1398 let b_filter = ArrowPredicateFn::new(
1399 ProjectionMask::leaves(&parquet_schema, vec![1]),
1400 move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1401 );
1402
1403 let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1404
1405 let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1406 let stream = ParquetRecordBatchStreamBuilder::new(test)
1407 .await
1408 .unwrap()
1409 .with_projection(mask.clone())
1410 .with_batch_size(1024)
1411 .with_row_filter(filter)
1412 .build()
1413 .unwrap();
1414
1415 let batches: Vec<_> = stream.try_collect().await.unwrap();
1416 assert_eq!(batches.len(), 1);
1417
1418 let batch = &batches[0];
1419 assert_eq!(batch.num_rows(), 1);
1420 assert_eq!(batch.num_columns(), 2);
1421
1422 let col = batch.column(0);
1423 let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
1424 assert_eq!(val, "b");
1425
1426 let col = batch.column(1);
1427 let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
1428 assert_eq!(val, 3);
1429
1430 assert_eq!(requests.lock().unwrap().len(), 3);
1435 }
1436
1437 #[tokio::test]
1438 async fn test_limit_multiple_row_groups() {
1439 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1440 let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1441 let c = Int32Array::from_iter(0..6);
1442 let data = RecordBatch::try_from_iter([
1443 ("a", Arc::new(a) as ArrayRef),
1444 ("b", Arc::new(b) as ArrayRef),
1445 ("c", Arc::new(c) as ArrayRef),
1446 ])
1447 .unwrap();
1448
1449 let mut buf = Vec::with_capacity(1024);
1450 let props = WriterProperties::builder()
1451 .set_max_row_group_size(3)
1452 .build();
1453 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1454 writer.write(&data).unwrap();
1455 writer.close().unwrap();
1456
1457 let data: Bytes = buf.into();
1458 let metadata = ParquetMetaDataReader::new()
1459 .parse_and_finish(&data)
1460 .unwrap();
1461
1462 assert_eq!(metadata.num_row_groups(), 2);
1463
1464 let test = TestReader::new(data);
1465
1466 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1467 .await
1468 .unwrap()
1469 .with_batch_size(1024)
1470 .with_limit(4)
1471 .build()
1472 .unwrap();
1473
1474 let batches: Vec<_> = stream.try_collect().await.unwrap();
1475 assert_eq!(batches.len(), 2);
1477
1478 let batch = &batches[0];
1479 assert_eq!(batch.num_rows(), 3);
1481 assert_eq!(batch.num_columns(), 3);
1482 let col2 = batch.column(2).as_primitive::<Int32Type>();
1483 assert_eq!(col2.values(), &[0, 1, 2]);
1484
1485 let batch = &batches[1];
1486 assert_eq!(batch.num_rows(), 1);
1488 assert_eq!(batch.num_columns(), 3);
1489 let col2 = batch.column(2).as_primitive::<Int32Type>();
1490 assert_eq!(col2.values(), &[3]);
1491
1492 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1493 .await
1494 .unwrap()
1495 .with_offset(2)
1496 .with_limit(3)
1497 .build()
1498 .unwrap();
1499
1500 let batches: Vec<_> = stream.try_collect().await.unwrap();
1501 assert_eq!(batches.len(), 2);
1503
1504 let batch = &batches[0];
1505 assert_eq!(batch.num_rows(), 1);
1507 assert_eq!(batch.num_columns(), 3);
1508 let col2 = batch.column(2).as_primitive::<Int32Type>();
1509 assert_eq!(col2.values(), &[2]);
1510
1511 let batch = &batches[1];
1512 assert_eq!(batch.num_rows(), 2);
1514 assert_eq!(batch.num_columns(), 3);
1515 let col2 = batch.column(2).as_primitive::<Int32Type>();
1516 assert_eq!(col2.values(), &[3, 4]);
1517
1518 let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1519 .await
1520 .unwrap()
1521 .with_offset(4)
1522 .with_limit(20)
1523 .build()
1524 .unwrap();
1525
1526 let batches: Vec<_> = stream.try_collect().await.unwrap();
1527 assert_eq!(batches.len(), 1);
1529
1530 let batch = &batches[0];
1531 assert_eq!(batch.num_rows(), 2);
1533 assert_eq!(batch.num_columns(), 3);
1534 let col2 = batch.column(2).as_primitive::<Int32Type>();
1535 assert_eq!(col2.values(), &[4, 5]);
1536 }
1537
1538 #[tokio::test]
1539 async fn test_row_filter_with_index() {
1540 let testdata = arrow::util::test_util::parquet_test_data();
1541 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1542 let data = Bytes::from(std::fs::read(path).unwrap());
1543
1544 let metadata = ParquetMetaDataReader::new()
1545 .parse_and_finish(&data)
1546 .unwrap();
1547 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1548
1549 assert_eq!(metadata.num_row_groups(), 1);
1550
1551 let async_reader = TestReader::new(data.clone());
1552
1553 let a_filter =
1554 ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema, vec![1]), |batch| {
1555 Ok(batch.column(0).as_boolean().clone())
1556 });
1557
1558 let b_scalar = Int8Array::from(vec![2]);
1559 let b_filter = ArrowPredicateFn::new(
1560 ProjectionMask::leaves(&parquet_schema, vec![2]),
1561 move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
1562 );
1563
1564 let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1565
1566 let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
1567
1568 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1569 let stream = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1570 .await
1571 .unwrap()
1572 .with_projection(mask.clone())
1573 .with_batch_size(1024)
1574 .with_row_filter(filter)
1575 .build()
1576 .unwrap();
1577
1578 let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1579
1580 let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1581
1582 assert_eq!(total_rows, 730);
1583 }
1584
1585 #[tokio::test]
1586 async fn test_batch_size_overallocate() {
1587 let testdata = arrow::util::test_util::parquet_test_data();
1588 let path = format!("{testdata}/alltypes_plain.parquet");
1590 let data = Bytes::from(std::fs::read(path).unwrap());
1591
1592 let async_reader = TestReader::new(data.clone());
1593
1594 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1595 .await
1596 .unwrap();
1597
1598 let file_rows = builder.metadata().file_metadata().num_rows() as usize;
1599
1600 let builder = builder
1601 .with_projection(ProjectionMask::all())
1602 .with_batch_size(1024);
1603
1604 assert_ne!(1024, file_rows);
1607 assert_eq!(builder.batch_size, file_rows);
1608
1609 let _stream = builder.build().unwrap();
1610 }
1611
1612 #[tokio::test]
1613 async fn test_get_row_group_column_bloom_filter_without_length() {
1614 let testdata = arrow::util::test_util::parquet_test_data();
1615 let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
1616 let data = Bytes::from(std::fs::read(path).unwrap());
1617 test_get_row_group_column_bloom_filter(data, false).await;
1618 }
1619
1620 #[tokio::test]
1621 async fn test_parquet_record_batch_stream_schema() {
1622 fn get_all_field_names(schema: &Schema) -> Vec<&String> {
1623 schema.flattened_fields().iter().map(|f| f.name()).collect()
1624 }
1625
1626 let mut metadata = HashMap::with_capacity(1);
1635 metadata.insert("key".to_string(), "value".to_string());
1636
1637 let nested_struct_array = StructArray::from(vec![
1638 (
1639 Arc::new(Field::new("d", DataType::Utf8, true)),
1640 Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
1641 ),
1642 (
1643 Arc::new(Field::new("e", DataType::Utf8, true)),
1644 Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
1645 ),
1646 ]);
1647 let struct_array = StructArray::from(vec![
1648 (
1649 Arc::new(Field::new("a", DataType::Int32, true)),
1650 Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
1651 ),
1652 (
1653 Arc::new(Field::new("b", DataType::UInt64, true)),
1654 Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
1655 ),
1656 (
1657 Arc::new(Field::new(
1658 "c",
1659 nested_struct_array.data_type().clone(),
1660 true,
1661 )),
1662 Arc::new(nested_struct_array) as ArrayRef,
1663 ),
1664 ]);
1665
1666 let schema =
1667 Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
1668 let record_batch = RecordBatch::from(struct_array)
1669 .with_schema(schema.clone())
1670 .unwrap();
1671
1672 let mut file = tempfile().unwrap();
1674 let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
1675 writer.write(&record_batch).unwrap();
1676 writer.close().unwrap();
1677
1678 let all_fields = ["a", "b", "c", "d", "e"];
1679 let projections = [
1681 (vec![], vec![]),
1682 (vec![0], vec!["a"]),
1683 (vec![0, 1], vec!["a", "b"]),
1684 (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
1685 (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
1686 ];
1687
1688 for (indices, expected_projected_names) in projections {
1690 let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
1691 assert_eq!(get_all_field_names(&builder), all_fields);
1693 assert_eq!(builder.metadata, metadata);
1694 assert_eq!(get_all_field_names(&reader), expected_projected_names);
1696 assert_eq!(reader.metadata, HashMap::default());
1697 assert_eq!(get_all_field_names(&batch), expected_projected_names);
1698 assert_eq!(batch.metadata, HashMap::default());
1699 };
1700
1701 let builder =
1702 ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
1703 let sync_builder_schema = builder.schema().clone();
1704 let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
1705 let mut reader = builder.with_projection(mask).build().unwrap();
1706 let sync_reader_schema = reader.schema();
1707 let batch = reader.next().unwrap().unwrap();
1708 let sync_batch_schema = batch.schema();
1709 assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);
1710
1711 let file = tokio::fs::File::from(file.try_clone().unwrap());
1713 let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
1714 let async_builder_schema = builder.schema().clone();
1715 let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
1716 let mut reader = builder.with_projection(mask).build().unwrap();
1717 let async_reader_schema = reader.schema().clone();
1718 let batch = reader.next().await.unwrap().unwrap();
1719 let async_batch_schema = batch.schema();
1720 assert_schemas(
1721 async_builder_schema,
1722 async_reader_schema,
1723 async_batch_schema,
1724 );
1725 }
1726 }
1727
1728 #[tokio::test]
1729 async fn test_get_row_group_column_bloom_filter_with_length() {
1730 let testdata = arrow::util::test_util::parquet_test_data();
1732 let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
1733 let data = Bytes::from(std::fs::read(path).unwrap());
1734 let async_reader = TestReader::new(data.clone());
1735 let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1736 .await
1737 .unwrap();
1738 let schema = builder.schema().clone();
1739 let stream = builder.build().unwrap();
1740 let batches = stream.try_collect::<Vec<_>>().await.unwrap();
1741
1742 let mut parquet_data = Vec::new();
1743 let props = WriterProperties::builder()
1744 .set_bloom_filter_enabled(true)
1745 .build();
1746 let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
1747 for batch in batches {
1748 writer.write(&batch).unwrap();
1749 }
1750 writer.close().unwrap();
1751
1752 test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
1754 }
1755
1756 async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
1757 let async_reader = TestReader::new(data.clone());
1758
1759 let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1760 .await
1761 .unwrap();
1762
1763 let metadata = builder.metadata();
1764 assert_eq!(metadata.num_row_groups(), 1);
1765 let row_group = metadata.row_group(0);
1766 let column = row_group.column(0);
1767 assert_eq!(column.bloom_filter_length().is_some(), with_length);
1768
1769 let sbbf = builder
1770 .get_row_group_column_bloom_filter(0, 0)
1771 .await
1772 .unwrap()
1773 .unwrap();
1774 assert!(sbbf.check(&"Hello"));
1775 assert!(!sbbf.check(&"Hello_Not_Exists"));
1776 }
1777
1778 #[tokio::test]
1779 async fn test_nested_skip() {
1780 let schema = Arc::new(Schema::new(vec![
1781 Field::new("col_1", DataType::UInt64, false),
1782 Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
1783 ]));
1784
1785 let props = WriterProperties::builder()
1787 .set_data_page_row_count_limit(256)
1788 .set_write_batch_size(256)
1789 .set_max_row_group_size(1024);
1790
1791 let mut file = tempfile().unwrap();
1793 let mut writer =
1794 ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();
1795
1796 let mut builder = ListBuilder::new(StringBuilder::new());
1797 for id in 0..1024 {
1798 match id % 3 {
1799 0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
1800 1 => builder.append_value([Some(format!("id_{id}"))]),
1801 _ => builder.append_null(),
1802 }
1803 }
1804 let refs = vec![
1805 Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
1806 Arc::new(builder.finish()) as ArrayRef,
1807 ];
1808
1809 let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
1810 writer.write(&batch).unwrap();
1811 writer.close().unwrap();
1812
1813 let selections = [
1814 RowSelection::from(vec![
1815 RowSelector::skip(313),
1816 RowSelector::select(1),
1817 RowSelector::skip(709),
1818 RowSelector::select(1),
1819 ]),
1820 RowSelection::from(vec![
1821 RowSelector::skip(255),
1822 RowSelector::select(1),
1823 RowSelector::skip(767),
1824 RowSelector::select(1),
1825 ]),
1826 RowSelection::from(vec![
1827 RowSelector::select(255),
1828 RowSelector::skip(1),
1829 RowSelector::select(767),
1830 RowSelector::skip(1),
1831 ]),
1832 RowSelection::from(vec![
1833 RowSelector::skip(254),
1834 RowSelector::select(1),
1835 RowSelector::select(1),
1836 RowSelector::skip(767),
1837 RowSelector::select(1),
1838 ]),
1839 ];
1840
1841 for selection in selections {
1842 let expected = selection.row_count();
1843 let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
1845 tokio::fs::File::from_std(file.try_clone().unwrap()),
1846 ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
1847 )
1848 .await
1849 .unwrap();
1850
1851 reader = reader.with_row_selection(selection);
1852
1853 let mut stream = reader.build().unwrap();
1854
1855 let mut total_rows = 0;
1856 while let Some(rb) = stream.next().await {
1857 let rb = rb.unwrap();
1858 total_rows += rb.num_rows();
1859 }
1860 assert_eq!(total_rows, expected);
1861 }
1862 }
1863
1864 #[tokio::test]
1865 async fn test_row_filter_nested() {
1866 let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1867 let b = StructArray::from(vec![
1868 (
1869 Arc::new(Field::new("aa", DataType::Utf8, true)),
1870 Arc::new(StringArray::from(vec!["a", "b", "b", "b", "c", "c"])) as ArrayRef,
1871 ),
1872 (
1873 Arc::new(Field::new("bb", DataType::Utf8, true)),
1874 Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5", "6"])) as ArrayRef,
1875 ),
1876 ]);
1877 let c = Int32Array::from_iter(0..6);
1878 let data = RecordBatch::try_from_iter([
1879 ("a", Arc::new(a) as ArrayRef),
1880 ("b", Arc::new(b) as ArrayRef),
1881 ("c", Arc::new(c) as ArrayRef),
1882 ])
1883 .unwrap();
1884
1885 let mut buf = Vec::with_capacity(1024);
1886 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
1887 writer.write(&data).unwrap();
1888 writer.close().unwrap();
1889
1890 let data: Bytes = buf.into();
1891 let metadata = ParquetMetaDataReader::new()
1892 .parse_and_finish(&data)
1893 .unwrap();
1894 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1895
1896 let test = TestReader::new(data);
1897 let requests = test.requests.clone();
1898
1899 let a_scalar = StringArray::from_iter_values(["b"]);
1900 let a_filter = ArrowPredicateFn::new(
1901 ProjectionMask::leaves(&parquet_schema, vec![0]),
1902 move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
1903 );
1904
1905 let b_scalar = StringArray::from_iter_values(["4"]);
1906 let b_filter = ArrowPredicateFn::new(
1907 ProjectionMask::leaves(&parquet_schema, vec![2]),
1908 move |batch| {
1909 let struct_array = batch
1911 .column(0)
1912 .as_any()
1913 .downcast_ref::<StructArray>()
1914 .unwrap();
1915 eq(struct_array.column(0), &Scalar::new(&b_scalar))
1916 },
1917 );
1918
1919 let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
1920
1921 let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 3]);
1922 let stream = ParquetRecordBatchStreamBuilder::new(test)
1923 .await
1924 .unwrap()
1925 .with_projection(mask.clone())
1926 .with_batch_size(1024)
1927 .with_row_filter(filter)
1928 .build()
1929 .unwrap();
1930
1931 let batches: Vec<_> = stream.try_collect().await.unwrap();
1932 assert_eq!(batches.len(), 1);
1933
1934 let batch = &batches[0];
1935 assert_eq!(batch.num_rows(), 1);
1936 assert_eq!(batch.num_columns(), 2);
1937
1938 let col = batch.column(0);
1939 let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
1940 assert_eq!(val, "b");
1941
1942 let col = batch.column(1);
1943 let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
1944 assert_eq!(val, 3);
1945
1946 assert_eq!(requests.lock().unwrap().len(), 3);
1951 }
1952
1953 #[tokio::test]
1954 #[allow(deprecated)]
1955 async fn empty_offset_index_doesnt_panic_in_read_row_group() {
1956 use tokio::fs::File;
1957 let testdata = arrow::util::test_util::parquet_test_data();
1958 let path = format!("{testdata}/alltypes_plain.parquet");
1959 let mut file = File::open(&path).await.unwrap();
1960 let file_size = file.metadata().await.unwrap().len();
1961 let mut metadata = ParquetMetaDataReader::new()
1962 .with_page_indexes(true)
1963 .load_and_finish(&mut file, file_size)
1964 .await
1965 .unwrap();
1966
1967 metadata.set_offset_index(Some(vec![]));
1968 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1969 let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
1970 let reader =
1971 ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
1972 .build()
1973 .unwrap();
1974
1975 let result = reader.try_collect::<Vec<_>>().await.unwrap();
1976 assert_eq!(result.len(), 1);
1977 }
1978
1979 #[tokio::test]
1980 #[allow(deprecated)]
1981 async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
1982 use tokio::fs::File;
1983 let testdata = arrow::util::test_util::parquet_test_data();
1984 let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1985 let mut file = File::open(&path).await.unwrap();
1986 let file_size = file.metadata().await.unwrap().len();
1987 let metadata = ParquetMetaDataReader::new()
1988 .with_page_indexes(true)
1989 .load_and_finish(&mut file, file_size)
1990 .await
1991 .unwrap();
1992
1993 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1994 let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
1995 let reader =
1996 ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
1997 .build()
1998 .unwrap();
1999
2000 let result = reader.try_collect::<Vec<_>>().await.unwrap();
2001 assert_eq!(result.len(), 8);
2002 }
2003
2004 #[tokio::test]
2005 #[allow(deprecated)]
2006 async fn empty_offset_index_doesnt_panic_in_column_chunks() {
2007 use tempfile::TempDir;
2008 use tokio::fs::File;
2009 fn write_metadata_to_local_file(
2010 metadata: ParquetMetaData,
2011 file: impl AsRef<std::path::Path>,
2012 ) {
2013 use crate::file::metadata::ParquetMetaDataWriter;
2014 use std::fs::File;
2015 let file = File::create(file).unwrap();
2016 ParquetMetaDataWriter::new(file, &metadata)
2017 .finish()
2018 .unwrap()
2019 }
2020
2021 fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
2022 use std::fs::File;
2023 let file = File::open(file).unwrap();
2024 ParquetMetaDataReader::new()
2025 .with_page_indexes(true)
2026 .parse_and_finish(&file)
2027 .unwrap()
2028 }
2029
2030 let testdata = arrow::util::test_util::parquet_test_data();
2031 let path = format!("{testdata}/alltypes_plain.parquet");
2032 let mut file = File::open(&path).await.unwrap();
2033 let file_size = file.metadata().await.unwrap().len();
2034 let metadata = ParquetMetaDataReader::new()
2035 .with_page_indexes(true)
2036 .load_and_finish(&mut file, file_size)
2037 .await
2038 .unwrap();
2039
2040 let tempdir = TempDir::new().unwrap();
2041 let metadata_path = tempdir.path().join("thrift_metadata.dat");
2042 write_metadata_to_local_file(metadata, &metadata_path);
2043 let metadata = read_metadata_from_local_file(&metadata_path);
2044
2045 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
2046 let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2047 let reader =
2048 ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
2049 .build()
2050 .unwrap();
2051
2052 let result = reader.try_collect::<Vec<_>>().await.unwrap();
2054 assert_eq!(result.len(), 1);
2055 }
2056
2057 #[tokio::test]
2058 async fn test_cached_array_reader_sparse_offset_error() {
2059 use futures::TryStreamExt;
2060
2061 use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
2062 use arrow_array::{BooleanArray, RecordBatch};
2063
2064 let testdata = arrow::util::test_util::parquet_test_data();
2065 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
2066 let data = Bytes::from(std::fs::read(path).unwrap());
2067
2068 let async_reader = TestReader::new(data);
2069
2070 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
2072 let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
2073 .await
2074 .unwrap();
2075
2076 let selection = RowSelection::from(vec![RowSelector::skip(22), RowSelector::select(3)]);
2080
2081 let parquet_schema = builder.parquet_schema();
2085 let proj = ProjectionMask::leaves(parquet_schema, vec![0]);
2086 let always_true = ArrowPredicateFn::new(proj.clone(), |batch: RecordBatch| {
2087 Ok(BooleanArray::from(vec![true; batch.num_rows()]))
2088 });
2089 let filter = RowFilter::new(vec![Box::new(always_true)]);
2090
2091 let stream = builder
2094 .with_batch_size(8)
2095 .with_projection(proj)
2096 .with_row_selection(selection)
2097 .with_row_filter(filter)
2098 .build()
2099 .unwrap();
2100
2101 let _result: Vec<_> = stream.try_collect().await.unwrap();
2104 }
2105
2106 #[tokio::test]
2107 async fn test_predicate_cache_disabled() {
2108 let k = Int32Array::from_iter_values(0..10);
2109 let data = RecordBatch::try_from_iter([("k", Arc::new(k) as ArrayRef)]).unwrap();
2110
2111 let mut buf = Vec::new();
2112 let props = WriterProperties::builder()
2114 .set_data_page_row_count_limit(1)
2115 .set_write_batch_size(1)
2116 .set_max_row_group_size(10)
2117 .set_write_page_header_statistics(true)
2118 .build();
2119 let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
2120 writer.write(&data).unwrap();
2121 writer.close().unwrap();
2122
2123 let data = Bytes::from(buf);
2124 let metadata = ParquetMetaDataReader::new()
2125 .with_page_index_policy(PageIndexPolicy::Required)
2126 .parse_and_finish(&data)
2127 .unwrap();
2128 let parquet_schema = metadata.file_metadata().schema_descr_ptr();
2129
2130 let build_filter = || {
2132 let scalar = Int32Array::from_iter_values([5]);
2133 let predicate = ArrowPredicateFn::new(
2134 ProjectionMask::leaves(&parquet_schema, vec![0]),
2135 move |batch| eq(batch.column(0), &Scalar::new(&scalar)),
2136 );
2137 RowFilter::new(vec![Box::new(predicate)])
2138 };
2139
2140 let selection = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(1)]);
2142
2143 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
2144 let reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
2145
2146 let reader_with_cache = TestReader::new(data.clone());
2148 let requests_with_cache = reader_with_cache.requests.clone();
2149 let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
2150 reader_with_cache,
2151 reader_metadata.clone(),
2152 )
2153 .with_batch_size(1000)
2154 .with_row_selection(selection.clone())
2155 .with_row_filter(build_filter())
2156 .build()
2157 .unwrap();
2158 let batches_with_cache: Vec<_> = stream.try_collect().await.unwrap();
2159
2160 let reader_without_cache = TestReader::new(data);
2162 let requests_without_cache = reader_without_cache.requests.clone();
2163 let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
2164 reader_without_cache,
2165 reader_metadata,
2166 )
2167 .with_batch_size(1000)
2168 .with_row_selection(selection)
2169 .with_row_filter(build_filter())
        .with_max_predicate_cache_size(0) // disable the predicate cache
        .build()
        .unwrap();
2173 let batches_without_cache: Vec<_> = stream.try_collect().await.unwrap();
2174
2175 assert_eq!(batches_with_cache, batches_without_cache);
2176
2177 let requests_with_cache = requests_with_cache.lock().unwrap();
2178 let requests_without_cache = requests_without_cache.lock().unwrap();
2179
2180 assert_eq!(requests_with_cache.len(), 11);
2182 assert_eq!(requests_without_cache.len(), 2);
2183
2184 assert_eq!(
2186 requests_with_cache.iter().map(|r| r.len()).sum::<usize>(),
2187 433
2188 );
2189 assert_eq!(
2190 requests_without_cache
2191 .iter()
2192 .map(|r| r.len())
2193 .sum::<usize>(),
2194 92
2195 );
2196 }
2197
2198 #[test]
2199 fn test_row_numbers_with_multiple_row_groups() {
2200 test_row_numbers_with_multiple_row_groups_helper(
2201 false,
2202 |path, selection, _row_filter, batch_size| {
2203 let runtime = tokio::runtime::Builder::new_current_thread()
2204 .enable_all()
2205 .build()
2206 .expect("Could not create runtime");
2207 runtime.block_on(async move {
2208 let file = tokio::fs::File::open(path).await.unwrap();
2209 let row_number_field = Arc::new(
2210 Field::new("row_number", DataType::Int64, false)
2211 .with_extension_type(RowNumber),
2212 );
2213 let options = ArrowReaderOptions::new()
2214 .with_virtual_columns(vec![row_number_field])
2215 .unwrap();
2216 let reader = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
2217 .await
2218 .unwrap()
2219 .with_row_selection(selection)
2220 .with_batch_size(batch_size)
2221 .build()
2222 .expect("Could not create reader");
2223 reader.try_collect::<Vec<_>>().await.unwrap()
2224 })
2225 },
2226 );
2227 }
2228
2229 #[test]
2230 fn test_row_numbers_with_multiple_row_groups_and_filter() {
2231 test_row_numbers_with_multiple_row_groups_helper(
2232 true,
2233 |path, selection, row_filter, batch_size| {
2234 let runtime = tokio::runtime::Builder::new_current_thread()
2235 .enable_all()
2236 .build()
2237 .expect("Could not create runtime");
2238 runtime.block_on(async move {
2239 let file = tokio::fs::File::open(path).await.unwrap();
2240 let row_number_field = Arc::new(
2241 Field::new("row_number", DataType::Int64, false)
2242 .with_extension_type(RowNumber),
2243 );
2244 let options = ArrowReaderOptions::new()
2245 .with_virtual_columns(vec![row_number_field])
2246 .unwrap();
2247 let reader = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
2248 .await
2249 .unwrap()
2250 .with_row_selection(selection)
2251 .with_row_filter(row_filter.expect("No row filter"))
2252 .with_batch_size(batch_size)
2253 .build()
2254 .expect("Could not create reader");
2255 reader.try_collect::<Vec<_>>().await.unwrap()
2256 })
2257 },
2258 );
2259 }
2260
2261 #[tokio::test]
2262 async fn test_nested_lists() -> Result<()> {
2263 let list_inner_field = Arc::new(Field::new("item", DataType::Float32, true));
2265 let table_schema = Arc::new(Schema::new(vec![
2266 Field::new("id", DataType::Int32, false),
2267 Field::new("vector", DataType::List(list_inner_field.clone()), true),
2268 ]));
2269
2270 let mut list_builder =
2271 ListBuilder::new(Float32Builder::new()).with_field(list_inner_field.clone());
2272 list_builder.values().append_slice(&[10.0, 10.0, 10.0]);
2273 list_builder.append(true);
2274 list_builder.values().append_slice(&[20.0, 20.0, 20.0]);
2275 list_builder.append(true);
2276 list_builder.values().append_slice(&[30.0, 30.0, 30.0]);
2277 list_builder.append(true);
2278 list_builder.values().append_slice(&[40.0, 40.0, 40.0]);
2279 list_builder.append(true);
2280 let list_array = list_builder.finish();
2281
2282 let data = vec![RecordBatch::try_new(
2283 table_schema.clone(),
2284 vec![
2285 Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
2286 Arc::new(list_array),
2287 ],
2288 )?];
2289
2290 let mut buffer = Vec::new();
2291 let mut writer = AsyncArrowWriter::try_new(&mut buffer, table_schema, None)?;
2292
2293 for batch in data {
2294 writer.write(&batch).await?;
2295 }
2296
2297 writer.close().await?;
2298
2299 let reader = TestReader::new(Bytes::from(buffer));
2300 let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
2301
2302 let predicate = ArrowPredicateFn::new(ProjectionMask::all(), |batch| {
2303 Ok(BooleanArray::from(vec![true; batch.num_rows()]))
2304 });
2305
2306 let projection_mask = ProjectionMask::all();
2307
2308 let mut stream = builder
2309 .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
2310 .with_projection(projection_mask)
2311 .build()?;
2312
        while let Some(batch) = stream.next().await {
            let _ = batch.unwrap();
        }

2317 Ok(())
2318 }
2319}