use arrow_array::cast::AsArray;
use arrow_array::Array;
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
use arrow_select::filter::prep_null_mask_filter;
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};
use std::collections::VecDeque;
use std::sync::Arc;

pub use crate::arrow::array_reader::RowGroups;
use crate::arrow::array_reader::{build_array_reader, ArrayReader};
use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use crate::column::page::{PageIterator, PageReader};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;

mod filter;
mod selection;
pub mod statistics;

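/// Builder for reading a Parquet file into Arrow [`RecordBatch`]es.
///
/// This is the generic half shared by the synchronous and asynchronous readers;
/// [`ParquetRecordBatchReaderBuilder`] is the synchronous entry point built on it.
///
/// A minimal usage sketch (illustrative only; assumes `data` holds a complete,
/// valid Parquet file in memory):
///
/// ```no_run
/// # use bytes::Bytes;
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
/// # fn example(data: Bytes) -> parquet::errors::Result<()> {
/// let reader = ParquetRecordBatchReaderBuilder::try_new(data)?
///     .with_batch_size(8192)
///     .build()?;
/// for batch in reader {
///     println!("read {} rows", batch?.num_rows());
/// }
/// # Ok(())
/// # }
/// ```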
pub struct ArrowReaderBuilder<T> {
    pub(crate) input: T,

    pub(crate) metadata: Arc<ParquetMetaData>,

    pub(crate) schema: SchemaRef,

    pub(crate) fields: Option<Arc<ParquetField>>,

    pub(crate) batch_size: usize,

    pub(crate) row_groups: Option<Vec<usize>>,

    pub(crate) projection: ProjectionMask,

    pub(crate) filter: Option<RowFilter>,

    pub(crate) selection: Option<RowSelection>,

    pub(crate) limit: Option<usize>,

    pub(crate) offset: Option<usize>,
}

impl<T> ArrowReaderBuilder<T> {
    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self {
            input,
            metadata: metadata.metadata,
            schema: metadata.schema,
            fields: metadata.fields,
            batch_size: 1024,
            row_groups: None,
            projection: ProjectionMask::all(),
            filter: None,
            selection: None,
            limit: None,
            offset: None,
        }
    }

    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    pub fn with_batch_size(self, batch_size: usize) -> Self {
        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
        Self { batch_size, ..self }
    }

    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
        Self {
            row_groups: Some(row_groups),
            ..self
        }
    }

    pub fn with_projection(self, mask: ProjectionMask) -> Self {
        Self {
            projection: mask,
            ..self
        }
    }

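    /// Scan only the rows described by `selection`, skipping everything else.
    ///
    /// An illustrative sketch of building a selection (skip the first 10 rows,
    /// then read the next 20):
    ///
    /// ```
    /// use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
    ///
    /// let selection = RowSelection::from(vec![
    ///     RowSelector::skip(10),
    ///     RowSelector::select(20),
    /// ]);
    /// ```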
    pub fn with_row_selection(self, selection: RowSelection) -> Self {
        Self {
            selection: Some(selection),
            ..self
        }
    }

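    /// Apply `filter` during the scan, so that rows failing a predicate need
    /// not be decoded for the remaining columns.
    ///
    /// A hedged sketch of a single-predicate filter; the projected column
    /// index, the comparison, and the `arrow` kernel path are assumptions
    /// chosen for illustration, not the only way to build a predicate:
    ///
    /// ```no_run
    /// use arrow::compute::kernels::cmp::gt;
    /// use arrow_array::Int32Array;
    /// use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
    /// use parquet::arrow::ProjectionMask;
    /// # use parquet::schema::types::SchemaDescriptor;
    /// # fn example(schema: &SchemaDescriptor) {
    /// // evaluate the predicate against leaf column 0 only (hypothetical choice)
    /// let mask = ProjectionMask::leaves(schema, [0]);
    /// let predicate = ArrowPredicateFn::new(mask, |batch| {
    ///     gt(batch.column(0), &Int32Array::new_scalar(42))
    /// });
    /// let filter = RowFilter::new(vec![Box::new(predicate)]);
    /// # let _ = filter;
    /// # }
    /// ```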
    pub fn with_row_filter(self, filter: RowFilter) -> Self {
        Self {
            filter: Some(filter),
            ..self
        }
    }

    pub fn with_limit(self, limit: usize) -> Self {
        Self {
            limit: Some(limit),
            ..self
        }
    }

    pub fn with_offset(self, offset: usize) -> Self {
        Self {
            offset: Some(offset),
            ..self
        }
    }
}

#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    skip_arrow_metadata: bool,
    supplied_schema: Option<SchemaRef>,
    pub(crate) page_index: bool,
    #[cfg(feature = "encryption")]
    pub(crate) file_decryption_properties: Option<FileDecryptionProperties>,
}

impl ArrowReaderOptions {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

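    /// Decode using the provided Arrow `schema` instead of the one inferred
    /// from the Parquet metadata. The supplied schema must have the same
    /// number of columns, and each column must be castable from the inferred
    /// type; otherwise building the reader fails.
    ///
    /// An illustrative sketch (the field name `"col"` and the types are
    /// assumptions for the example):
    ///
    /// ```
    /// use std::sync::Arc;
    /// use arrow_schema::{DataType, Field, Schema, TimeUnit};
    /// use parquet::arrow::arrow_reader::ArrowReaderOptions;
    ///
    /// // e.g. read an INT96 column as microsecond rather than nanosecond timestamps
    /// let schema = Arc::new(Schema::new(vec![Field::new(
    ///     "col",
    ///     DataType::Timestamp(TimeUnit::Microsecond, None),
    ///     true,
    /// )]));
    /// let options = ArrowReaderOptions::new().with_schema(schema);
    /// ```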
    pub fn with_schema(self, schema: SchemaRef) -> Self {
        Self {
            supplied_schema: Some(schema),
            skip_arrow_metadata: true,
            ..self
        }
    }

    pub fn with_page_index(self, page_index: bool) -> Self {
        Self { page_index, ..self }
    }

    #[cfg(feature = "encryption")]
    pub fn with_file_decryption_properties(
        self,
        file_decryption_properties: FileDecryptionProperties,
    ) -> Self {
        Self {
            file_decryption_properties: Some(file_decryption_properties),
            ..self
        }
    }

    pub fn page_index(&self) -> bool {
        self.page_index
    }

    #[cfg(feature = "encryption")]
    pub fn file_decryption_properties(&self) -> Option<&FileDecryptionProperties> {
        self.file_decryption_properties.as_ref()
    }
}

#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    pub(crate) metadata: Arc<ParquetMetaData>,
    pub(crate) schema: SchemaRef,

    pub(crate) fields: Option<Arc<ParquetField>>,
}

impl ArrowReaderMetadata {
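    /// Reads the footer of `reader` (optionally including the page index) and
    /// resolves the Arrow schema. Loading the metadata once and reusing it via
    /// [`ParquetRecordBatchReaderBuilder::new_with_metadata`] avoids re-parsing
    /// the footer for every reader created over the same file.
    ///
    /// A sketch of that reuse (illustrative; assumes `file` is a cheaply
    /// cloneable [`ChunkReader`] such as `bytes::Bytes`):
    ///
    /// ```no_run
    /// # use bytes::Bytes;
    /// # use parquet::arrow::arrow_reader::{
    /// #     ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
    /// # };
    /// # fn example(file: Bytes) -> parquet::errors::Result<()> {
    /// let metadata = ArrowReaderMetadata::load(&file, ArrowReaderOptions::new())?;
    /// let _reader_a = ParquetRecordBatchReaderBuilder::new_with_metadata(file.clone(), metadata.clone());
    /// let _reader_b = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata);
    /// # Ok(())
    /// # }
    /// ```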
    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ParquetMetaDataReader::new().with_page_indexes(options.page_index);
        #[cfg(feature = "encryption")]
        let metadata =
            metadata.with_decryption_properties(options.file_decryption_properties.as_ref());
        let metadata = metadata.parse_and_finish(reader)?;
        Self::try_new(Arc::new(metadata), options)
    }

    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
        match options.supplied_schema {
            Some(supplied_schema) => Self::with_supplied_schema(metadata, supplied_schema.clone()),
            None => {
                let kv_metadata = match options.skip_arrow_metadata {
                    true => None,
                    false => metadata.file_metadata().key_value_metadata(),
                };

                let (schema, fields) = parquet_to_arrow_schema_and_fields(
                    metadata.file_metadata().schema_descr(),
                    ProjectionMask::all(),
                    kv_metadata,
                )?;

                Ok(Self {
                    metadata,
                    schema: Arc::new(schema),
                    fields: fields.map(Arc::new),
                })
            }
        }
    }

    fn with_supplied_schema(
        metadata: Arc<ParquetMetaData>,
        supplied_schema: SchemaRef,
    ) -> Result<Self> {
        let parquet_schema = metadata.file_metadata().schema_descr();
        let field_levels = parquet_to_arrow_field_levels(
            parquet_schema,
            ProjectionMask::all(),
            Some(supplied_schema.fields()),
        )?;
        let fields = field_levels.fields;
        let inferred_len = fields.len();
        let supplied_len = supplied_schema.fields().len();
        if inferred_len != supplied_len {
            Err(arrow_err!(format!(
                "incompatible arrow schema, expected {} columns received {}",
                inferred_len, supplied_len
            )))
        } else {
            let diff_fields: Vec<_> = supplied_schema
                .fields()
                .iter()
                .zip(fields.iter())
                .filter_map(|(field1, field2)| {
                    if field1 != field2 {
                        Some(field1.name().clone())
                    } else {
                        None
                    }
                })
                .collect();

            if !diff_fields.is_empty() {
                Err(ParquetError::ArrowError(format!(
                    "incompatible arrow schema, the following fields could not be cast: [{}]",
                    diff_fields.join(", ")
                )))
            } else {
                Ok(Self {
                    metadata,
                    schema: supplied_schema,
                    fields: field_levels.levels.map(Arc::new),
                })
            }
        }
    }

    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}

#[doc(hidden)]
pub struct SyncReader<T: ChunkReader>(T);

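/// A synchronous builder: reads a Parquet file from any [`ChunkReader`]
/// (e.g. `std::fs::File` or `bytes::Bytes`) and produces a
/// [`ParquetRecordBatchReader`].
///
/// An illustrative end-to-end sketch (the path is a placeholder):
///
/// ```no_run
/// use std::fs::File;
/// use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
///
/// let file = File::open("data.parquet").unwrap();
/// let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
///     .unwrap()
///     .with_batch_size(1024)
///     .build()
///     .unwrap();
/// let batch = reader.next().unwrap().unwrap();
/// ```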
pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;

impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
    pub fn try_new(reader: T) -> Result<Self> {
        Self::try_new_with_options(reader, Default::default())
    }

    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load(&reader, options)?;
        Ok(Self::new_with_metadata(reader, metadata))
    }

    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(SyncReader(input), metadata)
    }

    pub fn build(self) -> Result<ParquetRecordBatchReader> {
        let batch_size = self
            .batch_size
            .min(self.metadata.file_metadata().num_rows() as usize);

        let row_groups = self
            .row_groups
            .unwrap_or_else(|| (0..self.metadata.num_row_groups()).collect());

        let reader = ReaderRowGroups {
            reader: Arc::new(self.input.0),
            metadata: self.metadata,
            row_groups,
        };

        let mut filter = self.filter;
        let mut selection = self.selection;

        if let Some(filter) = filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
                if !selects_any(selection.as_ref()) {
                    break;
                }

                let array_reader =
                    build_array_reader(self.fields.as_deref(), predicate.projection(), &reader)?;

                selection = Some(evaluate_predicate(
                    batch_size,
                    array_reader,
                    selection,
                    predicate.as_mut(),
                )?);
            }
        }

        let array_reader = build_array_reader(self.fields.as_deref(), &self.projection, &reader)?;

        if !selects_any(selection.as_ref()) {
            selection = Some(RowSelection::from(vec![]));
        }

        Ok(ParquetRecordBatchReader::new(
            batch_size,
            array_reader,
            apply_range(selection, reader.num_rows(), self.offset, self.limit),
        ))
    }
}

struct ReaderRowGroups<T: ChunkReader> {
    reader: Arc<T>,

    metadata: Arc<ParquetMetaData>,
    row_groups: Vec<usize>,
}

impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
    fn num_rows(&self) -> usize {
        let meta = self.metadata.row_groups();
        self.row_groups
            .iter()
            .map(|x| meta[*x].num_rows() as usize)
            .sum()
    }

    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        Ok(Box::new(ReaderPageIterator {
            column_idx: i,
            reader: self.reader.clone(),
            metadata: self.metadata.clone(),
            row_groups: self.row_groups.clone().into_iter(),
        }))
    }
}

struct ReaderPageIterator<T: ChunkReader> {
    reader: Arc<T>,
    column_idx: usize,
    row_groups: std::vec::IntoIter<usize>,
    metadata: Arc<ParquetMetaData>,
}

impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
        let rg = self.metadata.row_group(rg_idx);
        let column_chunk_metadata = rg.column(self.column_idx);
        let offset_index = self.metadata.offset_index();
        let page_locations = offset_index
            .filter(|i| !i[rg_idx].is_empty())
            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
        let total_rows = rg.num_rows() as usize;
        let reader = self.reader.clone();

        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
            .add_crypto_context(
                rg_idx,
                self.column_idx,
                self.metadata.as_ref(),
                column_chunk_metadata,
            )
    }
}

impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        let rg_idx = self.row_groups.next()?;
        let page_reader = self
            .next_page_reader(rg_idx)
            .map(|page_reader| Box::new(page_reader) as _);
        Some(page_reader)
    }
}

impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}

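/// A synchronous [`RecordBatchReader`]: an `Iterator` of
/// `Result<RecordBatch, ArrowError>` that decodes a Parquet file batch by
/// batch, honouring an optional [`RowSelection`]. Construct it via
/// [`ParquetRecordBatchReaderBuilder`], or with
/// [`ParquetRecordBatchReader::try_new`] when the defaults suffice.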
pub struct ParquetRecordBatchReader {
    batch_size: usize,
    array_reader: Box<dyn ArrayReader>,
    schema: SchemaRef,
    selection: Option<VecDeque<RowSelector>>,
}

impl Iterator for ParquetRecordBatchReader {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_inner()
            .map_err(|arrow_err| arrow_err.into())
            .transpose()
    }
}

impl ParquetRecordBatchReader {
    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
        let mut read_records = 0;
        match self.selection.as_mut() {
            Some(selection) => {
                while read_records < self.batch_size && !selection.is_empty() {
                    let front = selection.pop_front().unwrap();
                    if front.skip {
                        let skipped = self.array_reader.skip_records(front.row_count)?;

                        if skipped != front.row_count {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                front.row_count,
                                skipped
                            ));
                        }
                        continue;
                    }

                    if front.row_count == 0 {
                        continue;
                    }

                    let need_read = self.batch_size - read_records;
                    let to_read = match front.row_count.checked_sub(need_read) {
                        Some(remaining) if remaining != 0 => {
                            selection.push_front(RowSelector::select(remaining));
                            need_read
                        }
                        _ => front.row_count,
                    };
                    match self.array_reader.read_records(to_read)? {
                        0 => break,
                        rec => read_records += rec,
                    };
                }
            }
            None => {
                self.array_reader.read_records(self.batch_size)?;
            }
        };

        let array = self.array_reader.consume_batch()?;
        let struct_array = array.as_struct_opt().ok_or_else(|| {
            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
        })?;

        Ok(if struct_array.len() > 0 {
            Some(RecordBatch::from(struct_array))
        } else {
            None
        })
    }
}

impl RecordBatchReader for ParquetRecordBatchReader {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl ParquetRecordBatchReader {
    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
        ParquetRecordBatchReaderBuilder::try_new(reader)?
            .with_batch_size(batch_size)
            .build()
    }

    pub fn try_new_with_row_groups(
        levels: &FieldLevels,
        row_groups: &dyn RowGroups,
        batch_size: usize,
        selection: Option<RowSelection>,
    ) -> Result<Self> {
        let array_reader =
            build_array_reader(levels.levels.as_ref(), &ProjectionMask::all(), row_groups)?;

        Ok(Self {
            batch_size,
            array_reader,
            schema: Arc::new(Schema::new(levels.fields.clone())),
            selection: selection.map(|s| s.trim().into()),
        })
    }

    pub(crate) fn new(
        batch_size: usize,
        array_reader: Box<dyn ArrayReader>,
        selection: Option<RowSelection>,
    ) -> Self {
        let schema = match array_reader.get_data_type() {
            ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
            _ => unreachable!("Struct array reader's data type is not struct!"),
        };

        Self {
            batch_size,
            array_reader,
            schema: Arc::new(schema),
            selection: selection.map(|s| s.trim().into()),
        }
    }
}

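/// Returns `true` if `selection` is `None` (select everything) or selects at
/// least one row.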
pub(crate) fn selects_any(selection: Option<&RowSelection>) -> bool {
    selection.map(|x| x.selects_any()).unwrap_or(true)
}

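/// Applies an optional `offset` (skip the first `offset` selected rows) and
/// `limit` (keep at most `limit` selected rows) to `selection`, synthesizing a
/// selection over all `row_count` rows when none exists.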
pub(crate) fn apply_range(
    mut selection: Option<RowSelection>,
    row_count: usize,
    offset: Option<usize>,
    limit: Option<usize>,
) -> Option<RowSelection> {
    if let Some(offset) = offset {
        selection = Some(match row_count.checked_sub(offset) {
            None => RowSelection::from(vec![]),
            Some(remaining) => selection
                .map(|selection| selection.offset(offset))
                .unwrap_or_else(|| {
                    RowSelection::from(vec![
                        RowSelector::skip(offset),
                        RowSelector::select(remaining),
                    ])
                }),
        });
    }

    if let Some(limit) = limit {
        selection = Some(
            selection
                .map(|selection| selection.limit(limit))
                .unwrap_or_else(|| {
                    RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
                }),
        );
    }
    selection
}

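/// Evaluates `predicate` against the rows in `input_selection` (or all rows if
/// `None`), decoding only the predicate's projected columns, and returns the
/// selection of rows for which it returned `true`, intersected with the input
/// selection. Boolean filters containing nulls are first converted with
/// `prep_null_mask_filter`, which treats null as `false`.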
pub(crate) fn evaluate_predicate(
    batch_size: usize,
    array_reader: Box<dyn ArrayReader>,
    input_selection: Option<RowSelection>,
    predicate: &mut dyn ArrowPredicate,
) -> Result<RowSelection> {
    let reader = ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone());
    let mut filters = vec![];
    for maybe_batch in reader {
        let maybe_batch = maybe_batch?;
        let input_rows = maybe_batch.num_rows();
        let filter = predicate.evaluate(maybe_batch)?;
        if filter.len() != input_rows {
            return Err(arrow_err!(
                "ArrowPredicate predicate returned {} rows, expected {input_rows}",
                filter.len()
            ));
        }
        match filter.null_count() {
            0 => filters.push(filter),
            _ => filters.push(prep_null_mask_filter(&filter)),
        };
    }

    let raw = RowSelection::from_filters(&filters);
    Ok(match input_selection {
        Some(selection) => selection.and_then(&raw),
        None => raw,
    })
}

#[cfg(test)]
mod tests {
    use std::cmp::min;
    use std::collections::{HashMap, VecDeque};
    use std::fmt::Formatter;
    use std::fs::File;
    use std::io::Seek;
    use std::path::PathBuf;
    use std::sync::Arc;

    use arrow_array::builder::*;
    use arrow_array::cast::AsArray;
    use arrow_array::types::{
        Date32Type, Date64Type, Decimal128Type, Decimal256Type, DecimalType, Float16Type,
        Float32Type, Float64Type, Time32MillisecondType, Time64MicrosecondType,
    };
    use arrow_array::*;
    use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
    use arrow_data::{ArrayData, ArrayDataBuilder};
    use arrow_schema::{
        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
    };
    use arrow_select::concat::concat_batches;
    use bytes::Bytes;
    use half::f16;
    use num::PrimInt;
    use rand::{rng, Rng, RngCore};
    use tempfile::tempfile;

    use crate::arrow::arrow_reader::{
        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
    };
    use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
    use crate::arrow::{ArrowWriter, ProjectionMask};
    use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
    use crate::data_type::{
        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
        FloatType, Int32Type, Int64Type, Int96, Int96Type,
    };
    use crate::errors::Result;
    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
    use crate::file::writer::SerializedFileWriter;
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::{Type, TypePtr};
    use crate::util::test_common::rand_gen::RandGen;

    #[test]
    fn test_arrow_reader_all_columns() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());
        let reader = builder.build().unwrap();

        assert_eq!(original_schema.fields(), reader.schema().fields());
    }

    #[test]
    fn test_arrow_reader_single_column() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
        let reader = builder.with_projection(mask).build().unwrap();

        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_arrow_reader_single_column_by_name() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
        let reader = builder.with_projection(mask).build().unwrap();

        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_null_column_reader_test() {
        let mut file = tempfile::tempfile().unwrap();

        let schema = "
            message message {
                OPTIONAL INT32 int32;
            }
        ";
        let schema = Arc::new(parse_message_type(schema).unwrap());

        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
        generate_single_column_file_with_data::<Int32Type>(
            &[vec![], vec![]],
            Some(&def_levels),
            file.try_clone().unwrap(),
            schema,
            Some(Field::new("int32", ArrowDataType::Null, true)),
            &Default::default(),
        )
        .unwrap();

        file.rewind().unwrap();

        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();

        assert_eq!(batches.len(), 4);
        for batch in &batches[0..3] {
            assert_eq!(batch.num_rows(), 2);
            assert_eq!(batch.num_columns(), 1);
            assert_eq!(batch.column(0).null_count(), 2);
        }

        assert_eq!(batches[3].num_rows(), 1);
        assert_eq!(batches[3].num_columns(), 1);
        assert_eq!(batches[3].column(0).null_count(), 1);
    }

    #[test]
    fn test_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<BoolType, _, BoolType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
        );
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<FloatType, _, FloatType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
        );
    }

    #[test]
    fn test_unsigned_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::UINT_32,
            Some(ArrowDataType::UInt32),
            |vals| {
                Arc::new(UInt32Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u32)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::UINT_64,
            Some(ArrowDataType::UInt64),
            |vals| {
                Arc::new(UInt64Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u64)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
    }

    #[test]
    fn test_unsigned_roundtrip() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("uint32", ArrowDataType::UInt32, true),
            Field::new("uint64", ArrowDataType::UInt64, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt32Array::from_iter_values([
                    0,
                    i32::MAX as u32,
                    u32::MAX,
                ])),
                Arc::new(UInt64Array::from_iter_values([
                    0,
                    i64::MAX as u64,
                    u64::MAX,
                ])),
            ],
        )
        .unwrap();

        writer.write(&original).unwrap();
        writer.close().unwrap();

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
        let ret = reader.next().unwrap().unwrap();
        assert_eq!(ret, original);

        ret.column(0)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();

        ret.column(1)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
    }

    #[test]
    fn test_float16_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("float16", ArrowDataType::Float16, false),
            Field::new("float16-nullable", ArrowDataType::Float16, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Float16Array::from_iter_values([
                    f16::EPSILON,
                    f16::MIN,
                    f16::MAX,
                    f16::NAN,
                    f16::INFINITY,
                    f16::NEG_INFINITY,
                    f16::ONE,
                    f16::NEG_ONE,
                    f16::ZERO,
                    f16::NEG_ZERO,
                    f16::E,
                    f16::PI,
                    f16::FRAC_1_PI,
                ])),
                Arc::new(Float16Array::from(vec![
                    None,
                    None,
                    None,
                    Some(f16::NAN),
                    Some(f16::INFINITY),
                    Some(f16::NEG_INFINITY),
                    None,
                    None,
                    None,
                    None,
                    None,
                    None,
                    Some(f16::FRAC_1_PI),
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Float16Type>();
        ret.column(1).as_primitive::<Float16Type>();

        Ok(())
    }

    #[test]
    fn test_time_utc_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "time_millis",
                ArrowDataType::Time32(TimeUnit::Millisecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
            Field::new(
                "time_micros",
                ArrowDataType::Time64(TimeUnit::Microsecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Time32MillisecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399_000),
                    Some(86_400_000),
                    Some(86_401_000),
                    None,
                ])),
                Arc::new(Time64MicrosecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399 * 1_000_000),
                    Some(86_400 * 1_000_000),
                    Some(86_401 * 1_000_000),
                    None,
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Time32MillisecondType>();
        ret.column(1).as_primitive::<Time64MicrosecondType>();

        Ok(())
    }

    #[test]
    fn test_date32_roundtrip() -> Result<()> {
        use arrow_array::Date32Array;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "date32",
            ArrowDataType::Date32,
            false,
        )]));

        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![Arc::new(Date32Array::from(vec![
                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
            ]))],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Date32Type>();

        Ok(())
    }

    #[test]
    fn test_date64_roundtrip() -> Result<()> {
        use arrow_array::Date64Array;

        let schema = Arc::new(Schema::new(vec![
            Field::new("small-date64", ArrowDataType::Date64, false),
            Field::new("big-date64", ArrowDataType::Date64, false),
            Field::new("invalid-date64", ArrowDataType::Date64, false),
        ]));

        let mut default_buf = Vec::with_capacity(1024);
        let mut coerce_buf = Vec::with_capacity(1024);

        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();

        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
        let mut coerce_writer =
            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;

        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000 * NUM_MILLISECONDS_IN_DAY,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                Arc::new(Date64Array::from(vec![
                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1,
                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                ])),
            ],
        )?;

        default_writer.write(&original)?;
        coerce_writer.write(&original)?;

        default_writer.close()?;
        coerce_writer.close()?;

        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;

        let default_ret = default_reader.next().unwrap()?;
        let coerce_ret = coerce_reader.next().unwrap()?;

        assert_eq!(default_ret, original);

        assert_eq!(coerce_ret.column(0), original.column(0));
        assert_ne!(coerce_ret.column(1), original.column(1));
        assert_ne!(coerce_ret.column(2), original.column(2));

        default_ret.column(0).as_primitive::<Date64Type>();
        coerce_ret.column(0).as_primitive::<Date64Type>();

        Ok(())
    }

    struct RandFixedLenGen {}

    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
        fn gen(len: i32) -> FixedLenByteArray {
            let mut v = vec![0u8; len as usize];
            rng().fill_bytes(&mut v);
            ByteArray::from(v).into()
        }
    }

    #[test]
    fn test_fixed_length_binary_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            20,
            ConvertedType::NONE,
            None,
            |vals| {
                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
                for val in vals {
                    match val {
                        Some(b) => builder.append_value(b).unwrap(),
                        None => builder.append_null(),
                    }
                }
                Arc::new(builder.finish())
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_interval_day_time_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            12,
            ConvertedType::INTERVAL,
            None,
            |vals| {
                Arc::new(
                    vals.iter()
                        .map(|x| {
                            x.as_ref().map(|b| IntervalDayTime {
                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
                                milliseconds: i32::from_le_bytes(
                                    b.as_ref()[8..12].try_into().unwrap(),
                                ),
                            })
                        })
                        .collect::<IntervalDayTimeArray>(),
                )
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_int96_single_column_reader_test() {
        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];

        type TypeHintAndConversionFunction =
            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);

        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
            (None, |vals: &[Option<Int96>]| {
                Arc::new(TimestampNanosecondArray::from_iter(
                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
                )) as ArrayRef
            }),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampSecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMillisecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_millis())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMicrosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_micros())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampNanosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(
                    TimeUnit::Second,
                    Some(Arc::from("-05:00")),
                )),
                |vals: &[Option<Int96>]| {
                    Arc::new(
                        TimestampSecondArray::from_iter(
                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
                        )
                        .with_timezone("-05:00"),
                    ) as ArrayRef
                },
            ),
        ];

        resolutions.iter().for_each(|(arrow_type, converter)| {
            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
                2,
                ConvertedType::NONE,
                arrow_type.clone(),
                converter,
                encodings,
            );
        })
    }

    #[test]
    fn test_int96_from_spark_file_with_provided_schema() {
        use arrow_schema::DataType::Timestamp;
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            Timestamp(TimeUnit::Microsecond, None),
            true,
        )]));
        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());

        let mut record_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                .unwrap()
                .build()
                .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));

        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456),
            Some(1704070800000000),
            Some(253402225200000000),
            Some(1735599600000000),
            None,
            Some(9089380393200000000),
        ]));

        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }

    #[test]
    fn test_int96_from_spark_file_without_provided_schema() {
        use arrow_schema::DataType::Timestamp;
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));

        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456000),
            Some(1704070800000000000),
            Some(-4852191831933722624),
            Some(1735599600000000000),
            None,
            Some(-4864435138808946688),
        ]));

        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }

    struct RandUtf8Gen {}

    impl RandGen<ByteArrayType> for RandUtf8Gen {
        fn gen(len: i32) -> ByteArray {
            Int32Type::gen(len).to_string().as_str().into()
        }
    }

    #[test]
    fn test_utf8_single_column_reader_test() {
        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
            })))
        }

        let encodings = &[
            Encoding::PLAIN,
            Encoding::RLE_DICTIONARY,
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            Encoding::DELTA_BYTE_ARRAY,
        ];

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::NONE,
            None,
            |vals| {
                Arc::new(BinaryArray::from_iter(
                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
                ))
            },
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            None,
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::Utf8),
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::LargeUtf8),
            string_converter::<i64>,
            encodings,
        );

        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
        for key in &small_key_types {
            for encoding in encodings {
                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
                opts.encoding = *encoding;

                let data_type =
                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
                    opts,
                    2,
                    ConvertedType::UTF8,
                    Some(data_type.clone()),
                    move |vals| {
                        let vals = string_converter::<i32>(vals);
                        arrow::compute::cast(&vals, &data_type).unwrap()
                    },
                );
            }
        }

        let key_types = [
            ArrowDataType::Int16,
            ArrowDataType::UInt16,
            ArrowDataType::Int32,
            ArrowDataType::UInt32,
            ArrowDataType::Int64,
            ArrowDataType::UInt64,
        ];

        for key in &key_types {
            let data_type =
                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let vals = string_converter::<i32>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );
        }
    }

    #[test]
    fn test_decimal_nullable_struct() {
        let decimals = Decimal256Array::from_iter_values(
            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
        );

        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "decimals",
            decimals.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_int32_nullable_struct() {
        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "int32",
            int32.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![int32.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_decimal_list() {
        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);

        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
            decimals.data_type().clone(),
            false,
        ))))
        .len(7)
        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 1), &read[2]);
    }

    #[test]
    fn test_read_decimal_file() {
        use arrow_array::Decimal128Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let file_variants = vec![
            ("byte_array", 4),
            ("fixed_length", 25),
            ("int32", 4),
            ("int64", 10),
        ];
        for (prefix, target_precision) in file_variants {
            let path = format!("{testdata}/{prefix}_decimal.parquet");
            let file = File::open(path).unwrap();
            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

            let batch = record_reader.next().unwrap().unwrap();
            assert_eq!(batch.num_rows(), 24);
            let col = batch
                .column(0)
                .as_any()
                .downcast_ref::<Decimal128Array>()
                .unwrap();

            let expected = 1..25;

            assert_eq!(col.precision(), target_precision);
            assert_eq!(col.scale(), 2);

            for (i, v) in expected.enumerate() {
                assert_eq!(col.value(i), v * 100_i128);
            }
        }
    }

    #[test]
    fn test_read_float16_nonzeros_file() {
        use arrow_array::Float16Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 8);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float16Array>()
            .unwrap();

        let f16_two = f16::ONE + f16::ONE;

        assert_eq!(col.null_count(), 1);
        assert!(col.is_null(0));
        assert_eq!(col.value(1), f16::ONE);
        assert_eq!(col.value(2), -f16_two);
        assert!(col.value(3).is_nan());
        assert_eq!(col.value(4), f16::ZERO);
        assert!(col.value(4).is_sign_positive());
        assert_eq!(col.value(5), f16::NEG_ONE);
        assert_eq!(col.value(6), f16::NEG_ZERO);
        assert!(col.value(6).is_sign_negative());
        assert_eq!(col.value(7), f16_two);
    }

    #[test]
    fn test_read_float16_zeros_file() {
        use arrow_array::Float16Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 3);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float16Array>()
            .unwrap();

        assert_eq!(col.null_count(), 1);
        assert!(col.is_null(0));
        assert_eq!(col.value(1), f16::ZERO);
        assert!(col.value(1).is_sign_positive());
        assert!(col.value(2).is_nan());
    }

    #[test]
    fn test_read_float32_float64_byte_stream_split() {
        let path = format!(
            "{}/byte_stream_split.zstd.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();
        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

        let mut row_count = 0;
        for batch in record_reader {
            let batch = batch.unwrap();
            row_count += batch.num_rows();
            let f32_col = batch.column(0).as_primitive::<Float32Type>();
            let f64_col = batch.column(1).as_primitive::<Float64Type>();

            for &x in f32_col.values() {
                assert!(x > -10.0);
                assert!(x < 10.0);
            }
            for &x in f64_col.values() {
                assert!(x > -10.0);
                assert!(x < 10.0);
            }
        }
        assert_eq!(row_count, 300);
    }

    #[test]
    fn test_read_extended_byte_stream_split() {
        let path = format!(
            "{}/byte_stream_split_extended.gzip.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();
        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

        let mut row_count = 0;
        for batch in record_reader {
            let batch = batch.unwrap();
            row_count += batch.num_rows();

            let f16_col = batch.column(0).as_primitive::<Float16Type>();
            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
            assert_eq!(f16_col.len(), f16_bss.len());
            f16_col
                .iter()
                .zip(f16_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let f32_col = batch.column(2).as_primitive::<Float32Type>();
            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
            assert_eq!(f32_col.len(), f32_bss.len());
            f32_col
                .iter()
                .zip(f32_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let f64_col = batch.column(4).as_primitive::<Float64Type>();
            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
            assert_eq!(f64_col.len(), f64_bss.len());
            f64_col
                .iter()
                .zip(f64_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
            assert_eq!(i32_col.len(), i32_bss.len());
            i32_col
                .iter()
                .zip(i32_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
            assert_eq!(i64_col.len(), i64_bss.len());
            i64_col
                .iter()
                .zip(i64_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let flba_col = batch.column(10).as_fixed_size_binary();
            let flba_bss = batch.column(11).as_fixed_size_binary();
            assert_eq!(flba_col.len(), flba_bss.len());
            flba_col
                .iter()
                .zip(flba_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
            assert_eq!(dec_col.len(), dec_bss.len());
            dec_col
                .iter()
                .zip(dec_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
        }
        assert_eq!(row_count, 200);
    }

    #[test]
    fn test_read_incorrect_map_schema_file() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/incorrect_map_schema.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 1);

        let expected_schema = Schema::new(Fields::from(vec![Field::new(
            "my_map",
            ArrowDataType::Map(
                Arc::new(Field::new(
                    "key_value",
                    ArrowDataType::Struct(Fields::from(vec![
                        Field::new("key", ArrowDataType::Utf8, false),
                        Field::new("value", ArrowDataType::Utf8, true),
                    ])),
                    false,
                )),
                false,
            ),
            true,
        )]));
        assert_eq!(batch.schema().as_ref(), &expected_schema);

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.column(0).null_count(), 0);
        assert_eq!(
            batch.column(0).as_map().keys().as_ref(),
            &StringArray::from(vec!["parent", "name"])
        );
        assert_eq!(
            batch.column(0).as_map().values().as_ref(),
            &StringArray::from(vec!["another", "report"])
        );
    }

    #[derive(Clone)]
    struct TestOptions {
        num_row_groups: usize,
        num_rows: usize,
        record_batch_size: usize,
        null_percent: Option<usize>,
        write_batch_size: usize,
        max_data_page_size: usize,
        max_dict_page_size: usize,
        writer_version: WriterVersion,
        enabled_statistics: EnabledStatistics,
        encoding: Encoding,
        row_selections: Option<(RowSelection, usize)>,
        row_filter: Option<Vec<bool>>,
        limit: Option<usize>,
        offset: Option<usize>,
    }

    impl std::fmt::Debug for TestOptions {
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("TestOptions")
                .field("num_row_groups", &self.num_row_groups)
                .field("num_rows", &self.num_rows)
                .field("record_batch_size", &self.record_batch_size)
                .field("null_percent", &self.null_percent)
                .field("write_batch_size", &self.write_batch_size)
                .field("max_data_page_size", &self.max_data_page_size)
                .field("max_dict_page_size", &self.max_dict_page_size)
                .field("writer_version", &self.writer_version)
                .field("enabled_statistics", &self.enabled_statistics)
                .field("encoding", &self.encoding)
                .field("row_selections", &self.row_selections.is_some())
                .field("row_filter", &self.row_filter.is_some())
                .field("limit", &self.limit)
                .field("offset", &self.offset)
                .finish()
        }
    }

    impl Default for TestOptions {
        fn default() -> Self {
            Self {
                num_row_groups: 2,
                num_rows: 100,
                record_batch_size: 15,
                null_percent: None,
                write_batch_size: 64,
                max_data_page_size: 1024 * 1024,
                max_dict_page_size: 1024 * 1024,
                writer_version: WriterVersion::PARQUET_1_0,
                enabled_statistics: EnabledStatistics::Page,
                encoding: Encoding::PLAIN,
                row_selections: None,
                row_filter: None,
                limit: None,
                offset: None,
            }
        }
    }

    impl TestOptions {
        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
            Self {
                num_row_groups,
                num_rows,
                record_batch_size,
                ..Default::default()
            }
        }

        fn with_null_percent(self, null_percent: usize) -> Self {
            Self {
                null_percent: Some(null_percent),
                ..self
            }
        }

        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
            Self {
                max_data_page_size,
                ..self
            }
        }

        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
            Self {
                max_dict_page_size,
                ..self
            }
        }

        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
            Self {
                enabled_statistics,
                ..self
            }
        }

        fn with_row_selections(self) -> Self {
            assert!(self.row_filter.is_none(), "Must set row selection first");

            let mut rng = rng();
            let step = rng.random_range(self.record_batch_size..self.num_rows);
            let row_selections = create_test_selection(
                step,
                self.num_row_groups * self.num_rows,
                rng.random::<bool>(),
            );
            Self {
                row_selections: Some(row_selections),
                ..self
            }
        }

        fn with_row_filter(self) -> Self {
            let row_count = match &self.row_selections {
                Some((_, count)) => *count,
                None => self.num_row_groups * self.num_rows,
            };

            let mut rng = rng();
            Self {
                row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
                ..self
            }
        }

        fn with_limit(self, limit: usize) -> Self {
            Self {
                limit: Some(limit),
                ..self
            }
        }

        fn with_offset(self, offset: usize) -> Self {
            Self {
                offset: Some(offset),
                ..self
            }
        }

        fn writer_props(&self) -> WriterProperties {
            let builder = WriterProperties::builder()
                .set_data_page_size_limit(self.max_data_page_size)
                .set_write_batch_size(self.write_batch_size)
                .set_writer_version(self.writer_version)
                .set_statistics_enabled(self.enabled_statistics);

            let builder = match self.encoding {
                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
                    .set_dictionary_enabled(true)
                    .set_dictionary_page_size_limit(self.max_dict_page_size),
                _ => builder
                    .set_dictionary_enabled(false)
                    .set_encoding(self.encoding),
            };

            builder.build()
        }
    }

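    /// Drives `single_column_reader_test` across a matrix of configurations:
    /// every entry in a fixed list of `TestOptions`, crossed with both writer
    /// versions and each of the supplied `encodings`, for a single leaf column
    /// of physical type `T` whose values are generated by `G` and converted to
    /// the expected Arrow array by `converter`.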
2382 fn run_single_column_reader_tests<T, F, G>(
2389 rand_max: i32,
2390 converted_type: ConvertedType,
2391 arrow_type: Option<ArrowDataType>,
2392 converter: F,
2393 encodings: &[Encoding],
2394 ) where
2395 T: DataType,
2396 G: RandGen<T>,
2397 F: Fn(&[Option<T::T>]) -> ArrayRef,
2398 {
2399 let all_options = vec![
2400 TestOptions::new(2, 100, 15),
2403 TestOptions::new(3, 25, 5),
2408 TestOptions::new(4, 100, 25),
2412 TestOptions::new(3, 256, 73).with_max_data_page_size(128),
2414 TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
2416 TestOptions::new(2, 256, 127).with_null_percent(0),
2418 TestOptions::new(2, 256, 93).with_null_percent(25),
2420 TestOptions::new(4, 100, 25).with_limit(0),
2422 TestOptions::new(4, 100, 25).with_limit(50),
2424 TestOptions::new(4, 100, 25).with_limit(10),
2426 TestOptions::new(4, 100, 25).with_limit(101),
2428 TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
2430 TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
2432 TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
2434 TestOptions::new(2, 256, 91)
2436 .with_null_percent(25)
2437 .with_enabled_statistics(EnabledStatistics::Chunk),
2438 TestOptions::new(2, 256, 91)
2440 .with_null_percent(25)
2441 .with_enabled_statistics(EnabledStatistics::None),
2442 TestOptions::new(2, 128, 91)
2444 .with_null_percent(100)
2445 .with_enabled_statistics(EnabledStatistics::None),
2446 TestOptions::new(2, 100, 15).with_row_selections(),
2451 TestOptions::new(3, 25, 5).with_row_selections(),
2456 TestOptions::new(4, 100, 25).with_row_selections(),
2460 TestOptions::new(3, 256, 73)
2462 .with_max_data_page_size(128)
2463 .with_row_selections(),
2464 TestOptions::new(3, 256, 57)
2466 .with_max_dict_page_size(128)
2467 .with_row_selections(),
2468 TestOptions::new(2, 256, 127)
2470 .with_null_percent(0)
2471 .with_row_selections(),
2472 TestOptions::new(2, 256, 93)
2474 .with_null_percent(25)
2475 .with_row_selections(),
2476 TestOptions::new(2, 256, 93)
2478 .with_null_percent(25)
2479 .with_row_selections()
2480 .with_limit(10),
2481 TestOptions::new(2, 256, 93)
2483 .with_null_percent(25)
2484 .with_row_selections()
2485 .with_offset(20)
2486 .with_limit(10),
2487 TestOptions::new(4, 100, 25).with_row_filter(),
2491 TestOptions::new(4, 100, 25)
2493 .with_row_selections()
2494 .with_row_filter(),
2495 TestOptions::new(2, 256, 93)
2497 .with_null_percent(25)
2498 .with_max_data_page_size(10)
2499 .with_row_filter(),
2500 TestOptions::new(2, 256, 93)
2502 .with_null_percent(25)
2503 .with_max_data_page_size(10)
2504 .with_row_selections()
2505 .with_row_filter(),
2506 TestOptions::new(2, 256, 93)
2508 .with_enabled_statistics(EnabledStatistics::None)
2509 .with_max_data_page_size(10)
2510 .with_row_selections(),
2511 ];
2512
2513 all_options.into_iter().for_each(|opts| {
2514 for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2515 for encoding in encodings {
2516 let opts = TestOptions {
2517 writer_version,
2518 encoding: *encoding,
2519 ..opts.clone()
2520 };
2521
2522 single_column_reader_test::<T, _, G>(
2523 opts,
2524 rand_max,
2525 converted_type,
2526 arrow_type.clone(),
2527 &converter,
2528 )
2529 }
2530 }
2531 });
2532 }

    /// Writes a parquet file containing a single column and reads it back with
    /// `ParquetRecordBatchReader`, checking the batches against the expected data
    fn single_column_reader_test<T, F, G>(
        opts: TestOptions,
        rand_max: i32,
        converted_type: ConvertedType,
        arrow_type: Option<ArrowDataType>,
        converter: F,
    ) where
        T: DataType,
        G: RandGen<T>,
        F: Fn(&[Option<T::T>]) -> ArrayRef,
    {
        // Print the options to make CI failures easier to reproduce
        println!(
            "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
            T::get_physical_type(), converted_type, arrow_type, opts
        );

        // Generate definition levels: 1 marks a present value, 0 a null
        let (repetition, def_levels) = match opts.null_percent.as_ref() {
            Some(null_percent) => {
                let mut rng = rng();

                let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
                    .map(|_| {
                        std::iter::from_fn(|| {
                            Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
                        })
                        .take(opts.num_rows)
                        .collect()
                    })
                    .collect();
                (Repetition::OPTIONAL, Some(def_levels))
            }
            None => (Repetition::REQUIRED, None),
        };

        // One Vec of values per row group, omitting the null slots
        let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
            .map(|idx| {
                let null_count = match def_levels.as_ref() {
                    Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
                    None => 0,
                };
                G::gen_vec(rand_max, opts.num_rows - null_count)
            })
            .collect();

        let len = match T::get_physical_type() {
            crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
            crate::basic::Type::INT96 => 12,
            _ => -1,
        };

        let fields = vec![Arc::new(
            Type::primitive_type_builder("leaf", T::get_physical_type())
                .with_repetition(repetition)
                .with_converted_type(converted_type)
                .with_length(len)
                .build()
                .unwrap(),
        )];

        let schema = Arc::new(
            Type::group_type_builder("test_schema")
                .with_fields(fields)
                .build()
                .unwrap(),
        );

        let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));

        let mut file = tempfile::tempfile().unwrap();

        generate_single_column_file_with_data::<T>(
            &values,
            def_levels.as_ref(),
            file.try_clone().unwrap(),
            schema,
            arrow_field,
            &opts,
        )
        .unwrap();

        file.rewind().unwrap();

        let options = ArrowReaderOptions::new()
            .with_page_index(opts.enabled_statistics == EnabledStatistics::Page);

        let mut builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();

        let expected_data = match opts.row_selections {
            Some((selections, row_count)) => {
                let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);

                // Apply the selectors to the full data set; `skip_data` ends up
                // holding exactly the rows that survive the selection
                let mut skip_data: Vec<Option<T::T>> = vec![];
                let dequeue: VecDeque<RowSelector> = selections.clone().into();
                for select in dequeue {
                    if select.skip {
                        without_skip_data.drain(0..select.row_count);
                    } else {
                        skip_data.extend(without_skip_data.drain(0..select.row_count));
                    }
                }
                builder = builder.with_row_selection(selections);

                assert_eq!(skip_data.len(), row_count);
                skip_data
            }
            None => {
                let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
                assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
                expected_data
            }
        };

        let mut expected_data = match opts.row_filter {
            Some(filter) => {
                let expected_data = expected_data
                    .into_iter()
                    .zip(filter.iter())
                    .filter_map(|(d, f)| f.then_some(d))
                    .collect();

                // Stateful predicate: serve the precomputed filter mask in
                // batch-sized chunks as the reader requests them
                let mut filter_offset = 0;
                let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
                    ProjectionMask::all(),
                    move |b| {
                        let array = BooleanArray::from_iter(
                            filter
                                .iter()
                                .skip(filter_offset)
                                .take(b.num_rows())
                                .map(|x| Some(*x)),
                        );
                        filter_offset += b.num_rows();
                        Ok(array)
                    },
                ))]);

                builder = builder.with_row_filter(filter);
                expected_data
            }
            None => expected_data,
        };

        if let Some(offset) = opts.offset {
            builder = builder.with_offset(offset);
            expected_data = expected_data.into_iter().skip(offset).collect();
        }

        if let Some(limit) = opts.limit {
            builder = builder.with_limit(limit);
            expected_data = expected_data.into_iter().take(limit).collect();
        }

        let mut record_reader = builder
            .with_batch_size(opts.record_batch_size)
            .build()
            .unwrap();

        let mut total_read = 0;
        loop {
            let maybe_batch = record_reader.next();
            if total_read < expected_data.len() {
                let end = min(total_read + opts.record_batch_size, expected_data.len());
                let batch = maybe_batch.unwrap().unwrap();
                assert_eq!(end - total_read, batch.num_rows());

                let a = converter(&expected_data[total_read..end]);
                let b = Arc::clone(batch.column(0));

                assert_eq!(a.data_type(), b.data_type());
                assert_eq!(a.to_data(), b.to_data());
                assert_eq!(
                    a.as_any().type_id(),
                    b.as_any().type_id(),
                    "incorrect type ids"
                );

                total_read = end;
            } else {
                assert!(maybe_batch.is_none());
                break;
            }
        }
    }
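
    // A minimal, hedged sketch of the read pattern the harness above exercises:
    // build a reader, choose a batch size, then drain it. This is not part of
    // the original suite; the function name and the fixed batch size of 64 are
    // illustrative only.
    #[allow(dead_code)]
    fn example_basic_read(file: File) -> Result<usize> {
        let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
            .with_batch_size(64)
            .build()?;
        let mut rows = 0;
        for batch in reader {
            // Each item is a `Result<RecordBatch, ArrowError>`
            rows += batch?.num_rows();
        }
        Ok(rows)
    }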

    fn gen_expected_data<T: DataType>(
        def_levels: Option<&Vec<Vec<i16>>>,
        values: &[Vec<T::T>],
    ) -> Vec<Option<T::T>> {
        let data: Vec<Option<T::T>> = match def_levels {
            Some(levels) => {
                let mut values_iter = values.iter().flatten();
                levels
                    .iter()
                    .flatten()
                    .map(|d| match d {
                        1 => Some(values_iter.next().cloned().unwrap()),
                        0 => None,
                        _ => unreachable!(),
                    })
                    .collect()
            }
            None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
        };
        data
    }

    fn generate_single_column_file_with_data<T: DataType>(
        values: &[Vec<T::T>],
        def_levels: Option<&Vec<Vec<i16>>>,
        file: File,
        schema: TypePtr,
        field: Option<Field>,
        opts: &TestOptions,
    ) -> Result<crate::format::FileMetaData> {
        let mut writer_props = opts.writer_props();
        if let Some(field) = field {
            let arrow_schema = Schema::new(vec![field]);
            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
        }

        let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;

        for (idx, v) in values.iter().enumerate() {
            let def_levels = def_levels.map(|d| d[idx].as_slice());
            let mut row_group_writer = writer.next_row_group()?;
            {
                let mut column_writer = row_group_writer
                    .next_column()?
                    .expect("Column writer is none!");

                column_writer
                    .typed::<T>()
                    .write_batch(v, def_levels, None)?;

                column_writer.close()?;
            }
            row_group_writer.close()?;
        }

        writer.close()
    }
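
    // Note on the helper above: each element of `values` (with its matching
    // `def_levels` entry) goes through its own `next_row_group()` call, so the
    // length of the outer `Vec` is exactly the number of row groups in the file.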

    fn get_test_file(file_name: &str) -> File {
        let mut path = PathBuf::new();
        path.push(arrow::util::test_util::arrow_test_data());
        path.push(file_name);

        File::open(path.as_path()).expect("File not found!")
    }

    #[test]
    fn test_read_structs() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_structs.rust.parquet");
        let file = File::open(&path).unwrap();
        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

        for batch in record_batch_reader {
            batch.unwrap();
        }

        let file = File::open(&path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

        let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
        let projected_reader = builder
            .with_projection(mask)
            .with_batch_size(60)
            .build()
            .unwrap();

        let expected_schema = Schema::new(vec![
            Field::new(
                "roll_num",
                ArrowDataType::Struct(Fields::from(vec![Field::new(
                    "count",
                    ArrowDataType::UInt64,
                    false,
                )])),
                false,
            ),
            Field::new(
                "PC_CUR",
                ArrowDataType::Struct(Fields::from(vec![
                    Field::new("mean", ArrowDataType::Int64, false),
                    Field::new("sum", ArrowDataType::Int64, false),
                ])),
                false,
            ),
        ]);

        // The projection must be reflected in both the reader schema and batches
        assert_eq!(&expected_schema, projected_reader.schema().as_ref());

        for batch in projected_reader {
            let batch = batch.unwrap();
            assert_eq!(batch.schema().as_ref(), &expected_schema);
        }
    }
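
    // A hedged aside, not part of the original suite: `ProjectionMask::leaves`
    // (used above) addresses individual leaf columns by index, whereas
    // `ProjectionMask::roots` selects entire top-level columns. The illustrative
    // helper below would project the whole first root column, children included.
    #[allow(dead_code)]
    fn example_project_first_root(file: File) -> ParquetRecordBatchReader {
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let mask = ProjectionMask::roots(builder.parquet_schema(), [0]);
        builder.with_projection(mask).build().unwrap()
    }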

    #[test]
    fn test_read_structs_by_name() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_structs.rust.parquet");
        let file = File::open(&path).unwrap();
        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

        for batch in record_batch_reader {
            batch.unwrap();
        }

        let file = File::open(&path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

        let mask = ProjectionMask::columns(
            builder.parquet_schema(),
            ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
        );
        let projected_reader = builder
            .with_projection(mask)
            .with_batch_size(60)
            .build()
            .unwrap();

        let expected_schema = Schema::new(vec![
            Field::new(
                "roll_num",
                ArrowDataType::Struct(Fields::from(vec![Field::new(
                    "count",
                    ArrowDataType::UInt64,
                    false,
                )])),
                false,
            ),
            Field::new(
                "PC_CUR",
                ArrowDataType::Struct(Fields::from(vec![
                    Field::new("mean", ArrowDataType::Int64, false),
                    Field::new("sum", ArrowDataType::Int64, false),
                ])),
                false,
            ),
        ]);

        assert_eq!(&expected_schema, projected_reader.schema().as_ref());

        for batch in projected_reader {
            let batch = batch.unwrap();
            assert_eq!(batch.schema().as_ref(), &expected_schema);
        }
    }

    #[test]
    fn test_read_maps() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_maps.snappy.parquet");
        let file = File::open(path).unwrap();
        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

        for batch in record_batch_reader {
            batch.unwrap();
        }
    }

    #[test]
    fn test_nested_nullability() {
        let message_type = "message nested {
            OPTIONAL Group group {
                REQUIRED INT32 leaf;
            }
        }";

        let file = tempfile::tempfile().unwrap();
        let schema = Arc::new(parse_message_type(message_type).unwrap());

        {
            let mut writer =
                SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
                    .unwrap();

            {
                let mut row_group_writer = writer.next_row_group().unwrap();
                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();

                // Definition levels [0, 1, 0, 1] encode four rows: the optional
                // `group` is null in rows 0 and 2 and holds 34 / 76 in rows 1 and 3
                column_writer
                    .typed::<Int32Type>()
                    .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
                    .unwrap();

                column_writer.close().unwrap();
                row_group_writer.close().unwrap();
            }

            writer.close().unwrap();
        }

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);

        let reader = builder.with_projection(mask).build().unwrap();

        let expected_schema = Schema::new(Fields::from(vec![Field::new(
            "group",
            ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
            true,
        )]));

        let batch = reader.into_iter().next().unwrap().unwrap();
        assert_eq!(batch.schema().as_ref(), &expected_schema);
        assert_eq!(batch.num_rows(), 4);
        assert_eq!(batch.column(0).null_count(), 2);
    }

    #[test]
    fn test_invalid_utf8() {
        // Raw bytes of a tiny parquet file whose single string column contains
        // an invalid UTF-8 sequence
        let data = vec![
            80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
            18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
            3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
            2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
            111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
            21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
            38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
            38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
            0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
            118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
            116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
            82, 49,
        ];

        let file = Bytes::from(data);
        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();

        let error = record_batch_reader.next().unwrap().unwrap_err();

        assert!(
            error.to_string().contains("invalid utf-8 sequence"),
            "{}",
            error
        );
    }

    #[test]
    fn test_invalid_utf8_string_array() {
        test_invalid_utf8_string_array_inner::<i32>();
    }

    #[test]
    fn test_invalid_utf8_large_string_array() {
        test_invalid_utf8_string_array_inner::<i64>();
    }

    fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
        let cases = [
            invalid_utf8_first_char::<O>(),
            invalid_utf8_first_char_long_strings::<O>(),
            invalid_utf8_later_char::<O>(),
            invalid_utf8_later_char_long_strings::<O>(),
            invalid_utf8_later_char_really_long_strings::<O>(),
            invalid_utf8_later_char_really_long_strings2::<O>(),
        ];
        for array in &cases {
            for encoding in STRING_ENCODINGS {
                // Bypass the array's own validation via `new_unchecked` so the
                // invalid bytes reach the writer, then check the reader rejects them
                let array = unsafe {
                    GenericStringArray::<O>::new_unchecked(
                        array.offsets().clone(),
                        array.values().clone(),
                        array.nulls().cloned(),
                    )
                };
                let data_type = array.data_type().clone();
                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
                let err = read_from_parquet(data).unwrap_err();
                let expected_err =
                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
                assert!(
                    err.to_string().contains(expected_err),
                    "data type: {data_type:?}, expected: {expected_err}, got: {err}"
                );
            }
        }
    }

    #[test]
    fn test_invalid_utf8_string_view_array() {
        let cases = [
            invalid_utf8_first_char::<i32>(),
            invalid_utf8_first_char_long_strings::<i32>(),
            invalid_utf8_later_char::<i32>(),
            invalid_utf8_later_char_long_strings::<i32>(),
            invalid_utf8_later_char_really_long_strings::<i32>(),
            invalid_utf8_later_char_really_long_strings2::<i32>(),
        ];

        for encoding in STRING_ENCODINGS {
            for array in &cases {
                let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
                let array = array.as_binary_view();

                // Construct a StringViewArray from the binary buffers without
                // validation, mirroring the unchecked construction above
                let array = unsafe {
                    StringViewArray::new_unchecked(
                        array.views().clone(),
                        array.data_buffers().to_vec(),
                        array.nulls().cloned(),
                    )
                };

                let data_type = array.data_type().clone();
                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
                let err = read_from_parquet(data).unwrap_err();
                let expected_err =
                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
                assert!(
                    err.to_string().contains(expected_err),
                    "data type: {data_type:?}, expected: {expected_err}, got: {err}"
                );
            }
        }
    }

    /// Encodings exercised for string data
    const STRING_ENCODINGS: &[Option<Encoding>] = &[
        None,
        Some(Encoding::PLAIN),
        Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
        Some(Encoding::DELTA_BYTE_ARRAY),
    ];
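
    // `None` exercises the writer's default properties, under which string
    // columns are dictionary encoded; the `Some` variants pin each
    // non-dictionary string encoding explicitly.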

    /// An invalid UTF-8 sequence at the very start of the value
    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];

    /// Valid leading bytes followed by an invalid UTF-8 sequence
    const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];

    fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
        let valid: &[u8] = b" ";
        let invalid = INVALID_UTF8_FIRST_CHAR;
        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
    }

    /// As `invalid_utf8_first_char`, but with a prefix longer than 12 bytes so
    /// the value cannot be inlined by view-style representations
    fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
        let valid: &[u8] = b" ";
        let mut invalid = vec![];
        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
    }

    fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
        let valid: &[u8] = b" ";
        let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
    }

    fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
        let valid: &[u8] = b" ";
        let mut invalid = vec![];
        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
    }

    fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
        let valid: &[u8] = b" ";
        let mut invalid = vec![];
        for _ in 0..10 {
            invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
        }
        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
    }

    fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
        let valid: &[u8] = b" ";
        let mut valid_long = vec![];
        for _ in 0..10 {
            valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
        }
        let invalid = INVALID_UTF8_LATER_CHAR;
        GenericBinaryArray::<O>::from_iter(vec![
            None,
            Some(valid),
            Some(invalid),
            None,
            Some(&valid_long),
            Some(valid),
        ])
    }

    /// Writes `array` as a single-column parquet file, using `encoding` if
    /// specified and the writer defaults otherwise, returning the file contents
    fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
        let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
        let mut data = vec![];
        let schema = batch.schema();
        let props = encoding.map(|encoding| {
            WriterProperties::builder()
                .set_dictionary_enabled(false)
                .set_encoding(encoding)
                .build()
        });

        {
            let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
            writer.write(&batch).unwrap();
            writer.flush().unwrap();
            writer.close().unwrap();
        };
        data
    }

    /// Reads all record batches back from an in-memory parquet file
    fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
        let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
            .unwrap()
            .build()
            .unwrap();

        reader.collect()
    }
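
    // A hedged sketch of the happy path through the two helpers above, not part
    // of the original suite: valid UTF-8 should survive a write/read roundtrip
    // under every encoding in STRING_ENCODINGS. The function name is illustrative.
    #[allow(dead_code)]
    fn example_utf8_roundtrip() {
        let array: ArrayRef = Arc::new(StringArray::from(vec!["a", "b"]));
        for encoding in STRING_ENCODINGS {
            let data = write_to_parquet_with_encoding(array.clone(), *encoding);
            let batches = read_from_parquet(data).unwrap();
            assert_eq!(batches[0].num_rows(), 2);
        }
    }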

    #[test]
    fn test_dictionary_preservation() {
        let fields = vec![Arc::new(
            Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
                .with_repetition(Repetition::OPTIONAL)
                .with_converted_type(ConvertedType::UTF8)
                .build()
                .unwrap(),
        )];

        let schema = Arc::new(
            Type::group_type_builder("test_schema")
                .with_fields(fields)
                .build()
                .unwrap(),
        );

        let dict_type = ArrowDataType::Dictionary(
            Box::new(ArrowDataType::Int32),
            Box::new(ArrowDataType::Utf8),
        );

        let arrow_field = Field::new("leaf", dict_type, true);

        let mut file = tempfile::tempfile().unwrap();

        let values = vec![
            vec![
                ByteArray::from("hello"),
                ByteArray::from("a"),
                ByteArray::from("b"),
                ByteArray::from("d"),
            ],
            vec![
                ByteArray::from("c"),
                ByteArray::from("a"),
                ByteArray::from("b"),
            ],
        ];

        let def_levels = vec![
            vec![1, 0, 0, 1, 0, 0, 1, 1],
            vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
        ];

        let opts = TestOptions {
            encoding: Encoding::RLE_DICTIONARY,
            ..Default::default()
        };

        generate_single_column_file_with_data::<ByteArrayType>(
            &values,
            Some(&def_levels),
            file.try_clone().unwrap(),
            schema,
            Some(arrow_field),
            &opts,
        )
        .unwrap();

        file.rewind().unwrap();

        let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();

        let batches = record_reader
            .collect::<Result<Vec<RecordBatch>, _>>()
            .unwrap();

        assert_eq!(batches.len(), 6);
        assert!(batches.iter().all(|x| x.num_columns() == 1));

        let row_counts = batches
            .iter()
            .map(|x| (x.num_rows(), x.column(0).null_count()))
            .collect::<Vec<_>>();

        assert_eq!(
            row_counts,
            vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
        );

        let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();

        // Batches 0 and 1 come entirely from the first row group (8 rows) and
        // share its dictionary; batch 2 spans the row group boundary so its
        // dictionary differs from both neighbours; batches 3, 4 and 5 then share
        // the second row group's dictionary
        assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
        assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
        assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
        assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
        assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
    }

    #[test]
    fn test_read_null_list() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/null_list.parquet");
        let file = File::open(path).unwrap();
        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

        let batch = record_batch_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.column(0).len(), 1);

        let list = batch
            .column(0)
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        assert_eq!(list.len(), 1);
        assert!(list.is_valid(0));

        let val = list.value(0);
        assert_eq!(val.len(), 0);
    }

    #[test]
    fn test_null_schema_inference() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/null_list.parquet");
        let file = File::open(path).unwrap();

        let arrow_field = Field::new(
            "emptylist",
            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
            true,
        );

        let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
        let schema = builder.schema();
        assert_eq!(schema.fields().len(), 1);
        assert_eq!(schema.field(0), &arrow_field);
    }

    #[test]
    fn test_skip_metadata() {
        let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
        let field = Field::new("col", col.data_type().clone(), true);

        let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));

        let metadata = [("key".to_string(), "value".to_string())]
            .into_iter()
            .collect();

        let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));

        assert_ne!(schema_with_metadata, schema_without_metadata);

        let batch =
            RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();

        let file = |version: WriterVersion| {
            let props = WriterProperties::builder()
                .set_writer_version(version)
                .build();

            let file = tempfile().unwrap();
            let mut writer =
                ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
                    .unwrap();
            writer.write(&batch).unwrap();
            writer.close().unwrap();
            file
        };

        let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);

        let v1_reader = file(WriterVersion::PARQUET_1_0);
        let v2_reader = file(WriterVersion::PARQUET_2_0);

        let arrow_reader =
            ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
        assert_eq!(arrow_reader.schema(), schema_with_metadata);

        let reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
                .unwrap()
                .build()
                .unwrap();
        assert_eq!(reader.schema(), schema_without_metadata);

        let arrow_reader =
            ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
        assert_eq!(arrow_reader.schema(), schema_with_metadata);

        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
            .unwrap()
            .build()
            .unwrap();
        assert_eq!(reader.schema(), schema_without_metadata);
    }

    fn write_parquet_from_iter<I, F>(value: I) -> File
    where
        I: IntoIterator<Item = (F, ArrayRef)>,
        F: AsRef<str>,
    {
        let batch = RecordBatch::try_from_iter(value).unwrap();
        let file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        file
    }

    fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
    where
        I: IntoIterator<Item = (F, ArrayRef)>,
        F: AsRef<str>,
    {
        let file = write_parquet_from_iter(value);
        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options_with_schema,
        );
        assert_eq!(builder.err().unwrap().to_string(), expected_error);
    }

    #[test]
    fn test_schema_too_few_columns() {
        run_schema_test_with_error(
            vec![
                ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
                ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
            ],
            Arc::new(Schema::new(vec![Field::new(
                "int64",
                ArrowDataType::Int64,
                false,
            )])),
            "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
        );
    }

    #[test]
    fn test_schema_too_many_columns() {
        run_schema_test_with_error(
            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
            Arc::new(Schema::new(vec![
                Field::new("int64", ArrowDataType::Int64, false),
                Field::new("int32", ArrowDataType::Int32, false),
            ])),
            "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
        );
    }

    #[test]
    fn test_schema_mismatched_column_names() {
        run_schema_test_with_error(
            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
            Arc::new(Schema::new(vec![Field::new(
                "other",
                ArrowDataType::Int64,
                false,
            )])),
            "Arrow: incompatible arrow schema, expected field named int64 got other",
        );
    }

    #[test]
    fn test_schema_incompatible_columns() {
        run_schema_test_with_error(
            vec![
                (
                    "col1_invalid",
                    Arc::new(Int64Array::from(vec![0])) as ArrayRef,
                ),
                (
                    "col2_valid",
                    Arc::new(Int32Array::from(vec![0])) as ArrayRef,
                ),
                (
                    "col3_invalid",
                    Arc::new(Date64Array::from(vec![0])) as ArrayRef,
                ),
            ],
            Arc::new(Schema::new(vec![
                Field::new("col1_invalid", ArrowDataType::Int32, false),
                Field::new("col2_valid", ArrowDataType::Int32, false),
                Field::new("col3_invalid", ArrowDataType::Int32, false),
            ])),
            "Arrow: incompatible arrow schema, the following fields could not be cast: [col1_invalid, col3_invalid]",
        );
    }

    #[test]
    fn test_one_incompatible_nested_column() {
        let nested_fields = Fields::from(vec![
            Field::new("nested1_valid", ArrowDataType::Utf8, false),
            Field::new("nested1_invalid", ArrowDataType::Int64, false),
        ]);
        let nested = StructArray::try_new(
            nested_fields,
            vec![
                Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
            ],
            None,
        )
        .expect("struct array");
        let supplied_nested_fields = Fields::from(vec![
            Field::new("nested1_valid", ArrowDataType::Utf8, false),
            Field::new("nested1_invalid", ArrowDataType::Int32, false),
        ]);
        run_schema_test_with_error(
            vec![
                ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
                ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
                ("nested", Arc::new(nested) as ArrayRef),
            ],
            Arc::new(Schema::new(vec![
                Field::new("col1", ArrowDataType::Int64, false),
                Field::new("col2", ArrowDataType::Int32, false),
                Field::new(
                    "nested",
                    ArrowDataType::Struct(supplied_nested_fields),
                    false,
                ),
            ])),
            "Arrow: incompatible arrow schema, the following fields could not be cast: [nested]",
        );
    }

    #[test]
    fn test_read_binary_as_utf8() {
        let file = write_parquet_from_iter(vec![
            (
                "binary_to_utf8",
                Arc::new(BinaryArray::from(vec![
                    b"one".as_ref(),
                    b"two".as_ref(),
                    b"three".as_ref(),
                ])) as ArrayRef,
            ),
            (
                "large_binary_to_large_utf8",
                Arc::new(LargeBinaryArray::from(vec![
                    b"one".as_ref(),
                    b"two".as_ref(),
                    b"three".as_ref(),
                ])) as ArrayRef,
            ),
            (
                "binary_view_to_utf8_view",
                Arc::new(BinaryViewArray::from(vec![
                    b"one".as_ref(),
                    b"two".as_ref(),
                    b"three".as_ref(),
                ])) as ArrayRef,
            ),
        ]);
        let supplied_fields = Fields::from(vec![
            Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
            Field::new(
                "large_binary_to_large_utf8",
                ArrowDataType::LargeUtf8,
                false,
            ),
            Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
        ]);

        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");

        let batch = arrow_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(
            batch
                .column(0)
                .as_string::<i32>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some("one"), Some("two"), Some("three")]
        );

        assert_eq!(
            batch
                .column(1)
                .as_string::<i64>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some("one"), Some("two"), Some("three")]
        );

        assert_eq!(
            batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
            vec![Some("one"), Some("two"), Some("three")]
        );
    }

    #[test]
    #[should_panic(expected = "Invalid UTF8 sequence at")]
    fn test_read_non_utf8_binary_as_utf8() {
        let file = write_parquet_from_iter(vec![(
            "non_utf8_binary",
            Arc::new(BinaryArray::from(vec![
                b"\xDE\x00\xFF".as_ref(),
                b"\xDE\x01\xAA".as_ref(),
                b"\xDE\x02\xFF".as_ref(),
            ])) as ArrayRef,
        )]);
        let supplied_fields = Fields::from(vec![Field::new(
            "non_utf8_binary",
            ArrowDataType::Utf8,
            false,
        )]);

        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");
        arrow_reader.next().unwrap().unwrap_err();
    }

    #[test]
    fn test_with_schema() {
        let nested_fields = Fields::from(vec![
            Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
            Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
        ]);

        let nested_arrays: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
        ];

        let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();

        let file = write_parquet_from_iter(vec![
            (
                "int32_to_ts_second",
                Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
            ),
            (
                "date32_to_date64",
                Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
            ),
            ("nested", Arc::new(nested) as ArrayRef),
        ]);

        let supplied_nested_fields = Fields::from(vec![
            Field::new(
                "utf8_to_dict",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::Int32),
                    Box::new(ArrowDataType::Utf8),
                ),
                false,
            ),
            Field::new(
                "int64_to_ts_nano",
                ArrowDataType::Timestamp(
                    arrow::datatypes::TimeUnit::Nanosecond,
                    Some("+10:00".into()),
                ),
                false,
            ),
        ]);

        let supplied_schema = Arc::new(Schema::new(vec![
            Field::new(
                "int32_to_ts_second",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
                false,
            ),
            Field::new("date32_to_date64", ArrowDataType::Date64, false),
            Field::new(
                "nested",
                ArrowDataType::Struct(supplied_nested_fields),
                false,
            ),
        ]));

        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");

        assert_eq!(arrow_reader.schema(), supplied_schema);
        let batch = arrow_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 4);
        assert_eq!(
            batch
                .column(0)
                .as_any()
                .downcast_ref::<TimestampSecondArray>()
                .expect("downcast to timestamp second")
                .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
                .map(|v| v.to_string())
                .expect("value as datetime"),
            "1970-01-01 01:00:00 +01:00"
        );
        assert_eq!(
            batch
                .column(1)
                .as_any()
                .downcast_ref::<Date64Array>()
                .expect("downcast to date64")
                .value_as_date(0)
                .map(|v| v.to_string())
                .expect("value as date"),
            "1970-01-01"
        );

        let nested = batch
            .column(2)
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("downcast to struct");

        let nested_dict = nested
            .column(0)
            .as_any()
            .downcast_ref::<Int32DictionaryArray>()
            .expect("downcast to dictionary");

        assert_eq!(
            nested_dict
                .values()
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("downcast to string")
                .iter()
                .collect::<Vec<_>>(),
            vec![Some("a"), Some("b")]
        );

        assert_eq!(
            nested_dict.keys().iter().collect::<Vec<_>>(),
            vec![Some(0), Some(0), Some(0), Some(1)]
        );

        assert_eq!(
            nested
                .column(1)
                .as_any()
                .downcast_ref::<TimestampNanosecondArray>()
                .expect("downcast to timestamp nanosecond")
                .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
                .map(|v| v.to_string())
                .expect("value as datetime"),
            "1970-01-01 10:00:00.000000001 +10:00"
        );
    }
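
    // A note on the test above: the file stores int32/date32/utf8/int64, but the
    // supplied schema makes the reader hand back timestamp, date64 and dictionary
    // arrays directly, so no separate cast step is needed after reading.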

    #[test]
    fn test_empty_projection() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let file = File::open(path).unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let file_metadata = builder.metadata().file_metadata();
        let expected_rows = file_metadata.num_rows() as usize;

        let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
        let batch_reader = builder
            .with_projection(mask)
            .with_batch_size(2)
            .build()
            .unwrap();

        let mut total_rows = 0;
        for maybe_batch in batch_reader {
            let batch = maybe_batch.unwrap();
            total_rows += batch.num_rows();
            assert_eq!(batch.num_columns(), 0);
            assert!(batch.num_rows() <= 2);
        }

        assert_eq!(total_rows, expected_rows);
    }
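
    // As the test above shows, a projection of zero columns still yields batches
    // with accurate row counts, which is enough to answer COUNT-style queries
    // without decoding any column data.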

    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "list",
            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
            true,
        )]));

        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(
            &mut buf,
            schema.clone(),
            Some(
                WriterProperties::builder()
                    .set_max_row_group_size(row_group_size)
                    .build(),
            ),
        )
        .unwrap();
        for _ in 0..2 {
            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
            for _ in 0..(batch_size) {
                list_builder.append(true);
            }
            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
                .unwrap();
            writer.write(&batch).unwrap();
        }
        writer.close().unwrap();

        let mut record_reader =
            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
        assert_eq!(
            batch_size,
            record_reader.next().unwrap().unwrap().num_rows()
        );
        assert_eq!(
            batch_size,
            record_reader.next().unwrap().unwrap().num_rows()
        );
    }

    #[test]
    fn test_row_group_exact_multiple() {
        const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
        test_row_group_batch(8, 8);
        test_row_group_batch(10, 8);
        test_row_group_batch(8, 10);
        test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
        test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
        test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
        test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
        test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
    }
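
    // The BATCH_SIZE plus/minus one cases above deliberately straddle the
    // reader's internal repetition-level batch size
    // (REPETITION_LEVELS_BATCH_SIZE) to probe off-by-one handling at that
    // boundary.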

    /// Given a RecordBatch containing all the rows, compute the batches a reader
    /// should produce for the given `selection` and `batch_size`
    fn get_expected_batches(
        column: &RecordBatch,
        selection: &RowSelection,
        batch_size: usize,
    ) -> Vec<RecordBatch> {
        let mut expected_batches = vec![];

        let mut selection: VecDeque<_> = selection.clone().into();
        let mut row_offset = 0;
        let mut last_start = None;
        while row_offset < column.num_rows() && !selection.is_empty() {
            let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
            while batch_remaining > 0 && !selection.is_empty() {
                let (to_read, skip) = match selection.front_mut() {
                    Some(selection) if selection.row_count > batch_remaining => {
                        selection.row_count -= batch_remaining;
                        (batch_remaining, selection.skip)
                    }
                    Some(_) => {
                        let select = selection.pop_front().unwrap();
                        (select.row_count, select.skip)
                    }
                    None => break,
                };

                batch_remaining -= to_read;

                match skip {
                    true => {
                        if let Some(last_start) = last_start.take() {
                            expected_batches.push(column.slice(last_start, row_offset - last_start))
                        }
                        row_offset += to_read
                    }
                    false => {
                        last_start.get_or_insert(row_offset);
                        row_offset += to_read
                    }
                }
            }
        }

        if let Some(last_start) = last_start.take() {
            expected_batches.push(column.slice(last_start, row_offset - last_start))
        }

        // Sanity check: all but the final batch should hold exactly `batch_size` rows
        for batch in &expected_batches[..expected_batches.len() - 1] {
            assert_eq!(batch.num_rows(), batch_size);
        }

        expected_batches
    }

    fn create_test_selection(
        step_len: usize,
        total_len: usize,
        skip_first: bool,
    ) -> (RowSelection, usize) {
        let mut remaining = total_len;
        let mut skip = skip_first;
        let mut vec = vec![];
        let mut selected_count = 0;
        while remaining != 0 {
            let step = if remaining > step_len {
                step_len
            } else {
                remaining
            };
            vec.push(RowSelector {
                row_count: step,
                skip,
            });
            remaining -= step;
            if !skip {
                selected_count += step;
            }
            skip = !skip;
        }
        (vec.into(), selected_count)
    }
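
    // Worked example for the helper above: `create_test_selection(2, 5, false)`
    // produces [select(2), skip(2), select(1)] with a selected_count of 3, while
    // `skip_first = true` flips it to [skip(2), select(2), skip(1)] with a
    // selected_count of 2.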

    #[test]
    fn test_scan_row_with_selection() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let test_file = File::open(&path).unwrap();

        let mut serial_reader =
            ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
        let data = serial_reader.next().unwrap().unwrap();

        let do_test = |batch_size: usize, selection_len: usize| {
            for skip_first in [false, true] {
                let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;

                let expected = get_expected_batches(&data, &selections, batch_size);
                let skip_reader = create_skip_reader(&test_file, batch_size, selections);
                assert_eq!(
                    skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
                    expected,
                    "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
                );
            }
        };

        do_test(1000, 1000);

        do_test(20, 20);

        do_test(20, 5);

        do_test(20, 5);

        fn create_skip_reader(
            test_file: &File,
            batch_size: usize,
            selections: RowSelection,
        ) -> ParquetRecordBatchReader {
            let options = ArrowReaderOptions::new().with_page_index(true);
            let file = test_file.try_clone().unwrap();
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                .unwrap()
                .with_batch_size(batch_size)
                .with_row_selection(selections)
                .build()
                .unwrap()
        }
    }

    #[test]
    fn test_batch_size_overallocate() {
        let testdata = arrow::util::test_util::parquet_test_data();
        // `alltypes_plain.parquet` has far fewer rows than the requested batch size
        let path = format!("{testdata}/alltypes_plain.parquet");
        let test_file = File::open(path).unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
        let num_rows = builder.metadata.file_metadata().num_rows();
        let reader = builder
            .with_batch_size(1024)
            .with_projection(ProjectionMask::all())
            .build()
            .unwrap();
        assert_ne!(1024, num_rows);
        assert_eq!(reader.batch_size, num_rows as usize);
    }

    #[test]
    fn test_read_with_page_index_enabled() {
        let testdata = arrow::util::test_util::parquet_test_data();

        {
            // `alltypes_tiny_pages.parquet` contains a page index
            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
            let test_file = File::open(path).unwrap();
            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
                test_file,
                ArrowReaderOptions::new().with_page_index(true),
            )
            .unwrap();
            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
            let reader = builder.build().unwrap();
            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(batches.len(), 8);
        }

        {
            // `alltypes_plain.parquet` has no page index: requesting one via the
            // options must not fail, the index is simply absent
            let path = format!("{testdata}/alltypes_plain.parquet");
            let test_file = File::open(path).unwrap();
            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
                test_file,
                ArrowReaderOptions::new().with_page_index(true),
            )
            .unwrap();
            assert!(builder.metadata().offset_index().is_none());
            let reader = builder.build().unwrap();
            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(batches.len(), 1);
        }
    }

    #[test]
    fn test_raw_repetition() {
        const MESSAGE_TYPE: &str = "
        message Log {
          OPTIONAL INT32 eventType;
          REPEATED INT32 category;
          REPEATED group filter {
            OPTIONAL INT32 error;
          }
        }
        ";
        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
        let props = Default::default();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        // `eventType`: a single optional value
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1], Some(&[1]), None)
            .unwrap();
        col_writer.close().unwrap();
        // `category`: two repeated values within the same record
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
            .unwrap();
        col_writer.close().unwrap();
        // `filter.error`: a single filter entry
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1], Some(&[1]), Some(&[0]))
            .unwrap();
        col_writer.close().unwrap();

        let rg_md = row_group_writer.close().unwrap();
        assert_eq!(rg_md.num_rows(), 1);
        writer.close().unwrap();

        let bytes = Bytes::from(buf);

        // Read the full file, then verify each column individually via projection
        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
        let full = no_mask.next().unwrap().unwrap();

        assert_eq!(full.num_columns(), 3);

        for idx in 0..3 {
            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
            let mut reader = b.with_projection(mask).build().unwrap();
            let projected = reader.next().unwrap().unwrap();

            assert_eq!(projected.num_columns(), 1);
            assert_eq!(full.column(idx), projected.column(0));
        }
    }
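
    // Background for the raw writes above: a repetition level of 0 starts a new
    // record while 1 continues the current repeated field, so `category` packs
    // both of its values into a single row and the row group correctly reports
    // `num_rows() == 1`.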

    #[test]
    fn test_read_lz4_raw() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/lz4_raw_compressed.parquet");
        let file = File::open(path).unwrap();

        let batches = ParquetRecordBatchReader::try_new(file, 1024)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];

        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 4);

        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
        assert_eq!(
            a.values(),
            &[1593604800, 1593604800, 1593604801, 1593604801]
        );

        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
        let a: Vec<_> = a.iter().flatten().collect();
        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);

        let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
        assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
    }

    // Both files declare the LZ4 compression codec but differ in framing:
    // "hadoop_lz4_compressed.parquet" uses the Hadoop block layout, while
    // "non_hadoop_lz4_compressed.parquet" uses raw LZ4. Reading both exercises
    // the decoder's fallback between the two formats.
    #[test]
    fn test_read_lz4_hadoop_fallback() {
        for file in [
            "hadoop_lz4_compressed.parquet",
            "non_hadoop_lz4_compressed.parquet",
        ] {
            let testdata = arrow::util::test_util::parquet_test_data();
            let path = format!("{testdata}/{file}");
            let file = File::open(path).unwrap();
            let expected_rows = 4;

            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert_eq!(batches.len(), 1);
            let batch = &batches[0];

            assert_eq!(batch.num_columns(), 3);
            assert_eq!(batch.num_rows(), expected_rows);

            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
            assert_eq!(
                a.values(),
                &[1593604800, 1593604800, 1593604801, 1593604801]
            );

            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
            let b: Vec<_> = b.iter().flatten().collect();
            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);

            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
        }
    }

    #[test]
    fn test_read_lz4_hadoop_large() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
        let file = File::open(path).unwrap();
        let expected_rows = 10000;

        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];

        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.num_rows(), expected_rows);

        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
        let a: Vec<_> = a.iter().flatten().collect();
        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
    }

    #[test]
    #[cfg(feature = "snap")]
    fn test_read_nested_lists() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_lists.snappy.parquet");
        let file = File::open(path).unwrap();

        let f = file.try_clone().unwrap();
        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
        let expected = reader.next().unwrap().unwrap();
        assert_eq!(expected.num_rows(), 3);

        let selection = RowSelection::from(vec![
            RowSelector::skip(1),
            RowSelector::select(1),
            RowSelector::skip(1),
        ]);
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .with_row_selection(selection)
            .build()
            .unwrap();

        let actual = reader.next().unwrap().unwrap();
        assert_eq!(actual.num_rows(), 1);
        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
    }

    #[test]
    fn test_arbitrary_decimal() {
        let values = [1, 2, 3, 4, 5, 6, 7, 8];
        let decimals_19_0 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(19, 0)
            .unwrap();
        let decimals_12_0 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(12, 0)
            .unwrap();
        let decimals_17_10 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(17, 10)
            .unwrap();

        let written = RecordBatch::try_from_iter([
            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 8), &read[0]);
    }

    #[test]
    fn test_list_skip() {
        let mut list = ListBuilder::new(Int32Builder::new());
        list.append_value([Some(1), Some(2)]);
        list.append_value([Some(3)]);
        list.append_value([Some(4)]);
        let list = list.finish();
        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();

        // Tiny page limits spread the three lists over multiple data pages, so
        // the skip below has to cross page boundaries
        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(1)
            .set_write_batch_size(2)
            .build();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
            .unwrap()
            .with_row_selection(selection.into())
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out, batch.slice(2, 1));
    }

    fn test_decimal_roundtrip<T: DecimalType>() {
        // Precision <= 9 -> INT32
        // Precision <= 18 -> INT64
        // Precision > 18 -> FIXED_LEN_BYTE_ARRAY
        let d = |values: Vec<usize>, p: u8| {
            let iter = values.into_iter().map(T::Native::usize_as);
            PrimitiveArray::<T>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
            ("d4", Arc::new(d4) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);
        let t4 = builder.parquet_schema().columns()[3].physical_type();
        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    #[test]
    fn test_decimal() {
        test_decimal_roundtrip::<Decimal128Type>();
        test_decimal_roundtrip::<Decimal256Type>();
    }
4343
4344 #[test]
4345 fn test_list_selection() {
4346 let schema = Arc::new(Schema::new(vec![Field::new_list(
4347 "list",
4348 Field::new_list_field(ArrowDataType::Utf8, true),
4349 false,
4350 )]));
4351 let mut buf = Vec::with_capacity(1024);
4352
4353 let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
4354
4355 for i in 0..2 {
4356 let mut list_a_builder = ListBuilder::new(StringBuilder::new());
4357 for j in 0..1024 {
4358 list_a_builder.values().append_value(format!("{i} {j}"));
4359 list_a_builder.append(true);
4360 }
4361 let batch =
4362 RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
4363 .unwrap();
4364 writer.write(&batch).unwrap();
4365 }
4366 let _metadata = writer.close().unwrap();
4367
4368 let buf = Bytes::from(buf);
4369 let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
4370 .unwrap()
4371 .with_row_selection(RowSelection::from(vec![
4372 RowSelector::skip(100),
4373 RowSelector::select(924),
4374 RowSelector::skip(100),
4375 RowSelector::select(924),
4376 ]))
4377 .build()
4378 .unwrap();
4379
4380 let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4381 let batch = concat_batches(&schema, &batches).unwrap();
4382
4383 assert_eq!(batch.num_rows(), 924 * 2);
4384 let list = batch.column(0).as_list::<i32>();
4385
4386 for w in list.value_offsets().windows(2) {
4387 assert_eq!(w[0] + 1, w[1])
4388 }
4389 let mut values = list.values().as_string::<i32>().iter();
4390
4391 for i in 0..2 {
4392 for j in 100..1024 {
4393 let expected = format!("{i} {j}");
4394 assert_eq!(values.next().unwrap().unwrap(), &expected);
4395 }
4396 }
4397 }
4398
4399 #[test]
4400 fn test_list_selection_fuzz() {
4401 let mut rng = rng();
4402 let schema = Arc::new(Schema::new(vec![Field::new_list(
4403 "list",
4404 Field::new_list(
4405 Field::LIST_FIELD_DEFAULT_NAME,
4406 Field::new_list_field(ArrowDataType::Int32, true),
4407 true,
4408 ),
4409 true,
4410 )]));
4411 let mut buf = Vec::with_capacity(1024);
4412 let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
4413
        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));

        for _ in 0..2048 {
            if rng.random_bool(0.2) {
                list_a_builder.append(false);
                continue;
            }

            let list_a_len = rng.random_range(0..10);
            let list_b_builder = list_a_builder.values();

            for _ in 0..list_a_len {
                if rng.random_bool(0.2) {
                    list_b_builder.append(false);
                    continue;
                }

                let list_b_len = rng.random_range(0..10);
                let int_builder = list_b_builder.values();
                for _ in 0..list_b_len {
                    match rng.random_bool(0.2) {
                        true => int_builder.append_null(),
                        false => int_builder.append_value(rng.random()),
                    }
                }
                list_b_builder.append(true);
            }
            list_a_builder.append(true);
        }

        let array = Arc::new(list_a_builder.finish());
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        writer.write(&batch).unwrap();
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);

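        // Selections that mix leading, interior, and trailing skips,
        // including single-row selects.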
        let cases = [
            vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ],
            vec![
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
            ],
            vec![
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
            ],
            vec![
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
            ],
        ];

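        // Replay every selection at several batch sizes and check each
        // selected range against the corresponding slice of the written batch.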
        for batch_size in [100, 1024, 2048] {
            for selection in &cases {
                let selection = RowSelection::from(selection.clone());
                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
                    .unwrap()
                    .with_row_selection(selection.clone())
                    .with_batch_size(batch_size)
                    .build()
                    .unwrap();

                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
                assert_eq!(actual.num_rows(), selection.row_count());

                let mut batch_offset = 0;
                let mut actual_offset = 0;
                for selector in selection.iter() {
                    if selector.skip {
                        batch_offset += selector.row_count;
                        continue;
                    }

                    assert_eq!(
                        batch.slice(batch_offset, selector.row_count),
                        actual.slice(actual_offset, selector.row_count)
                    );

                    batch_offset += selector.row_count;
                    actual_offset += selector.row_count;
                }
            }
        }
    }

    #[test]
    fn test_read_old_nested_list() {
        use arrow::datatypes::DataType;
        use arrow::datatypes::ToByteSlice;

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/old_list_structure.parquet");
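        // old_list_structure.parquet uses the legacy two-level list encoding,
        // with the repeated group nested directly under the LIST annotation.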
        let test_file = File::open(path).unwrap();

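        // Expected contents: a single row holding the nested list [[1, 2], [3, 4]].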
        let a_values = Int32Array::from(vec![1, 2, 3, 4]);

        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());

        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
            "array",
            DataType::Int32,
            false,
        ))))
        .len(2)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
        let mut reader = builder.build().unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out.num_columns(), 1);
        let c0 = out.column(0);
        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
        let r0 = c0arr.value(0);
        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
        assert_eq!(r0arr, &a);
    }

    #[test]
    fn test_map_no_value() {
        let testdata = arrow::util::test_util::parquet_test_data();
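        // map_no_value.parquet contains a MAP column whose repeated key_value
        // group has no value field; it should read back identically to the
        // file's plain LIST column built from the same keys.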
        let path = format!("{testdata}/map_no_value.parquet");
        let file = File::open(path).unwrap();

        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 3);
        assert_eq!(out.num_columns(), 3);
        let c0 = out.column(1).as_list::<i32>();
        let c1 = out.column(2).as_list::<i32>();
        assert_eq!(c0.len(), c1.len());
        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
    }
}