use arrow_array::cast::AsArray;
use arrow_array::Array;
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
use arrow_select::filter::prep_null_mask_filter;
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};
use std::collections::VecDeque;
use std::sync::Arc;

pub use crate::arrow::array_reader::RowGroups;
use crate::arrow::array_reader::{build_array_reader, ArrayReader};
use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use crate::column::page::{PageIterator, PageReader};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;

mod filter;
mod selection;
pub mod statistics;

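/// A generic builder for constructing an arrow [`RecordBatch`] reader from a
/// parquet file.
///
/// Most users should use [`ParquetRecordBatchReaderBuilder`], the synchronous
/// specialization of this builder. A minimal usage sketch (assuming a local
/// `data.parquet` file exists):
///
/// ```no_run
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
/// let file = std::fs::File::open("data.parquet").unwrap();
/// let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
///     .unwrap()
///     .with_batch_size(256)
///     .build()
///     .unwrap();
/// let batch = reader.next().unwrap().unwrap();
/// println!("Read {} rows", batch.num_rows());
/// ```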
pub struct ArrowReaderBuilder<T> {
    pub(crate) input: T,
    pub(crate) metadata: Arc<ParquetMetaData>,
    pub(crate) schema: SchemaRef,
    pub(crate) fields: Option<Arc<ParquetField>>,
    pub(crate) batch_size: usize,
    pub(crate) row_groups: Option<Vec<usize>>,
    pub(crate) projection: ProjectionMask,
    pub(crate) filter: Option<RowFilter>,
    pub(crate) selection: Option<RowSelection>,
    pub(crate) limit: Option<usize>,
    pub(crate) offset: Option<usize>,
}

impl<T> ArrowReaderBuilder<T> {
    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self {
            input,
            metadata: metadata.metadata,
            schema: metadata.schema,
            fields: metadata.fields,
            batch_size: 1024,
            row_groups: None,
            projection: ProjectionMask::all(),
            filter: None,
            selection: None,
            limit: None,
            offset: None,
        }
    }

    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    /// Returns the arrow [`SchemaRef`] for this parquet file
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Set the size of [`RecordBatch`] to produce. Defaults to 1024.
    /// The batch size is clamped to the number of rows in the file.
    pub fn with_batch_size(self, batch_size: usize) -> Self {
        // Treat batch_size > num_rows as batch_size = num_rows
        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
        Self { batch_size, ..self }
    }

    /// Only read data from the provided row group indexes
    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
        Self {
            row_groups: Some(row_groups),
            ..self
        }
    }

    /// Only read data from the columns in the provided [`ProjectionMask`]
    pub fn with_projection(self, mask: ProjectionMask) -> Self {
        Self {
            projection: mask,
            ..self
        }
    }

    /// Only read rows matching the provided [`RowSelection`]
    pub fn with_row_selection(self, selection: RowSelection) -> Self {
        Self {
            selection: Some(selection),
            ..self
        }
    }

    /// Filter rows with the provided [`RowFilter`], evaluated after any
    /// [`RowSelection`]
    pub fn with_row_filter(self, filter: RowFilter) -> Self {
        Self {
            filter: Some(filter),
            ..self
        }
    }

    /// Return at most `limit` rows, applied after the selection and filter
    pub fn with_limit(self, limit: usize) -> Self {
        Self {
            limit: Some(limit),
            ..self
        }
    }

    /// Skip the first `offset` rows, applied after the selection and filter
    pub fn with_offset(self, offset: usize) -> Self {
        Self {
            offset: Some(offset),
            ..self
        }
    }
}

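/// Options that control how a parquet file is read into arrow, such as
/// whether to load the page index or to override the inferred schema.
///
/// A sketch enabling the page index (used to skip pages based on a
/// [`RowSelection`]), assuming a local `data.parquet` file:
///
/// ```no_run
/// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
/// let file = std::fs::File::open("data.parquet").unwrap();
/// let options = ArrowReaderOptions::new().with_page_index(true);
/// let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
///     .unwrap()
///     .build()
///     .unwrap();
/// ```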
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    /// Should the reader ignore any arrow schema embedded in the file metadata
    skip_arrow_metadata: bool,
    /// Schema to use in place of the one inferred from the file metadata
    supplied_schema: Option<SchemaRef>,
    /// Should the reader load the page index, if present
    pub(crate) page_index: bool,
    /// Properties for decrypting an encrypted file
    #[cfg(feature = "encryption")]
    pub(crate) file_decryption_properties: Option<FileDecryptionProperties>,
}

impl ArrowReaderOptions {
    /// Create a new [`ArrowReaderOptions`] with the default settings
    pub fn new() -> Self {
        Self::default()
    }

    /// Skip decoding the embedded arrow metadata (defaults to `false`)
    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

    /// Provide a schema to use when reading the parquet file, in place of the
    /// one inferred from the file metadata. Setting a schema also implies
    /// `skip_arrow_metadata`.
    pub fn with_schema(self, schema: SchemaRef) -> Self {
        Self {
            supplied_schema: Some(schema),
            skip_arrow_metadata: true,
            ..self
        }
    }

    /// Enable reading the page index, if present (defaults to `false`)
    pub fn with_page_index(self, page_index: bool) -> Self {
        Self { page_index, ..self }
    }

    /// Provide the file decryption properties to use when reading an
    /// encrypted parquet file
    #[cfg(feature = "encryption")]
    pub fn with_file_decryption_properties(
        self,
        file_decryption_properties: FileDecryptionProperties,
    ) -> Self {
        Self {
            file_decryption_properties: Some(file_decryption_properties),
            ..self
        }
    }

    /// Returns whether the page index will be read
    pub fn page_index(&self) -> bool {
        self.page_index
    }

    /// Returns the file decryption properties, if any
    #[cfg(feature = "encryption")]
    pub fn file_decryption_properties(&self) -> Option<&FileDecryptionProperties> {
        self.file_decryption_properties.as_ref()
    }
}

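/// The metadata necessary to construct an arrow reader: the decoded
/// [`ParquetMetaData`], the corresponding arrow [`SchemaRef`], and the
/// computed parquet field levels.
///
/// Loading this once allows the potentially expensive metadata decode to be
/// shared across multiple readers, as sketched below (assuming a local
/// `data.parquet` file):
///
/// ```no_run
/// # use parquet::arrow::arrow_reader::{
/// #     ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
/// # };
/// let file = std::fs::File::open("data.parquet").unwrap();
/// let metadata = ArrowReaderMetadata::load(&file, ArrowReaderOptions::new()).unwrap();
/// // Each reader reuses the already decoded metadata
/// let a = ParquetRecordBatchReaderBuilder::new_with_metadata(
///     file.try_clone().unwrap(),
///     metadata.clone(),
/// );
/// let b = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata);
/// ```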
#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    pub(crate) metadata: Arc<ParquetMetaData>,
    pub(crate) schema: SchemaRef,
    pub(crate) fields: Option<Arc<ParquetField>>,
}

impl ArrowReaderMetadata {
    /// Loads [`ArrowReaderMetadata`] from the provided [`ChunkReader`]
    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ParquetMetaDataReader::new().with_page_indexes(options.page_index);
        #[cfg(feature = "encryption")]
        let metadata =
            metadata.with_decryption_properties(options.file_decryption_properties.as_ref());
        let metadata = metadata.parse_and_finish(reader)?;
        Self::try_new(Arc::new(metadata), options)
    }

    /// Create an [`ArrowReaderMetadata`] from the provided [`ParquetMetaData`],
    /// resolving the arrow schema according to `options`
    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
        match options.supplied_schema {
            Some(supplied_schema) => Self::with_supplied_schema(metadata, supplied_schema.clone()),
            None => {
                let kv_metadata = match options.skip_arrow_metadata {
                    true => None,
                    false => metadata.file_metadata().key_value_metadata(),
                };

                let (schema, fields) = parquet_to_arrow_schema_and_fields(
                    metadata.file_metadata().schema_descr(),
                    ProjectionMask::all(),
                    kv_metadata,
                )?;

                Ok(Self {
                    metadata,
                    schema: Arc::new(schema),
                    fields: fields.map(Arc::new),
                })
            }
        }
    }

    fn with_supplied_schema(
        metadata: Arc<ParquetMetaData>,
        supplied_schema: SchemaRef,
    ) -> Result<Self> {
        let parquet_schema = metadata.file_metadata().schema_descr();
        let field_levels = parquet_to_arrow_field_levels(
            parquet_schema,
            ProjectionMask::all(),
            Some(supplied_schema.fields()),
        )?;
        let fields = field_levels.fields;
        let inferred_len = fields.len();
        let supplied_len = supplied_schema.fields().len();
        if inferred_len != supplied_len {
            Err(arrow_err!(format!(
                "incompatible arrow schema, expected {} columns, received {}",
                inferred_len, supplied_len
            )))
        } else {
            let diff_fields: Vec<_> = supplied_schema
                .fields()
                .iter()
                .zip(fields.iter())
                .filter_map(|(field1, field2)| {
                    if field1 != field2 {
                        Some(field1.name().clone())
                    } else {
                        None
                    }
                })
                .collect();

            if !diff_fields.is_empty() {
                Err(ParquetError::ArrowError(format!(
                    "incompatible arrow schema, the following fields could not be cast: [{}]",
                    diff_fields.join(", ")
                )))
            } else {
                Ok(Self {
                    metadata,
                    schema: supplied_schema,
                    fields: field_levels.levels.map(Arc::new),
                })
            }
        }
    }

    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    /// Returns the arrow [`SchemaRef`] for this parquet file
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}

#[doc(hidden)]
pub struct SyncReader<T: ChunkReader>(T);

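/// A synchronous builder for reading parquet data from any [`ChunkReader`],
/// such as a [`File`](std::fs::File) or [`Bytes`](bytes::Bytes).
///
/// A self-contained round-trip sketch, modelled on the tests below:
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
/// # use bytes::Bytes;
/// # use parquet::arrow::ArrowWriter;
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
/// let batch = RecordBatch::try_from_iter([(
///     "a",
///     Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
/// )])
/// .unwrap();
///
/// let mut buffer = Vec::new();
/// let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
/// writer.write(&batch).unwrap();
/// writer.close().unwrap();
///
/// let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
///     .unwrap()
///     .build()
///     .unwrap();
/// let read = reader.next().unwrap().unwrap();
/// assert_eq!(batch, read);
/// ```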
pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;

impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
    /// Create a new [`ParquetRecordBatchReaderBuilder`] with default options
    pub fn try_new(reader: T) -> Result<Self> {
        Self::try_new_with_options(reader, Default::default())
    }

    /// Create a new [`ParquetRecordBatchReaderBuilder`] with the provided [`ArrowReaderOptions`]
    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load(&reader, options)?;
        Ok(Self::new_with_metadata(reader, metadata))
    }

    /// Create a [`ParquetRecordBatchReaderBuilder`] from the provided
    /// [`ArrowReaderMetadata`], allowing the metadata decode to be shared
    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(SyncReader(input), metadata)
    }

    /// Build a [`ParquetRecordBatchReader`]
    ///
    /// Note: this will eagerly evaluate any [`RowFilter`] before returning
    pub fn build(self) -> Result<ParquetRecordBatchReader> {
        // Try to avoid allocating a batch larger than the file
        let batch_size = self
            .batch_size
            .min(self.metadata.file_metadata().num_rows() as usize);

        let row_groups = self
            .row_groups
            .unwrap_or_else(|| (0..self.metadata.num_row_groups()).collect());

        let reader = ReaderRowGroups {
            reader: Arc::new(self.input.0),
            metadata: self.metadata,
            row_groups,
        };

        let mut filter = self.filter;
        let mut selection = self.selection;

        // Evaluate each predicate in turn, intersecting the resulting selections
        if let Some(filter) = filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
                if !selects_any(selection.as_ref()) {
                    break;
                }

                let array_reader =
                    build_array_reader(self.fields.as_deref(), predicate.projection(), &reader)?;

                selection = Some(evaluate_predicate(
                    batch_size,
                    array_reader,
                    selection,
                    predicate.as_mut(),
                )?);
            }
        }

        let array_reader = build_array_reader(self.fields.as_deref(), &self.projection, &reader)?;

        // If the selection cannot select any rows, normalize to an empty selection
        if !selects_any(selection.as_ref()) {
            selection = Some(RowSelection::from(vec![]));
        }

        Ok(ParquetRecordBatchReader::new(
            batch_size,
            array_reader,
            apply_range(selection, reader.num_rows(), self.offset, self.limit),
        ))
    }
}

struct ReaderRowGroups<T: ChunkReader> {
    reader: Arc<T>,
    metadata: Arc<ParquetMetaData>,
    row_groups: Vec<usize>,
}

impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
    fn num_rows(&self) -> usize {
        let meta = self.metadata.row_groups();
        self.row_groups
            .iter()
            .map(|x| meta[*x].num_rows() as usize)
            .sum()
    }

    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        Ok(Box::new(ReaderPageIterator {
            column_idx: i,
            reader: self.reader.clone(),
            metadata: self.metadata.clone(),
            row_groups: self.row_groups.clone().into_iter(),
        }))
    }
}

struct ReaderPageIterator<T: ChunkReader> {
    reader: Arc<T>,
    column_idx: usize,
    row_groups: std::vec::IntoIter<usize>,
    metadata: Arc<ParquetMetaData>,
}

impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
    /// Return the [`SerializedPageReader`] for the column chunk in the given row group
    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
        let rg = self.metadata.row_group(rg_idx);
        let column_chunk_metadata = rg.column(self.column_idx);
        let offset_index = self.metadata.offset_index();
        // The offset index may be missing for this row group; filter out an
        // empty `i[rg_idx]` to avoid `i[rg_idx][self.column_idx]` panicking
        let page_locations = offset_index
            .filter(|i| !i[rg_idx].is_empty())
            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
        let total_rows = rg.num_rows() as usize;
        let reader = self.reader.clone();

        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
            .add_crypto_context(
                rg_idx,
                self.column_idx,
                self.metadata.as_ref(),
                column_chunk_metadata,
            )
    }
}

impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        let rg_idx = self.row_groups.next()?;
        let page_reader = self
            .next_page_reader(rg_idx)
            .map(|page_reader| Box::new(page_reader) as _);
        Some(page_reader)
    }
}

impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}

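/// An [`Iterator`] of [`RecordBatch`] read from a parquet data source,
/// honoring any configured [`RowSelection`].
///
/// A sketch of driving the iterator to completion (assuming a local
/// `data.parquet` file):
///
/// ```no_run
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
/// let file = std::fs::File::open("data.parquet").unwrap();
/// let reader = ParquetRecordBatchReader::try_new(file, 1024).unwrap();
/// for batch in reader {
///     println!("Read {} rows", batch.unwrap().num_rows());
/// }
/// ```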
pub struct ParquetRecordBatchReader {
    batch_size: usize,
    array_reader: Box<dyn ArrayReader>,
    schema: SchemaRef,
    selection: Option<VecDeque<RowSelector>>,
}

impl Iterator for ParquetRecordBatchReader {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        let mut read_records = 0;
        match self.selection.as_mut() {
            Some(selection) => {
                while read_records < self.batch_size && !selection.is_empty() {
                    let front = selection.pop_front().unwrap();
                    if front.skip {
                        let skipped = match self.array_reader.skip_records(front.row_count) {
                            Ok(skipped) => skipped,
                            Err(e) => return Some(Err(e.into())),
                        };

                        if skipped != front.row_count {
                            return Some(Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                front.row_count,
                                skipped
                            )
                            .into()));
                        }
                        continue;
                    }

                    // Nothing to read for an empty selector
                    if front.row_count == 0 {
                        continue;
                    }

                    // Only read as many rows as needed to fill the current batch,
                    // pushing any excess back onto the selection
                    let need_read = self.batch_size - read_records;
                    let to_read = match front.row_count.checked_sub(need_read) {
                        Some(remaining) if remaining != 0 => {
                            selection.push_front(RowSelector::select(remaining));
                            need_read
                        }
                        _ => front.row_count,
                    };
                    match self.array_reader.read_records(to_read) {
                        Ok(0) => break,
                        Ok(rec) => read_records += rec,
                        Err(error) => return Some(Err(error.into())),
                    }
                }
            }
            None => {
                if let Err(error) = self.array_reader.read_records(self.batch_size) {
                    return Some(Err(error.into()));
                }
            }
        };

        match self.array_reader.consume_batch() {
            Err(error) => Some(Err(error.into())),
            Ok(array) => {
                let struct_array = array.as_struct_opt().ok_or_else(|| {
                    ArrowError::ParquetError(
                        "Struct array reader should return struct array".to_string(),
                    )
                });

                match struct_array {
                    Err(err) => Some(Err(err)),
                    Ok(e) => (e.len() > 0).then(|| Ok(RecordBatch::from(e))),
                }
            }
        }
    }
}

impl RecordBatchReader for ParquetRecordBatchReader {
    /// Returns the schema of the [`RecordBatch`]es produced by this reader
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl ParquetRecordBatchReader {
    /// Create a new [`ParquetRecordBatchReader`] from the provided chunk reader,
    /// using default options and the given batch size
    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
        ParquetRecordBatchReaderBuilder::try_new(reader)?
            .with_batch_size(batch_size)
            .build()
    }

    /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`],
    /// reading only the rows in `selection` if provided
    pub fn try_new_with_row_groups(
        levels: &FieldLevels,
        row_groups: &dyn RowGroups,
        batch_size: usize,
        selection: Option<RowSelection>,
    ) -> Result<Self> {
        let array_reader =
            build_array_reader(levels.levels.as_ref(), &ProjectionMask::all(), row_groups)?;

        Ok(Self {
            batch_size,
            array_reader,
            schema: Arc::new(Schema::new(levels.fields.clone())),
            selection: selection.map(|s| s.trim().into()),
        })
    }

    /// Create a new [`ParquetRecordBatchReader`] that will read at most
    /// `batch_size` rows at a time from `array_reader`
    pub(crate) fn new(
        batch_size: usize,
        array_reader: Box<dyn ArrayReader>,
        selection: Option<RowSelection>,
    ) -> Self {
        let schema = match array_reader.get_data_type() {
            ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
            _ => unreachable!("Struct array reader's data type is not struct!"),
        };

        Self {
            batch_size,
            array_reader,
            schema: Arc::new(schema),
            selection: selection.map(|s| s.trim().into()),
        }
    }
}

/// Returns `true` if `selection` is `None`, or selects at least one row
pub(crate) fn selects_any(selection: Option<&RowSelection>) -> bool {
    selection.map(|x| x.selects_any()).unwrap_or(true)
}

/// Applies an optional `offset` and `limit` to an optional [`RowSelection`],
/// skipping the first `offset` selected rows and then selecting at most
/// `limit` of the remainder
pub(crate) fn apply_range(
    mut selection: Option<RowSelection>,
    row_count: usize,
    offset: Option<usize>,
    limit: Option<usize>,
) -> Option<RowSelection> {
    // If an offset is defined, apply it to the `selection`
    if let Some(offset) = offset {
        selection = Some(match row_count.checked_sub(offset) {
            None => RowSelection::from(vec![]),
            Some(remaining) => selection
                .map(|selection| selection.offset(offset))
                .unwrap_or_else(|| {
                    RowSelection::from(vec![
                        RowSelector::skip(offset),
                        RowSelector::select(remaining),
                    ])
                }),
        });
    }

    // If a limit is defined, apply it to the final `selection`
    if let Some(limit) = limit {
        selection = Some(
            selection
                .map(|selection| selection.limit(limit))
                .unwrap_or_else(|| {
                    RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
                }),
        );
    }
    selection
}

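/// Evaluates `predicate` against the rows of `array_reader` identified by
/// `input_selection` (all rows if `None`), returning the [`RowSelection`] of
/// matching rows, intersected with `input_selection` where present.
///
/// Any nulls in the predicate's boolean output are treated as `false` via
/// [`prep_null_mask_filter`].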
pub(crate) fn evaluate_predicate(
    batch_size: usize,
    array_reader: Box<dyn ArrayReader>,
    input_selection: Option<RowSelection>,
    predicate: &mut dyn ArrowPredicate,
) -> Result<RowSelection> {
    let reader = ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone());
    let mut filters = vec![];
    for maybe_batch in reader {
        let maybe_batch = maybe_batch?;
        let input_rows = maybe_batch.num_rows();
        let filter = predicate.evaluate(maybe_batch)?;
        // A predicate must produce exactly one boolean per input row
        if filter.len() != input_rows {
            return Err(arrow_err!(
                "ArrowPredicate predicate returned {} rows, expected {input_rows}",
                filter.len()
            ));
        }
        match filter.null_count() {
            0 => filters.push(filter),
            _ => filters.push(prep_null_mask_filter(&filter)),
        };
    }

    let raw = RowSelection::from_filters(&filters);
    Ok(match input_selection {
        Some(selection) => selection.and_then(&raw),
        None => raw,
    })
}

#[cfg(test)]
mod tests {
    use std::cmp::min;
    use std::collections::{HashMap, VecDeque};
    use std::fmt::Formatter;
    use std::fs::File;
    use std::io::Seek;
    use std::path::PathBuf;
    use std::sync::Arc;

    use arrow_array::builder::*;
    use arrow_array::cast::AsArray;
    use arrow_array::types::{
        Date32Type, Date64Type, Decimal128Type, Decimal256Type, DecimalType, Float16Type,
        Float32Type, Float64Type, Time32MillisecondType, Time64MicrosecondType,
    };
    use arrow_array::*;
    use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
    use arrow_data::{ArrayData, ArrayDataBuilder};
    use arrow_schema::{
        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
    };
    use arrow_select::concat::concat_batches;
    use bytes::Bytes;
    use half::f16;
    use num::PrimInt;
    use rand::{rng, Rng, RngCore};
    use tempfile::tempfile;

    use crate::arrow::arrow_reader::{
        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
    };
    use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
    use crate::arrow::{ArrowWriter, ProjectionMask};
    use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
    use crate::data_type::{
        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
        FloatType, Int32Type, Int64Type, Int96, Int96Type,
    };
    use crate::errors::Result;
    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
    use crate::file::writer::SerializedFileWriter;
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::{Type, TypePtr};
    use crate::util::test_common::rand_gen::RandGen;

    #[test]
    fn test_arrow_reader_all_columns() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());
        let reader = builder.build().unwrap();

        assert_eq!(original_schema.fields(), reader.schema().fields());
    }

    #[test]
    fn test_arrow_reader_single_column() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
        let reader = builder.with_projection(mask).build().unwrap();

        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_arrow_reader_single_column_by_name() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
        let reader = builder.with_projection(mask).build().unwrap();

        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_null_column_reader_test() {
        let mut file = tempfile::tempfile().unwrap();

        let schema = "
        message message {
            OPTIONAL INT32 int32;
        }
        ";
        let schema = Arc::new(parse_message_type(schema).unwrap());

        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
        generate_single_column_file_with_data::<Int32Type>(
            &[vec![], vec![]],
            Some(&def_levels),
            file.try_clone().unwrap(),
            schema,
            Some(Field::new("int32", ArrowDataType::Null, true)),
            &Default::default(),
        )
        .unwrap();

        file.rewind().unwrap();

        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();

        assert_eq!(batches.len(), 4);
        for batch in &batches[0..3] {
            assert_eq!(batch.num_rows(), 2);
            assert_eq!(batch.num_columns(), 1);
            assert_eq!(batch.column(0).null_count(), 2);
        }

        assert_eq!(batches[3].num_rows(), 1);
        assert_eq!(batches[3].num_columns(), 1);
        assert_eq!(batches[3].column(0).null_count(), 1);
    }

    #[test]
    fn test_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<BoolType, _, BoolType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
        );
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<FloatType, _, FloatType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
        );
    }

    #[test]
    fn test_unsigned_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::UINT_32,
            Some(ArrowDataType::UInt32),
            |vals| {
                Arc::new(UInt32Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u32)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::UINT_64,
            Some(ArrowDataType::UInt64),
            |vals| {
                Arc::new(UInt64Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u64)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
    }

    #[test]
    fn test_unsigned_roundtrip() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("uint32", ArrowDataType::UInt32, true),
            Field::new("uint64", ArrowDataType::UInt64, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt32Array::from_iter_values([
                    0,
                    i32::MAX as u32,
                    u32::MAX,
                ])),
                Arc::new(UInt64Array::from_iter_values([
                    0,
                    i64::MAX as u64,
                    u64::MAX,
                ])),
            ],
        )
        .unwrap();

        writer.write(&original).unwrap();
        writer.close().unwrap();

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
        let ret = reader.next().unwrap().unwrap();
        assert_eq!(ret, original);

        ret.column(0)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();

        ret.column(1)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
    }

    #[test]
    fn test_float16_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("float16", ArrowDataType::Float16, false),
            Field::new("float16-nullable", ArrowDataType::Float16, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Float16Array::from_iter_values([
                    f16::EPSILON,
                    f16::MIN,
                    f16::MAX,
                    f16::NAN,
                    f16::INFINITY,
                    f16::NEG_INFINITY,
                    f16::ONE,
                    f16::NEG_ONE,
                    f16::ZERO,
                    f16::NEG_ZERO,
                    f16::E,
                    f16::PI,
                    f16::FRAC_1_PI,
                ])),
                Arc::new(Float16Array::from(vec![
                    None,
                    None,
                    None,
                    Some(f16::NAN),
                    Some(f16::INFINITY),
                    Some(f16::NEG_INFINITY),
                    None,
                    None,
                    None,
                    None,
                    None,
                    None,
                    Some(f16::FRAC_1_PI),
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Float16Type>();
        ret.column(1).as_primitive::<Float16Type>();

        Ok(())
    }

    #[test]
    fn test_time_utc_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "time_millis",
                ArrowDataType::Time32(TimeUnit::Millisecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
            Field::new(
                "time_micros",
                ArrowDataType::Time64(TimeUnit::Microsecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Time32MillisecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399_000),
                    Some(86_400_000),
                    Some(86_401_000),
                    None,
                ])),
                Arc::new(Time64MicrosecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399 * 1_000_000),
                    Some(86_400 * 1_000_000),
                    Some(86_401 * 1_000_000),
                    None,
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Time32MillisecondType>();
        ret.column(1).as_primitive::<Time64MicrosecondType>();

        Ok(())
    }

    #[test]
    fn test_date32_roundtrip() -> Result<()> {
        use arrow_array::Date32Array;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "date32",
            ArrowDataType::Date32,
            false,
        )]));

        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![Arc::new(Date32Array::from(vec![
                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
            ]))],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Date32Type>();

        Ok(())
    }

    #[test]
    fn test_date64_roundtrip() -> Result<()> {
        use arrow_array::Date64Array;

        let schema = Arc::new(Schema::new(vec![
            Field::new("small-date64", ArrowDataType::Date64, false),
            Field::new("big-date64", ArrowDataType::Date64, false),
            Field::new("invalid-date64", ArrowDataType::Date64, false),
        ]));

        let mut default_buf = Vec::with_capacity(1024);
        let mut coerce_buf = Vec::with_capacity(1024);

        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();

        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
        let mut coerce_writer =
            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;

        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000 * NUM_MILLISECONDS_IN_DAY,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                Arc::new(Date64Array::from(vec![
                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1,
                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                ])),
            ],
        )?;

        default_writer.write(&original)?;
        coerce_writer.write(&original)?;

        default_writer.close()?;
        coerce_writer.close()?;

        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;

        let default_ret = default_reader.next().unwrap()?;
        let coerce_ret = coerce_reader.next().unwrap()?;

        assert_eq!(default_ret, original);

        assert_eq!(coerce_ret.column(0), original.column(0));
        assert_ne!(coerce_ret.column(1), original.column(1));
        assert_ne!(coerce_ret.column(2), original.column(2));

        default_ret.column(0).as_primitive::<Date64Type>();
        coerce_ret.column(0).as_primitive::<Date64Type>();

        Ok(())
    }

    struct RandFixedLenGen {}

    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
        fn gen(len: i32) -> FixedLenByteArray {
            let mut v = vec![0u8; len as usize];
            rng().fill_bytes(&mut v);
            ByteArray::from(v).into()
        }
    }

    #[test]
    fn test_fixed_length_binary_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            20,
            ConvertedType::NONE,
            None,
            |vals| {
                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
                for val in vals {
                    match val {
                        Some(b) => builder.append_value(b).unwrap(),
                        None => builder.append_null(),
                    }
                }
                Arc::new(builder.finish())
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_interval_day_time_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            12,
            ConvertedType::INTERVAL,
            None,
            |vals| {
                Arc::new(
                    vals.iter()
                        .map(|x| {
                            x.as_ref().map(|b| IntervalDayTime {
                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
                                milliseconds: i32::from_le_bytes(
                                    b.as_ref()[8..12].try_into().unwrap(),
                                ),
                            })
                        })
                        .collect::<IntervalDayTimeArray>(),
                )
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_int96_single_column_reader_test() {
        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];

        type TypeHintAndConversionFunction =
            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);

        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
            (None, |vals: &[Option<Int96>]| {
                Arc::new(TimestampNanosecondArray::from_iter(
                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
                )) as ArrayRef
            }),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampSecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMillisecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_millis())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMicrosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_micros())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampNanosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(
                    TimeUnit::Second,
                    Some(Arc::from("-05:00")),
                )),
                |vals: &[Option<Int96>]| {
                    Arc::new(
                        TimestampSecondArray::from_iter(
                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
                        )
                        .with_timezone("-05:00"),
                    ) as ArrayRef
                },
            ),
        ];

        resolutions.iter().for_each(|(arrow_type, converter)| {
            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
                2,
                ConvertedType::NONE,
                arrow_type.clone(),
                converter,
                encodings,
            );
        })
    }

    #[test]
    fn test_int96_from_spark_file_with_provided_schema() {
        use arrow_schema::DataType::Timestamp;
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            Timestamp(TimeUnit::Microsecond, None),
            true,
        )]));
        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());

        let mut record_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                .unwrap()
                .build()
                .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));

        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456),
            Some(1704070800000000),
            Some(253402225200000000),
            Some(1735599600000000),
            None,
            Some(9089380393200000000),
        ]));

        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }

    #[test]
    fn test_int96_from_spark_file_without_provided_schema() {
        use arrow_schema::DataType::Timestamp;
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));

        // The negative values are timestamps that overflow i64 nanoseconds
        // (compare with the microsecond values in the provided-schema test above)
        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456000),
            Some(1704070800000000000),
            Some(-4852191831933722624),
            Some(1735599600000000000),
            None,
            Some(-4864435138808946688),
        ]));

        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }

    struct RandUtf8Gen {}

    impl RandGen<ByteArrayType> for RandUtf8Gen {
        fn gen(len: i32) -> ByteArray {
            Int32Type::gen(len).to_string().as_str().into()
        }
    }

    #[test]
    fn test_utf8_single_column_reader_test() {
        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
            })))
        }

        let encodings = &[
            Encoding::PLAIN,
            Encoding::RLE_DICTIONARY,
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            Encoding::DELTA_BYTE_ARRAY,
        ];

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::NONE,
            None,
            |vals| {
                Arc::new(BinaryArray::from_iter(
                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
                ))
            },
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            None,
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::Utf8),
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::LargeUtf8),
            string_converter::<i64>,
            encodings,
        );

        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
        for key in &small_key_types {
            for encoding in encodings {
                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
                opts.encoding = *encoding;

                let data_type =
                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
                    opts,
                    2,
                    ConvertedType::UTF8,
                    Some(data_type.clone()),
                    move |vals| {
                        let vals = string_converter::<i32>(vals);
                        arrow::compute::cast(&vals, &data_type).unwrap()
                    },
                );
            }
        }

        let key_types = [
            ArrowDataType::Int16,
            ArrowDataType::UInt16,
            ArrowDataType::Int32,
            ArrowDataType::UInt32,
            ArrowDataType::Int64,
            ArrowDataType::UInt64,
        ];

        for key in &key_types {
            let data_type =
                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let vals = string_converter::<i32>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );
        }
    }

    #[test]
    fn test_decimal_nullable_struct() {
        let decimals = Decimal256Array::from_iter_values(
            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
        );

        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "decimals",
            decimals.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_int32_nullable_struct() {
        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "int32",
            int32.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![int32.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_decimal_list() {
        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);

        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
            decimals.data_type().clone(),
            false,
        ))))
        .len(7)
        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 1), &read[2]);
    }

    #[test]
    fn test_read_decimal_file() {
        use arrow_array::Decimal128Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let file_variants = vec![
            ("byte_array", 4),
            ("fixed_length", 25),
            ("int32", 4),
            ("int64", 10),
        ];
        for (prefix, target_precision) in file_variants {
            let path = format!("{testdata}/{prefix}_decimal.parquet");
            let file = File::open(path).unwrap();
            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

            let batch = record_reader.next().unwrap().unwrap();
            assert_eq!(batch.num_rows(), 24);
            let col = batch
                .column(0)
                .as_any()
                .downcast_ref::<Decimal128Array>()
                .unwrap();

            let expected = 1..25;

            assert_eq!(col.precision(), target_precision);
            assert_eq!(col.scale(), 2);

            for (i, v) in expected.enumerate() {
                assert_eq!(col.value(i), v * 100_i128);
            }
        }
    }

    #[test]
    fn test_read_float16_nonzeros_file() {
        use arrow_array::Float16Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 8);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float16Array>()
            .unwrap();

        let f16_two = f16::ONE + f16::ONE;

        assert_eq!(col.null_count(), 1);
        assert!(col.is_null(0));
        assert_eq!(col.value(1), f16::ONE);
        assert_eq!(col.value(2), -f16_two);
        assert!(col.value(3).is_nan());
        assert_eq!(col.value(4), f16::ZERO);
        assert!(col.value(4).is_sign_positive());
        assert_eq!(col.value(5), f16::NEG_ONE);
        assert_eq!(col.value(6), f16::NEG_ZERO);
        assert!(col.value(6).is_sign_negative());
        assert_eq!(col.value(7), f16_two);
    }

    #[test]
    fn test_read_float16_zeros_file() {
        use arrow_array::Float16Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 3);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float16Array>()
            .unwrap();

        assert_eq!(col.null_count(), 1);
        assert!(col.is_null(0));
        assert_eq!(col.value(1), f16::ZERO);
        assert!(col.value(1).is_sign_positive());
        assert!(col.value(2).is_nan());
    }

    #[test]
    fn test_read_float32_float64_byte_stream_split() {
        let path = format!(
            "{}/byte_stream_split.zstd.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();
        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

        let mut row_count = 0;
        for batch in record_reader {
            let batch = batch.unwrap();
            row_count += batch.num_rows();
            let f32_col = batch.column(0).as_primitive::<Float32Type>();
            let f64_col = batch.column(1).as_primitive::<Float64Type>();

            for &x in f32_col.values() {
                assert!(x > -10.0);
                assert!(x < 10.0);
            }
            for &x in f64_col.values() {
                assert!(x > -10.0);
                assert!(x < 10.0);
            }
        }
        assert_eq!(row_count, 300);
    }

    #[test]
    fn test_read_extended_byte_stream_split() {
        let path = format!(
            "{}/byte_stream_split_extended.gzip.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();
        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

        let mut row_count = 0;
        for batch in record_reader {
            let batch = batch.unwrap();
            row_count += batch.num_rows();

            let f16_col = batch.column(0).as_primitive::<Float16Type>();
            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
            assert_eq!(f16_col.len(), f16_bss.len());
            f16_col
                .iter()
                .zip(f16_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let f32_col = batch.column(2).as_primitive::<Float32Type>();
            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
            assert_eq!(f32_col.len(), f32_bss.len());
            f32_col
                .iter()
                .zip(f32_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let f64_col = batch.column(4).as_primitive::<Float64Type>();
            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
            assert_eq!(f64_col.len(), f64_bss.len());
            f64_col
                .iter()
                .zip(f64_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
            assert_eq!(i32_col.len(), i32_bss.len());
            i32_col
                .iter()
                .zip(i32_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
            assert_eq!(i64_col.len(), i64_bss.len());
            i64_col
                .iter()
                .zip(i64_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let flba_col = batch.column(10).as_fixed_size_binary();
            let flba_bss = batch.column(11).as_fixed_size_binary();
            assert_eq!(flba_col.len(), flba_bss.len());
            flba_col
                .iter()
                .zip(flba_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
            assert_eq!(dec_col.len(), dec_bss.len());
            dec_col
                .iter()
                .zip(dec_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
        }
        assert_eq!(row_count, 200);
    }

    #[test]
    fn test_read_incorrect_map_schema_file() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/incorrect_map_schema.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 1);

        let expected_schema = Schema::new(Fields::from(vec![Field::new(
            "my_map",
            ArrowDataType::Map(
                Arc::new(Field::new(
                    "key_value",
                    ArrowDataType::Struct(Fields::from(vec![
                        Field::new("key", ArrowDataType::Utf8, false),
                        Field::new("value", ArrowDataType::Utf8, true),
                    ])),
                    false,
                )),
                false,
            ),
            true,
        )]));
        assert_eq!(batch.schema().as_ref(), &expected_schema);

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.column(0).null_count(), 0);
        assert_eq!(
            batch.column(0).as_map().keys().as_ref(),
            &StringArray::from(vec!["parent", "name"])
        );
        assert_eq!(
            batch.column(0).as_map().values().as_ref(),
            &StringArray::from(vec!["another", "report"])
        );
    }

    #[derive(Clone)]
    struct TestOptions {
        /// Number of row groups to write
        num_row_groups: usize,
        /// Number of rows per row group
        num_rows: usize,
        /// Size of batches to read back
        record_batch_size: usize,
        /// Percentage of nulls in column, or `None` if required
        null_percent: Option<usize>,
        /// Write batch size
        write_batch_size: usize,
        /// Maximum data page size
        max_data_page_size: usize,
        /// Maximum dictionary page size
        max_dict_page_size: usize,
        /// Writer version
        writer_version: WriterVersion,
        /// Enabled statistics
        enabled_statistics: EnabledStatistics,
        /// Encoding
        encoding: Encoding,
        /// Row selections and total selected row count
        row_selections: Option<(RowSelection, usize)>,
        /// Row filter
        row_filter: Option<Vec<bool>>,
        /// Limit
        limit: Option<usize>,
        /// Offset
        offset: Option<usize>,
    }

    impl std::fmt::Debug for TestOptions {
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("TestOptions")
                .field("num_row_groups", &self.num_row_groups)
                .field("num_rows", &self.num_rows)
                .field("record_batch_size", &self.record_batch_size)
                .field("null_percent", &self.null_percent)
                .field("write_batch_size", &self.write_batch_size)
                .field("max_data_page_size", &self.max_data_page_size)
                .field("max_dict_page_size", &self.max_dict_page_size)
                .field("writer_version", &self.writer_version)
                .field("enabled_statistics", &self.enabled_statistics)
                .field("encoding", &self.encoding)
                .field("row_selections", &self.row_selections.is_some())
                .field("row_filter", &self.row_filter.is_some())
                .field("limit", &self.limit)
                .field("offset", &self.offset)
                .finish()
        }
    }

    impl Default for TestOptions {
        fn default() -> Self {
            Self {
                num_row_groups: 2,
                num_rows: 100,
                record_batch_size: 15,
                null_percent: None,
                write_batch_size: 64,
                max_data_page_size: 1024 * 1024,
                max_dict_page_size: 1024 * 1024,
                writer_version: WriterVersion::PARQUET_1_0,
                enabled_statistics: EnabledStatistics::Page,
                encoding: Encoding::PLAIN,
                row_selections: None,
                row_filter: None,
                limit: None,
                offset: None,
            }
        }
    }

    impl TestOptions {
        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
            Self {
                num_row_groups,
                num_rows,
                record_batch_size,
                ..Default::default()
            }
        }

        fn with_null_percent(self, null_percent: usize) -> Self {
            Self {
                null_percent: Some(null_percent),
                ..self
            }
        }

        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
            Self {
                max_data_page_size,
                ..self
            }
        }

        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
            Self {
                max_dict_page_size,
                ..self
            }
        }

        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
            Self {
                enabled_statistics,
                ..self
            }
        }

        fn with_row_selections(self) -> Self {
            assert!(self.row_filter.is_none(), "Must set row selection first");

            let mut rng = rng();
            let step = rng.random_range(self.record_batch_size..self.num_rows);
            let row_selections = create_test_selection(
                step,
                self.num_row_groups * self.num_rows,
                rng.random::<bool>(),
            );
            Self {
                row_selections: Some(row_selections),
                ..self
            }
        }

        fn with_row_filter(self) -> Self {
            let row_count = match &self.row_selections {
                Some((_, count)) => *count,
                None => self.num_row_groups * self.num_rows,
            };

            let mut rng = rng();
            Self {
                row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
                ..self
            }
        }

        fn with_limit(self, limit: usize) -> Self {
            Self {
                limit: Some(limit),
                ..self
            }
        }

        fn with_offset(self, offset: usize) -> Self {
            Self {
                offset: Some(offset),
                ..self
            }
        }

        fn writer_props(&self) -> WriterProperties {
            let builder = WriterProperties::builder()
                .set_data_page_size_limit(self.max_data_page_size)
                .set_write_batch_size(self.write_batch_size)
                .set_writer_version(self.writer_version)
                .set_statistics_enabled(self.enabled_statistics);

            let builder = match self.encoding {
                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
                    .set_dictionary_enabled(true)
                    .set_dictionary_page_size_limit(self.max_dict_page_size),
                _ => builder
                    .set_dictionary_enabled(false)
                    .set_encoding(self.encoding),
            };

            builder.build()
        }
    }
2366
2367 fn run_single_column_reader_tests<T, F, G>(
2374 rand_max: i32,
2375 converted_type: ConvertedType,
2376 arrow_type: Option<ArrowDataType>,
2377 converter: F,
2378 encodings: &[Encoding],
2379 ) where
2380 T: DataType,
2381 G: RandGen<T>,
2382 F: Fn(&[Option<T::T>]) -> ArrayRef,
2383 {
2384 let all_options = vec![
2385 TestOptions::new(2, 100, 15),
2388 TestOptions::new(3, 25, 5),
2393 TestOptions::new(4, 100, 25),
2397 TestOptions::new(3, 256, 73).with_max_data_page_size(128),
2399 TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
2401 TestOptions::new(2, 256, 127).with_null_percent(0),
2403 TestOptions::new(2, 256, 93).with_null_percent(25),
2405 TestOptions::new(4, 100, 25).with_limit(0),
2407 TestOptions::new(4, 100, 25).with_limit(50),
2409 TestOptions::new(4, 100, 25).with_limit(10),
2411 TestOptions::new(4, 100, 25).with_limit(101),
2413 TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
2415 TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
2417 TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
2419 TestOptions::new(2, 256, 91)
2421 .with_null_percent(25)
2422 .with_enabled_statistics(EnabledStatistics::Chunk),
2423 TestOptions::new(2, 256, 91)
2425 .with_null_percent(25)
2426 .with_enabled_statistics(EnabledStatistics::None),
2427 TestOptions::new(2, 128, 91)
2429 .with_null_percent(100)
2430 .with_enabled_statistics(EnabledStatistics::None),
2431 TestOptions::new(2, 100, 15).with_row_selections(),
2436 TestOptions::new(3, 25, 5).with_row_selections(),
2441 TestOptions::new(4, 100, 25).with_row_selections(),
2445 TestOptions::new(3, 256, 73)
2447 .with_max_data_page_size(128)
2448 .with_row_selections(),
2449 TestOptions::new(3, 256, 57)
2451 .with_max_dict_page_size(128)
2452 .with_row_selections(),
2453 TestOptions::new(2, 256, 127)
2455 .with_null_percent(0)
2456 .with_row_selections(),
2457 TestOptions::new(2, 256, 93)
2459 .with_null_percent(25)
2460 .with_row_selections(),
2461 TestOptions::new(2, 256, 93)
2463 .with_null_percent(25)
2464 .with_row_selections()
2465 .with_limit(10),
2466 TestOptions::new(2, 256, 93)
2468 .with_null_percent(25)
2469 .with_row_selections()
2470 .with_offset(20)
2471 .with_limit(10),
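// row filters, alone and combined with selections and small data pages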
2472 TestOptions::new(4, 100, 25).with_row_filter(),
2476 TestOptions::new(4, 100, 25)
2478 .with_row_selections()
2479 .with_row_filter(),
2480 TestOptions::new(2, 256, 93)
2482 .with_null_percent(25)
2483 .with_max_data_page_size(10)
2484 .with_row_filter(),
2485 TestOptions::new(2, 256, 93)
2487 .with_null_percent(25)
2488 .with_max_data_page_size(10)
2489 .with_row_selections()
2490 .with_row_filter(),
2491 TestOptions::new(2, 256, 93)
2493 .with_enabled_statistics(EnabledStatistics::None)
2494 .with_max_data_page_size(10)
2495 .with_row_selections(),
2496 ];
2497
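// run every scenario under both writer versions and each requested encoding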
2498 all_options.into_iter().for_each(|opts| {
2499 for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2500 for encoding in encodings {
2501 let opts = TestOptions {
2502 writer_version,
2503 encoding: *encoding,
2504 ..opts.clone()
2505 };
2506
2507 single_column_reader_test::<T, _, G>(
2508 opts,
2509 rand_max,
2510 converted_type,
2511 arrow_type.clone(),
2512 &converter,
2513 )
2514 }
2515 }
2516 });
2517 }
2518
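/// Writes a single-column parquet file described by `opts` and reads it
/// back, checking every batch against the expected values once any
/// selection, filter, offset and limit have been applied.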
2519 fn single_column_reader_test<T, F, G>(
2523 opts: TestOptions,
2524 rand_max: i32,
2525 converted_type: ConvertedType,
2526 arrow_type: Option<ArrowDataType>,
2527 converter: F,
2528 ) where
2529 T: DataType,
2530 G: RandGen<T>,
2531 F: Fn(&[Option<T::T>]) -> ArrayRef,
2532 {
2533 println!(
2535 "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
2536 T::get_physical_type(), converted_type, arrow_type, opts
2537 );
2538
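// With a null percent the column is OPTIONAL: definition level 1 marks a
// value and 0 a null, drawn so roughly `null_percent`% of slots are null.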
2539 let (repetition, def_levels) = match opts.null_percent.as_ref() {
2541 Some(null_percent) => {
2542 let mut rng = rng();
2543
2544 let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
2545 .map(|_| {
2546 std::iter::from_fn(|| {
2547 Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
2548 })
2549 .take(opts.num_rows)
2550 .collect()
2551 })
2552 .collect();
2553 (Repetition::OPTIONAL, Some(def_levels))
2554 }
2555 None => (Repetition::REQUIRED, None),
2556 };
2557
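// one value vector per row group, with entries only for the non-null slots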
2558 let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
2560 .map(|idx| {
2561 let null_count = match def_levels.as_ref() {
2562 Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
2563 None => 0,
2564 };
2565 G::gen_vec(rand_max, opts.num_rows - null_count)
2566 })
2567 .collect();
2568
2569 let len = match T::get_physical_type() {
2570 crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
2571 crate::basic::Type::INT96 => 12,
2572 _ => -1,
2573 };
2574
2575 let fields = vec![Arc::new(
2576 Type::primitive_type_builder("leaf", T::get_physical_type())
2577 .with_repetition(repetition)
2578 .with_converted_type(converted_type)
2579 .with_length(len)
2580 .build()
2581 .unwrap(),
2582 )];
2583
2584 let schema = Arc::new(
2585 Type::group_type_builder("test_schema")
2586 .with_fields(fields)
2587 .build()
2588 .unwrap(),
2589 );
2590
2591 let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
2592
2593 let mut file = tempfile::tempfile().unwrap();
2594
2595 generate_single_column_file_with_data::<T>(
2596 &values,
2597 def_levels.as_ref(),
2598 file.try_clone().unwrap(),
2599 schema,
2600 arrow_field,
2601 &opts,
2602 )
2603 .unwrap();
2604
2605 file.rewind().unwrap();
2606
2607 let options = ArrowReaderOptions::new()
2608 .with_page_index(opts.enabled_statistics == EnabledStatistics::Page);
2609
2610 let mut builder =
2611 ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
2612
2613 let expected_data = match opts.row_selections {
2614 Some((selections, row_count)) => {
2615 let mut remaining_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2616
2617 let mut selected_data: Vec<Option<T::T>> = vec![];
2618 let deque: VecDeque<RowSelector> = selections.clone().into();
2619 for select in deque {
2620 if select.skip {
2621 remaining_data.drain(0..select.row_count);
2622 } else {
2623 selected_data.extend(remaining_data.drain(0..select.row_count));
2624 }
2625 }
2626 builder = builder.with_row_selection(selections);
2627
2628 assert_eq!(selected_data.len(), row_count);
2629 selected_data
2630 }
2631 None => {
2632 let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2634 assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
2635 expected_data
2636 }
2637 };
2638
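// Drop the masked-out rows from the expected data and install an
// ArrowPredicateFn that replays the same boolean mask batch by batch.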
2639 let mut expected_data = match opts.row_filter {
2640 Some(filter) => {
2641 let expected_data = expected_data
2642 .into_iter()
2643 .zip(filter.iter())
2644 .filter_map(|(d, f)| f.then_some(d))
2645 .collect();
2646
2647 let mut filter_offset = 0;
2648 let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
2649 ProjectionMask::all(),
2650 move |b| {
2651 let array = BooleanArray::from_iter(
2652 filter
2653 .iter()
2654 .skip(filter_offset)
2655 .take(b.num_rows())
2656 .map(|x| Some(*x)),
2657 );
2658 filter_offset += b.num_rows();
2659 Ok(array)
2660 },
2661 ))]);
2662
2663 builder = builder.with_row_filter(filter);
2664 expected_data
2665 }
2666 None => expected_data,
2667 };
2668
2669 if let Some(offset) = opts.offset {
2670 builder = builder.with_offset(offset);
2671 expected_data = expected_data.into_iter().skip(offset).collect();
2672 }
2673
2674 if let Some(limit) = opts.limit {
2675 builder = builder.with_limit(limit);
2676 expected_data = expected_data.into_iter().take(limit).collect();
2677 }
2678
2679 let mut record_reader = builder
2680 .with_batch_size(opts.record_batch_size)
2681 .build()
2682 .unwrap();
2683
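// Drain the reader: each batch must match the next slice of expected data,
// and the iterator must finish exactly when the expectation is exhausted.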
2684 let mut total_read = 0;
2685 loop {
2686 let maybe_batch = record_reader.next();
2687 if total_read < expected_data.len() {
2688 let end = min(total_read + opts.record_batch_size, expected_data.len());
2689 let batch = maybe_batch.unwrap().unwrap();
2690 assert_eq!(end - total_read, batch.num_rows());
2691
2692 let a = converter(&expected_data[total_read..end]);
2693 let b = Arc::clone(batch.column(0));
2694
2695 assert_eq!(a.data_type(), b.data_type());
2696 assert_eq!(a.to_data(), b.to_data());
2697 assert_eq!(
2698 a.as_any().type_id(),
2699 b.as_any().type_id(),
2700 "incorrect type ids"
2701 );
2702
2703 total_read = end;
2704 } else {
2705 assert!(maybe_batch.is_none());
2706 break;
2707 }
2708 }
2709 }
2710
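/// Builds the expected values: with definition levels, a 1 draws the next
/// written value and a 0 yields null; without them every value is present.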
2711 fn gen_expected_data<T: DataType>(
2712 def_levels: Option<&Vec<Vec<i16>>>,
2713 values: &[Vec<T::T>],
2714 ) -> Vec<Option<T::T>> {
2715 let data: Vec<Option<T::T>> = match def_levels {
2716 Some(levels) => {
2717 let mut values_iter = values.iter().flatten();
2718 levels
2719 .iter()
2720 .flatten()
2721 .map(|d| match d {
2722 1 => Some(values_iter.next().cloned().unwrap()),
2723 0 => None,
2724 _ => unreachable!(),
2725 })
2726 .collect()
2727 }
2728 None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
2729 };
2730 data
2731 }
2732
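/// Writes `values` (one `Vec` per row group) under `schema` using the
/// writer properties from `opts`, embedding the encoded arrow schema in the
/// file metadata when a `field` is supplied.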
2733 fn generate_single_column_file_with_data<T: DataType>(
2734 values: &[Vec<T::T>],
2735 def_levels: Option<&Vec<Vec<i16>>>,
2736 file: File,
2737 schema: TypePtr,
2738 field: Option<Field>,
2739 opts: &TestOptions,
2740 ) -> Result<crate::format::FileMetaData> {
2741 let mut writer_props = opts.writer_props();
2742 if let Some(field) = field {
2743 let arrow_schema = Schema::new(vec![field]);
2744 add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
2745 }
2746
2747 let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
2748
2749 for (idx, v) in values.iter().enumerate() {
2750 let def_levels = def_levels.map(|d| d[idx].as_slice());
2751 let mut row_group_writer = writer.next_row_group()?;
2752 {
2753 let mut column_writer = row_group_writer
2754 .next_column()?
2755 .expect("Column writer is none!");
2756
2757 column_writer
2758 .typed::<T>()
2759 .write_batch(v, def_levels, None)?;
2760
2761 column_writer.close()?;
2762 }
2763 row_group_writer.close()?;
2764 }
2765
2766 writer.close()
2767 }
2768
2769 fn get_test_file(file_name: &str) -> File {
2770 let mut path = PathBuf::new();
2771 path.push(arrow::util::test_util::arrow_test_data());
2772 path.push(file_name);
2773
2774 File::open(path.as_path()).expect("File not found!")
2775 }
2776
2777 #[test]
2778 fn test_read_structs() {
2779 let testdata = arrow::util::test_util::parquet_test_data();
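// nested_structs.rust.parquet: first check the whole file reads end to
// end, then read only a projected subset of its struct leaves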
2783 let path = format!("{testdata}/nested_structs.rust.parquet");
2784 let file = File::open(&path).unwrap();
2785 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2786
2787 for batch in record_batch_reader {
2788 batch.unwrap();
2789 }
2790
2791 let file = File::open(&path).unwrap();
2792 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2793
2794 let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
2795 let projected_reader = builder
2796 .with_projection(mask)
2797 .with_batch_size(60)
2798 .build()
2799 .unwrap();
2800
2801 let expected_schema = Schema::new(vec![
2802 Field::new(
2803 "roll_num",
2804 ArrowDataType::Struct(Fields::from(vec![Field::new(
2805 "count",
2806 ArrowDataType::UInt64,
2807 false,
2808 )])),
2809 false,
2810 ),
2811 Field::new(
2812 "PC_CUR",
2813 ArrowDataType::Struct(Fields::from(vec![
2814 Field::new("mean", ArrowDataType::Int64, false),
2815 Field::new("sum", ArrowDataType::Int64, false),
2816 ])),
2817 false,
2818 ),
2819 ]);
2820
2821 assert_eq!(&expected_schema, projected_reader.schema().as_ref());
2823
2824 for batch in projected_reader {
2825 let batch = batch.unwrap();
2826 assert_eq!(batch.schema().as_ref(), &expected_schema);
2827 }
2828 }
2829
2830 #[test]
2831 fn test_read_structs_by_name() {
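// same file as test_read_structs, but the projection mask is built from
// dotted column paths rather than leaf indices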
2833 let testdata = arrow::util::test_util::parquet_test_data();
2834 let path = format!("{testdata}/nested_structs.rust.parquet");
2835 let file = File::open(&path).unwrap();
2836 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2837
2838 for batch in record_batch_reader {
2839 batch.unwrap();
2840 }
2841
2842 let file = File::open(&path).unwrap();
2843 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2844
2845 let mask = ProjectionMask::columns(
2846 builder.parquet_schema(),
2847 ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
2848 );
2849 let projected_reader = builder
2850 .with_projection(mask)
2851 .with_batch_size(60)
2852 .build()
2853 .unwrap();
2854
2855 let expected_schema = Schema::new(vec![
2856 Field::new(
2857 "roll_num",
2858 ArrowDataType::Struct(Fields::from(vec![Field::new(
2859 "count",
2860 ArrowDataType::UInt64,
2861 false,
2862 )])),
2863 false,
2864 ),
2865 Field::new(
2866 "PC_CUR",
2867 ArrowDataType::Struct(Fields::from(vec![
2868 Field::new("mean", ArrowDataType::Int64, false),
2869 Field::new("sum", ArrowDataType::Int64, false),
2870 ])),
2871 false,
2872 ),
2873 ]);
2874
2875 assert_eq!(&expected_schema, projected_reader.schema().as_ref());
2876
2877 for batch in projected_reader {
2878 let batch = batch.unwrap();
2879 assert_eq!(batch.schema().as_ref(), &expected_schema);
2880 }
2881 }
2882
2883 #[test]
2884 fn test_read_maps() {
2885 let testdata = arrow::util::test_util::parquet_test_data();
2886 let path = format!("{testdata}/nested_maps.snappy.parquet");
2887 let file = File::open(path).unwrap();
2888 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2889
2890 for batch in record_batch_reader {
2891 batch.unwrap();
2892 }
2893 }
2894
2895 #[test]
2896 fn test_nested_nullability() {
2897 let message_type = "message nested {
2898 OPTIONAL Group group {
2899 REQUIRED INT32 leaf;
2900 }
2901 }";
2902
2903 let file = tempfile::tempfile().unwrap();
2904 let schema = Arc::new(parse_message_type(message_type).unwrap());
2905
2906 {
2907 let mut writer =
2909 SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
2910 .unwrap();
2911
2912 {
2913 let mut row_group_writer = writer.next_row_group().unwrap();
2914 let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
2915
2916 column_writer
2917 .typed::<Int32Type>()
2918 .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
2919 .unwrap();
2920
2921 column_writer.close().unwrap();
2922 row_group_writer.close().unwrap();
2923 }
2924
2925 writer.close().unwrap();
2926 }
2927
2928 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2929 let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
2930
2931 let reader = builder.with_projection(mask).build().unwrap();
2932
2933 let expected_schema = Schema::new(Fields::from(vec![Field::new(
2934 "group",
2935 ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
2936 true,
2937 )]));
2938
2939 let batch = reader.into_iter().next().unwrap().unwrap();
2940 assert_eq!(batch.schema().as_ref(), &expected_schema);
2941 assert_eq!(batch.num_rows(), 4);
2942 assert_eq!(batch.column(0).null_count(), 2);
2943 }
2944
2945 #[test]
2946 fn test_invalid_utf8() {
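// a hand-rolled parquet file whose single string column stores the bytes
// "he\xffllo" - not valid UTF-8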
2947 let data = vec![
2949 80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
2950 18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
2951 3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
2952 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
2953 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
2954 21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
2955 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
2956 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
2957 0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
2958 118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
2959 116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
2960 82, 49,
2961 ];
2962
2963 let file = Bytes::from(data);
2964 let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();
2965
2966 let error = record_batch_reader.next().unwrap().unwrap_err();
2967
2968 assert!(
2969 error.to_string().contains("invalid utf-8 sequence"),
2970 "{}",
2971 error
2972 );
2973 }
2974
2975 #[test]
2976 fn test_invalid_utf8_string_array() {
2977 test_invalid_utf8_string_array_inner::<i32>();
2978 }
2979
2980 #[test]
2981 fn test_invalid_utf8_large_string_array() {
2982 test_invalid_utf8_string_array_inner::<i64>();
2983 }
2984
2985 fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
2986 let cases = [
2987 invalid_utf8_first_char::<O>(),
2988 invalid_utf8_first_char_long_strings::<O>(),
2989 invalid_utf8_later_char::<O>(),
2990 invalid_utf8_later_char_long_strings::<O>(),
2991 invalid_utf8_later_char_really_long_strings::<O>(),
2992 invalid_utf8_later_char_really_long_strings2::<O>(),
2993 ];
2994 for array in &cases {
2995 for encoding in STRING_ENCODINGS {
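// The bytes are deliberately invalid UTF-8, so the safe constructors
// would reject them; build the StringArray unchecked purely to get the
// bad data written to parquet.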
2996 let array = unsafe {
2999 GenericStringArray::<O>::new_unchecked(
3000 array.offsets().clone(),
3001 array.values().clone(),
3002 array.nulls().cloned(),
3003 )
3004 };
3005 let data_type = array.data_type().clone();
3006 let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3007 let err = read_from_parquet(data).unwrap_err();
3008 let expected_err =
3009 "Parquet argument error: Parquet error: encountered non UTF-8 data";
3010 assert!(
3011 err.to_string().contains(expected_err),
3012 "data type: {data_type:?}, expected: {expected_err}, got: {err}"
3013 );
3014 }
3015 }
3016 }
3017
3018 #[test]
3019 fn test_invalid_utf8_string_view_array() {
3020 let cases = [
3021 invalid_utf8_first_char::<i32>(),
3022 invalid_utf8_first_char_long_strings::<i32>(),
3023 invalid_utf8_later_char::<i32>(),
3024 invalid_utf8_later_char_long_strings::<i32>(),
3025 invalid_utf8_later_char_really_long_strings::<i32>(),
3026 invalid_utf8_later_char_really_long_strings2::<i32>(),
3027 ];
3028
3029 for encoding in STRING_ENCODINGS {
3030 for array in &cases {
3031 let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
3032 let array = array.as_binary_view();
3033
3034 let array = unsafe {
3037 StringViewArray::new_unchecked(
3038 array.views().clone(),
3039 array.data_buffers().to_vec(),
3040 array.nulls().cloned(),
3041 )
3042 };
3043
3044 let data_type = array.data_type().clone();
3045 let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3046 let err = read_from_parquet(data).unwrap_err();
3047 let expected_err =
3048 "Parquet argument error: Parquet error: encountered non UTF-8 data";
3049 assert!(
3050 err.to_string().contains(expected_err),
3051 "data type: {data_type:?}, expected: {expected_err}, got: {err}"
3052 );
3053 }
3054 }
3055 }
3056
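/// Encodings to exercise for byte-array data; `None` leaves the writer at
/// its (dictionary-encoded) default.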
3057 const STRING_ENCODINGS: &[Option<Encoding>] = &[
3059 None,
3060 Some(Encoding::PLAIN),
3061 Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
3062 Some(Encoding::DELTA_BYTE_ARRAY),
3063 ];
3064
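// Byte sequences that are not valid UTF-8: one broken at the very first
// character, one broken only after three valid ASCII bytes.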
3065 const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3068
3069 const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];
3072
3073 fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3075 let valid: &[u8] = b" ";
3076 let invalid = INVALID_UTF8_FIRST_CHAR;
3077 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3078 }
3079
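/// Invalid UTF-8 following a valid prefix longer than 12 bytes, so the
/// value cannot be inlined when read back as a string view.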
3080 fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3084 let valid: &[u8] = b" ";
3085 let mut invalid = vec![];
3086 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3087 invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3088 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3089 }
3090
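/// A short value whose invalid byte appears after several valid characters.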
3091 fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3094 let valid: &[u8] = b" ";
3095 let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
3096 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3097 }
3098
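/// As above, but with the invalid bytes past the 12-byte view-inlining
/// threshold.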
3099 fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3103 let valid: &[u8] = b" ";
3104 let mut invalid = vec![];
3105 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3106 invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3107 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3108 }
3109
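/// The invalid bytes only appear after 380 valid ones, deep inside a long
/// value.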
3110 fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3114 let valid: &[u8] = b" ";
3115 let mut invalid = vec![];
3116 for _ in 0..10 {
3117 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3119 }
3120 invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3121 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3122 }
3123
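/// Mixes one short invalid value in among long, entirely valid strings.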
3124 fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3127 let valid: &[u8] = b" ";
3128 let mut valid_long = vec![];
3129 for _ in 0..10 {
3130 valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3132 }
3133 let invalid = INVALID_UTF8_LATER_CHAR;
3134 GenericBinaryArray::<O>::from_iter(vec![
3135 None,
3136 Some(valid),
3137 Some(invalid),
3138 None,
3139 Some(&valid_long),
3140 Some(valid),
3141 ])
3142 }
3143
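/// Writes `array` as a single-column parquet file, forcing `encoding`
/// (with dictionaries disabled) when one is supplied, and returns the
/// encoded bytes.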
3144 fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
3149 let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
3150 let mut data = vec![];
3151 let schema = batch.schema();
3152 let props = encoding.map(|encoding| {
3153 WriterProperties::builder()
3154 .set_dictionary_enabled(false)
3156 .set_encoding(encoding)
3157 .build()
3158 });
3159
3160 {
3161 let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
3162 writer.write(&batch).unwrap();
3163 writer.flush().unwrap();
3164 writer.close().unwrap();
3165 };
3166 data
3167 }
3168
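/// Reads every record batch back from the given parquet file bytes.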
3169 fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
3171 let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
3172 .unwrap()
3173 .build()
3174 .unwrap();
3175
3176 reader.collect()
3177 }
3178
3179 #[test]
3180 fn test_dictionary_preservation() {
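// Write a dictionary-encoded string column as two row groups (8 and 9
// rows) and check which of the resulting batches share a dictionary.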
3181 let fields = vec![Arc::new(
3182 Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
3183 .with_repetition(Repetition::OPTIONAL)
3184 .with_converted_type(ConvertedType::UTF8)
3185 .build()
3186 .unwrap(),
3187 )];
3188
3189 let schema = Arc::new(
3190 Type::group_type_builder("test_schema")
3191 .with_fields(fields)
3192 .build()
3193 .unwrap(),
3194 );
3195
3196 let dict_type = ArrowDataType::Dictionary(
3197 Box::new(ArrowDataType::Int32),
3198 Box::new(ArrowDataType::Utf8),
3199 );
3200
3201 let arrow_field = Field::new("leaf", dict_type, true);
3202
3203 let mut file = tempfile::tempfile().unwrap();
3204
3205 let values = vec![
3206 vec![
3207 ByteArray::from("hello"),
3208 ByteArray::from("a"),
3209 ByteArray::from("b"),
3210 ByteArray::from("d"),
3211 ],
3212 vec![
3213 ByteArray::from("c"),
3214 ByteArray::from("a"),
3215 ByteArray::from("b"),
3216 ],
3217 ];
3218
3219 let def_levels = vec![
3220 vec![1, 0, 0, 1, 0, 0, 1, 1],
3221 vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
3222 ];
3223
3224 let opts = TestOptions {
3225 encoding: Encoding::RLE_DICTIONARY,
3226 ..Default::default()
3227 };
3228
3229 generate_single_column_file_with_data::<ByteArrayType>(
3230 &values,
3231 Some(&def_levels),
3232 file.try_clone().unwrap(),
3233 schema,
3234 Some(arrow_field),
3235 &opts,
3236 )
3237 .unwrap();
3238
3239 file.rewind().unwrap();
3240
3241 let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();
3242
3243 let batches = record_reader
3244 .collect::<Result<Vec<RecordBatch>, _>>()
3245 .unwrap();
3246
3247 assert_eq!(batches.len(), 6);
3248 assert!(batches.iter().all(|x| x.num_columns() == 1));
3249
3250 let row_counts = batches
3251 .iter()
3252 .map(|x| (x.num_rows(), x.column(0).null_count()))
3253 .collect::<Vec<_>>();
3254
3255 assert_eq!(
3256 row_counts,
3257 vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
3258 );
3259
3260 let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();
3261
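// With a batch size of 3 and 8 rows in the first row group: batches 0 and
// 1 come from the first row group and share its dictionary, batch 2 spans
// the row-group boundary so its dictionary is recomputed, and batches 3-5
// share the second row group's dictionary.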
3262 assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
3264 assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
3266 assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
3267 assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
3269 assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
3270 }
3271
3272 #[test]
3273 fn test_read_null_list() {
3274 let testdata = arrow::util::test_util::parquet_test_data();
3275 let path = format!("{testdata}/null_list.parquet");
3276 let file = File::open(path).unwrap();
3277 let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3278
3279 let batch = record_batch_reader.next().unwrap().unwrap();
3280 assert_eq!(batch.num_rows(), 1);
3281 assert_eq!(batch.num_columns(), 1);
3282 assert_eq!(batch.column(0).len(), 1);
3283
3284 let list = batch
3285 .column(0)
3286 .as_any()
3287 .downcast_ref::<ListArray>()
3288 .unwrap();
3289 assert_eq!(list.len(), 1);
3290 assert!(list.is_valid(0));
3291
3292 let val = list.value(0);
3293 assert_eq!(val.len(), 0);
3294 }
3295
3296 #[test]
3297 fn test_null_schema_inference() {
3298 let testdata = arrow::util::test_util::parquet_test_data();
3299 let path = format!("{testdata}/null_list.parquet");
3300 let file = File::open(path).unwrap();
3301
3302 let arrow_field = Field::new(
3303 "emptylist",
3304 ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
3305 true,
3306 );
3307
3308 let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3309 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3310 let schema = builder.schema();
3311 assert_eq!(schema.fields().len(), 1);
3312 assert_eq!(schema.field(0), &arrow_field);
3313 }
3314
3315 #[test]
3316 fn test_skip_metadata() {
3317 let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
3318 let field = Field::new("col", col.data_type().clone(), true);
3319
3320 let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
3321
3322 let metadata = [("key".to_string(), "value".to_string())]
3323 .into_iter()
3324 .collect();
3325
3326 let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
3327
3328 assert_ne!(schema_with_metadata, schema_without_metadata);
3329
3330 let batch =
3331 RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
3332
3333 let file = |version: WriterVersion| {
3334 let props = WriterProperties::builder()
3335 .set_writer_version(version)
3336 .build();
3337
3338 let file = tempfile().unwrap();
3339 let mut writer =
3340 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
3341 .unwrap();
3342 writer.write(&batch).unwrap();
3343 writer.close().unwrap();
3344 file
3345 };
3346
3347 let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3348
3349 let v1_reader = file(WriterVersion::PARQUET_1_0);
3350 let v2_reader = file(WriterVersion::PARQUET_2_0);
3351
3352 let arrow_reader =
3353 ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
3354 assert_eq!(arrow_reader.schema(), schema_with_metadata);
3355
3356 let reader =
3357 ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
3358 .unwrap()
3359 .build()
3360 .unwrap();
3361 assert_eq!(reader.schema(), schema_without_metadata);
3362
3363 let arrow_reader =
3364 ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
3365 assert_eq!(arrow_reader.schema(), schema_with_metadata);
3366
3367 let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
3368 .unwrap()
3369 .build()
3370 .unwrap();
3371 assert_eq!(reader.schema(), schema_without_metadata);
3372 }
3373
3374 fn write_parquet_from_iter<I, F>(value: I) -> File
3375 where
3376 I: IntoIterator<Item = (F, ArrayRef)>,
3377 F: AsRef<str>,
3378 {
3379 let batch = RecordBatch::try_from_iter(value).unwrap();
3380 let file = tempfile().unwrap();
3381 let mut writer =
3382 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
3383 writer.write(&batch).unwrap();
3384 writer.close().unwrap();
3385 file
3386 }
3387
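/// Writes `value` to a parquet file, then asserts that opening a reader
/// with the supplied schema fails with `expected_error`.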
3388 fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
3389 where
3390 I: IntoIterator<Item = (F, ArrayRef)>,
3391 F: AsRef<str>,
3392 {
3393 let file = write_parquet_from_iter(value);
3394 let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
3395 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3396 file.try_clone().unwrap(),
3397 options_with_schema,
3398 );
3399 assert_eq!(builder.err().unwrap().to_string(), expected_error);
3400 }
3401
3402 #[test]
3403 fn test_schema_too_few_columns() {
3404 run_schema_test_with_error(
3405 vec![
3406 ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3407 ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3408 ],
3409 Arc::new(Schema::new(vec![Field::new(
3410 "int64",
3411 ArrowDataType::Int64,
3412 false,
3413 )])),
3414 "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
3415 );
3416 }
3417
3418 #[test]
3419 fn test_schema_too_many_columns() {
3420 run_schema_test_with_error(
3421 vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3422 Arc::new(Schema::new(vec![
3423 Field::new("int64", ArrowDataType::Int64, false),
3424 Field::new("int32", ArrowDataType::Int32, false),
3425 ])),
3426 "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
3427 );
3428 }
3429
3430 #[test]
3431 fn test_schema_mismatched_column_names() {
3432 run_schema_test_with_error(
3433 vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3434 Arc::new(Schema::new(vec![Field::new(
3435 "other",
3436 ArrowDataType::Int64,
3437 false,
3438 )])),
3439 "Arrow: incompatible arrow schema, expected field named int64 got other",
3440 );
3441 }
3442
3443 #[test]
3444 fn test_schema_incompatible_columns() {
3445 run_schema_test_with_error(
3446 vec![
3447 (
3448 "col1_invalid",
3449 Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3450 ),
3451 (
3452 "col2_valid",
3453 Arc::new(Int32Array::from(vec![0])) as ArrayRef,
3454 ),
3455 (
3456 "col3_invalid",
3457 Arc::new(Date64Array::from(vec![0])) as ArrayRef,
3458 ),
3459 ],
3460 Arc::new(Schema::new(vec![
3461 Field::new("col1_invalid", ArrowDataType::Int32, false),
3462 Field::new("col2_valid", ArrowDataType::Int32, false),
3463 Field::new("col3_invalid", ArrowDataType::Int32, false),
3464 ])),
3465 "Arrow: incompatible arrow schema, the following fields could not be cast: [col1_invalid, col3_invalid]",
3466 );
3467 }
3468
3469 #[test]
3470 fn test_one_incompatible_nested_column() {
3471 let nested_fields = Fields::from(vec![
3472 Field::new("nested1_valid", ArrowDataType::Utf8, false),
3473 Field::new("nested1_invalid", ArrowDataType::Int64, false),
3474 ]);
3475 let nested = StructArray::try_new(
3476 nested_fields,
3477 vec![
3478 Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
3479 Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3480 ],
3481 None,
3482 )
3483 .expect("struct array");
3484 let supplied_nested_fields = Fields::from(vec![
3485 Field::new("nested1_valid", ArrowDataType::Utf8, false),
3486 Field::new("nested1_invalid", ArrowDataType::Int32, false),
3487 ]);
3488 run_schema_test_with_error(
3489 vec![
3490 ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3491 ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3492 ("nested", Arc::new(nested) as ArrayRef),
3493 ],
3494 Arc::new(Schema::new(vec![
3495 Field::new("col1", ArrowDataType::Int64, false),
3496 Field::new("col2", ArrowDataType::Int32, false),
3497 Field::new(
3498 "nested",
3499 ArrowDataType::Struct(supplied_nested_fields),
3500 false,
3501 ),
3502 ])),
3503 "Arrow: incompatible arrow schema, the following fields could not be cast: [nested]",
3504 );
3505 }
3506
3507 #[test]
3508 fn test_read_binary_as_utf8() {
3509 let file = write_parquet_from_iter(vec![
3510 (
3511 "binary_to_utf8",
3512 Arc::new(BinaryArray::from(vec![
3513 b"one".as_ref(),
3514 b"two".as_ref(),
3515 b"three".as_ref(),
3516 ])) as ArrayRef,
3517 ),
3518 (
3519 "large_binary_to_large_utf8",
3520 Arc::new(LargeBinaryArray::from(vec![
3521 b"one".as_ref(),
3522 b"two".as_ref(),
3523 b"three".as_ref(),
3524 ])) as ArrayRef,
3525 ),
3526 (
3527 "binary_view_to_utf8_view",
3528 Arc::new(BinaryViewArray::from(vec![
3529 b"one".as_ref(),
3530 b"two".as_ref(),
3531 b"three".as_ref(),
3532 ])) as ArrayRef,
3533 ),
3534 ]);
3535 let supplied_fields = Fields::from(vec![
3536 Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
3537 Field::new(
3538 "large_binary_to_large_utf8",
3539 ArrowDataType::LargeUtf8,
3540 false,
3541 ),
3542 Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
3543 ]);
3544
3545 let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3546 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3547 file.try_clone().unwrap(),
3548 options,
3549 )
3550 .expect("reader builder with schema")
3551 .build()
3552 .expect("reader with schema");
3553
3554 let batch = arrow_reader.next().unwrap().unwrap();
3555 assert_eq!(batch.num_columns(), 3);
3556 assert_eq!(batch.num_rows(), 3);
3557 assert_eq!(
3558 batch
3559 .column(0)
3560 .as_string::<i32>()
3561 .iter()
3562 .collect::<Vec<_>>(),
3563 vec![Some("one"), Some("two"), Some("three")]
3564 );
3565
3566 assert_eq!(
3567 batch
3568 .column(1)
3569 .as_string::<i64>()
3570 .iter()
3571 .collect::<Vec<_>>(),
3572 vec![Some("one"), Some("two"), Some("three")]
3573 );
3574
3575 assert_eq!(
3576 batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
3577 vec![Some("one"), Some("two"), Some("three")]
3578 );
3579 }
3580
3581 #[test]
3582 #[should_panic(expected = "Invalid UTF8 sequence at")]
3583 fn test_read_non_utf8_binary_as_utf8() {
3584 let file = write_parquet_from_iter(vec![(
3585 "non_utf8_binary",
3586 Arc::new(BinaryArray::from(vec![
3587 b"\xDE\x00\xFF".as_ref(),
3588 b"\xDE\x01\xAA".as_ref(),
3589 b"\xDE\x02\xFF".as_ref(),
3590 ])) as ArrayRef,
3591 )]);
3592 let supplied_fields = Fields::from(vec![Field::new(
3593 "non_utf8_binary",
3594 ArrowDataType::Utf8,
3595 false,
3596 )]);
3597
3598 let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3599 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3600 file.try_clone().unwrap(),
3601 options,
3602 )
3603 .expect("reader builder with schema")
3604 .build()
3605 .expect("reader with schema");
3606 arrow_reader.next().unwrap().unwrap_err();
3607 }
3608
3609 #[test]
3610 fn test_with_schema() {
3611 let nested_fields = Fields::from(vec![
3612 Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
3613 Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
3614 ]);
3615
3616 let nested_arrays: Vec<ArrayRef> = vec![
3617 Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
3618 Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
3619 ];
3620
3621 let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();
3622
3623 let file = write_parquet_from_iter(vec![
3624 (
3625 "int32_to_ts_second",
3626 Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
3627 ),
3628 (
3629 "date32_to_date64",
3630 Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
3631 ),
3632 ("nested", Arc::new(nested) as ArrayRef),
3633 ]);
3634
3635 let supplied_nested_fields = Fields::from(vec![
3636 Field::new(
3637 "utf8_to_dict",
3638 ArrowDataType::Dictionary(
3639 Box::new(ArrowDataType::Int32),
3640 Box::new(ArrowDataType::Utf8),
3641 ),
3642 false,
3643 ),
3644 Field::new(
3645 "int64_to_ts_nano",
3646 ArrowDataType::Timestamp(
3647 arrow::datatypes::TimeUnit::Nanosecond,
3648 Some("+10:00".into()),
3649 ),
3650 false,
3651 ),
3652 ]);
3653
3654 let supplied_schema = Arc::new(Schema::new(vec![
3655 Field::new(
3656 "int32_to_ts_second",
3657 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
3658 false,
3659 ),
3660 Field::new("date32_to_date64", ArrowDataType::Date64, false),
3661 Field::new(
3662 "nested",
3663 ArrowDataType::Struct(supplied_nested_fields),
3664 false,
3665 ),
3666 ]));
3667
3668 let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
3669 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3670 file.try_clone().unwrap(),
3671 options,
3672 )
3673 .expect("reader builder with schema")
3674 .build()
3675 .expect("reader with schema");
3676
3677 assert_eq!(arrow_reader.schema(), supplied_schema);
3678 let batch = arrow_reader.next().unwrap().unwrap();
3679 assert_eq!(batch.num_columns(), 3);
3680 assert_eq!(batch.num_rows(), 4);
3681 assert_eq!(
3682 batch
3683 .column(0)
3684 .as_any()
3685 .downcast_ref::<TimestampSecondArray>()
3686 .expect("downcast to timestamp second")
3687 .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
3688 .map(|v| v.to_string())
3689 .expect("value as datetime"),
3690 "1970-01-01 01:00:00 +01:00"
3691 );
3692 assert_eq!(
3693 batch
3694 .column(1)
3695 .as_any()
3696 .downcast_ref::<Date64Array>()
3697 .expect("downcast to date64")
3698 .value_as_date(0)
3699 .map(|v| v.to_string())
3700 .expect("value as date"),
3701 "1970-01-01"
3702 );
3703
3704 let nested = batch
3705 .column(2)
3706 .as_any()
3707 .downcast_ref::<StructArray>()
3708 .expect("downcast to struct");
3709
3710 let nested_dict = nested
3711 .column(0)
3712 .as_any()
3713 .downcast_ref::<Int32DictionaryArray>()
3714 .expect("downcast to dictionary");
3715
3716 assert_eq!(
3717 nested_dict
3718 .values()
3719 .as_any()
3720 .downcast_ref::<StringArray>()
3721 .expect("downcast to string")
3722 .iter()
3723 .collect::<Vec<_>>(),
3724 vec![Some("a"), Some("b")]
3725 );
3726
3727 assert_eq!(
3728 nested_dict.keys().iter().collect::<Vec<_>>(),
3729 vec![Some(0), Some(0), Some(0), Some(1)]
3730 );
3731
3732 assert_eq!(
3733 nested
3734 .column(1)
3735 .as_any()
3736 .downcast_ref::<TimestampNanosecondArray>()
3737 .expect("downcast to timestamp nanosecond")
3738 .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
3739 .map(|v| v.to_string())
3740 .expect("value as datetime"),
3741 "1970-01-01 10:00:00.000000001 +10:00"
3742 );
3743 }
3744
3745 #[test]
3746 fn test_empty_projection() {
3747 let testdata = arrow::util::test_util::parquet_test_data();
3748 let path = format!("{testdata}/alltypes_plain.parquet");
3749 let file = File::open(path).unwrap();
3750
3751 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3752 let file_metadata = builder.metadata().file_metadata();
3753 let expected_rows = file_metadata.num_rows() as usize;
3754
3755 let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
3756 let batch_reader = builder
3757 .with_projection(mask)
3758 .with_batch_size(2)
3759 .build()
3760 .unwrap();
3761
3762 let mut total_rows = 0;
3763 for maybe_batch in batch_reader {
3764 let batch = maybe_batch.unwrap();
3765 total_rows += batch.num_rows();
3766 assert_eq!(batch.num_columns(), 0);
3767 assert!(batch.num_rows() <= 2);
3768 }
3769
3770 assert_eq!(total_rows, expected_rows);
3771 }
3772
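/// Writes two record batches of `batch_size` rows with `row_group_size`
/// rows per row group and asserts both batches read back in full.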
3773 fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
3774 let schema = Arc::new(Schema::new(vec![Field::new(
3775 "list",
3776 ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
3777 true,
3778 )]));
3779
3780 let mut buf = Vec::with_capacity(1024);
3781
3782 let mut writer = ArrowWriter::try_new(
3783 &mut buf,
3784 schema.clone(),
3785 Some(
3786 WriterProperties::builder()
3787 .set_max_row_group_size(row_group_size)
3788 .build(),
3789 ),
3790 )
3791 .unwrap();
3792 for _ in 0..2 {
3793 let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
3794 for _ in 0..(batch_size) {
3795 list_builder.append(true);
3796 }
3797 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
3798 .unwrap();
3799 writer.write(&batch).unwrap();
3800 }
3801 writer.close().unwrap();
3802
3803 let mut record_reader =
3804 ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
3805 assert_eq!(
3806 batch_size,
3807 record_reader.next().unwrap().unwrap().num_rows()
3808 );
3809 assert_eq!(
3810 batch_size,
3811 record_reader.next().unwrap().unwrap().num_rows()
3812 );
3813 }
3814
3815 #[test]
3816 fn test_row_group_exact_multiple() {
3817 const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
3818 test_row_group_batch(8, 8);
3819 test_row_group_batch(10, 8);
3820 test_row_group_batch(8, 10);
3821 test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
3822 test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
3823 test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
3824 test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
3825 test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
3826 }
3827
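/// Computes the batches a reader should emit for `column` under
/// `selection` with the given `batch_size`.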
3828 fn get_expected_batches(
3831 column: &RecordBatch,
3832 selection: &RowSelection,
3833 batch_size: usize,
3834 ) -> Vec<RecordBatch> {
3835 let mut expected_batches = vec![];
3836
3837 let mut selection: VecDeque<_> = selection.clone().into();
3838 let mut row_offset = 0;
3839 let mut last_start = None;
3840 while row_offset < column.num_rows() && !selection.is_empty() {
3841 let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
3842 while batch_remaining > 0 && !selection.is_empty() {
3843 let (to_read, skip) = match selection.front_mut() {
3844 Some(selection) if selection.row_count > batch_remaining => {
3845 selection.row_count -= batch_remaining;
3846 (batch_remaining, selection.skip)
3847 }
3848 Some(_) => {
3849 let select = selection.pop_front().unwrap();
3850 (select.row_count, select.skip)
3851 }
3852 None => break,
3853 };
3854
3855 batch_remaining -= to_read;
3856
3857 match skip {
3858 true => {
3859 if let Some(last_start) = last_start.take() {
3860 expected_batches.push(column.slice(last_start, row_offset - last_start))
3861 }
3862 row_offset += to_read
3863 }
3864 false => {
3865 last_start.get_or_insert(row_offset);
3866 row_offset += to_read
3867 }
3868 }
3869 }
3870 }
3871
3872 if let Some(last_start) = last_start.take() {
3873 expected_batches.push(column.slice(last_start, row_offset - last_start))
3874 }
3875
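// sanity check: every batch except the last should hold exactly
// `batch_size` rows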
3876 for batch in &expected_batches[..expected_batches.len() - 1] {
3878 assert_eq!(batch.num_rows(), batch_size);
3879 }
3880
3881 expected_batches
3882 }
3883
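/// Builds a selection alternating between runs of `step_len` selected and
/// skipped rows (starting with a skip when `skip_first`), returning it
/// together with the number of selected rows.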
3884 fn create_test_selection(
3885 step_len: usize,
3886 total_len: usize,
3887 skip_first: bool,
3888 ) -> (RowSelection, usize) {
3889 let mut remaining = total_len;
3890 let mut skip = skip_first;
3891 let mut vec = vec![];
3892 let mut selected_count = 0;
3893 while remaining != 0 {
3894 let step = if remaining > step_len {
3895 step_len
3896 } else {
3897 remaining
3898 };
3899 vec.push(RowSelector {
3900 row_count: step,
3901 skip,
3902 });
3903 remaining -= step;
3904 if !skip {
3905 selected_count += step;
3906 }
3907 skip = !skip;
3908 }
3909 (vec.into(), selected_count)
3910 }
3911
3912 #[test]
3913 fn test_scan_row_with_selection() {
3914 let testdata = arrow::util::test_util::parquet_test_data();
3915 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
3916 let test_file = File::open(&path).unwrap();
3917
3918 let mut serial_reader =
3919 ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
3920 let data = serial_reader.next().unwrap().unwrap();
3921
3922 let do_test = |batch_size: usize, selection_len: usize| {
3923 for skip_first in [false, true] {
3924 let selections = create_test_selection(selection_len, data.num_rows(), skip_first).0;
3925
3926 let expected = get_expected_batches(&data, &selections, batch_size);
3927 let skip_reader = create_skip_reader(&test_file, batch_size, selections);
3928 assert_eq!(
3929 skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
3930 expected,
3931 "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
3932 );
3933 }
3934 };
3935
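// the file holds 7300 rows; cover coarse (1000-row) and fine (20- and
// 5-row) selection runs against batch sizes of 1000 and 20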
3936 do_test(1000, 1000);
3939
3940 do_test(20, 20);
3942
3943 do_test(20, 5);
3945
3946 do_test(20, 5);
3949
3950 fn create_skip_reader(
3951 test_file: &File,
3952 batch_size: usize,
3953 selections: RowSelection,
3954 ) -> ParquetRecordBatchReader {
3955 let options = ArrowReaderOptions::new().with_page_index(true);
3956 let file = test_file.try_clone().unwrap();
3957 ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
3958 .unwrap()
3959 .with_batch_size(batch_size)
3960 .with_row_selection(selections)
3961 .build()
3962 .unwrap()
3963 }
3964 }
3965
3966 #[test]
3967 fn test_batch_size_overallocate() {
3968 let testdata = arrow::util::test_util::parquet_test_data();
3969 let path = format!("{testdata}/alltypes_plain.parquet");
3971 let test_file = File::open(path).unwrap();
3972
3973 let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
3974 let num_rows = builder.metadata.file_metadata().num_rows();
3975 let reader = builder
3976 .with_batch_size(1024)
3977 .with_projection(ProjectionMask::all())
3978 .build()
3979 .unwrap();
3980 assert_ne!(1024, num_rows);
3981 assert_eq!(reader.batch_size, num_rows as usize);
3982 }
3983
3984 #[test]
3985 fn test_read_with_page_index_enabled() {
3986 let testdata = arrow::util::test_util::parquet_test_data();
3987
3988 {
3989 let path = format!("{testdata}/alltypes_tiny_pages.parquet");
3991 let test_file = File::open(path).unwrap();
3992 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3993 test_file,
3994 ArrowReaderOptions::new().with_page_index(true),
3995 )
3996 .unwrap();
3997 assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
3998 let reader = builder.build().unwrap();
3999 let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4000 assert_eq!(batches.len(), 8);
4001 }
4002
4003 {
4004 let path = format!("{testdata}/alltypes_plain.parquet");
4006 let test_file = File::open(path).unwrap();
4007 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4008 test_file,
4009 ArrowReaderOptions::new().with_page_index(true),
4010 )
4011 .unwrap();
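// alltypes_plain.parquet has no page index, so none is loaded even though
// it was requested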
4012 assert!(builder.metadata().offset_index().is_none());
4015 let reader = builder.build().unwrap();
4016 let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4017 assert_eq!(batches.len(), 1);
4018 }
4019 }
4020
4021 #[test]
4022 fn test_raw_repetition() {
4023 const MESSAGE_TYPE: &str = "
4024 message Log {
4025 OPTIONAL INT32 eventType;
4026 REPEATED INT32 category;
4027 REPEATED group filter {
4028 OPTIONAL INT32 error;
4029 }
4030 }
4031 ";
4032 let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
4033 let props = Default::default();
4034
4035 let mut buf = Vec::with_capacity(1024);
4036 let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
4037 let mut row_group_writer = writer.next_row_group().unwrap();
4038
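// leaf 1: eventType - a single defined value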
4039 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4041 col_writer
4042 .typed::<Int32Type>()
4043 .write_batch(&[1], Some(&[1]), None)
4044 .unwrap();
4045 col_writer.close().unwrap();
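// leaf 2: category - two repeated values belonging to the same row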
4046 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4048 col_writer
4049 .typed::<Int32Type>()
4050 .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
4051 .unwrap();
4052 col_writer.close().unwrap();
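// leaf 3: filter.error - one repeated filter entry with no error value set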
4053 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4055 col_writer
4056 .typed::<Int32Type>()
4057 .write_batch(&[1], Some(&[1]), Some(&[0]))
4058 .unwrap();
4059 col_writer.close().unwrap();
4060
4061 let rg_md = row_group_writer.close().unwrap();
4062 assert_eq!(rg_md.num_rows(), 1);
4063 writer.close().unwrap();
4064
4065 let bytes = Bytes::from(buf);
4066
4067 let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
4068 let full = no_mask.next().unwrap().unwrap();
4069
4070 assert_eq!(full.num_columns(), 3);
4071
4072 for idx in 0..3 {
4073 let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
4074 let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
4075 let mut reader = b.with_projection(mask).build().unwrap();
4076 let projected = reader.next().unwrap().unwrap();
4077
4078 assert_eq!(projected.num_columns(), 1);
4079 assert_eq!(full.column(idx), projected.column(0));
4080 }
4081 }
4082
4083 #[test]
4084 fn test_read_lz4_raw() {
4085 let testdata = arrow::util::test_util::parquet_test_data();
4086 let path = format!("{testdata}/lz4_raw_compressed.parquet");
4087 let file = File::open(path).unwrap();
4088
4089 let batches = ParquetRecordBatchReader::try_new(file, 1024)
4090 .unwrap()
4091 .collect::<Result<Vec<_>, _>>()
4092 .unwrap();
4093 assert_eq!(batches.len(), 1);
4094 let batch = &batches[0];
4095
4096 assert_eq!(batch.num_columns(), 3);
4097 assert_eq!(batch.num_rows(), 4);
4098
4099 let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4101 assert_eq!(
4102 a.values(),
4103 &[1593604800, 1593604800, 1593604801, 1593604801]
4104 );
4105
4106 let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4107 let a: Vec<_> = a.iter().flatten().collect();
4108 assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
4109
4110 let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4111 assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
4112 }
4113
4114 #[test]
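// Both files advertise the LZ4 codec but use different framings
// (Hadoop-framed vs raw LZ4); the reader is expected to fall back from one
// to the other when decoding fails.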
4124 fn test_read_lz4_hadoop_fallback() {
4125 for file in [
4126 "hadoop_lz4_compressed.parquet",
4127 "non_hadoop_lz4_compressed.parquet",
4128 ] {
4129 let testdata = arrow::util::test_util::parquet_test_data();
4130 let path = format!("{testdata}/{file}");
4131 let file = File::open(path).unwrap();
4132 let expected_rows = 4;
4133
4134 let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4135 .unwrap()
4136 .collect::<Result<Vec<_>, _>>()
4137 .unwrap();
4138 assert_eq!(batches.len(), 1);
4139 let batch = &batches[0];
4140
4141 assert_eq!(batch.num_columns(), 3);
4142 assert_eq!(batch.num_rows(), expected_rows);
4143
4144 let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4145 assert_eq!(
4146 a.values(),
4147 &[1593604800, 1593604800, 1593604801, 1593604801]
4148 );
4149
4150 let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4151 let b: Vec<_> = b.iter().flatten().collect();
4152 assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
4153
4154 let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4155 assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
4156 }
4157 }
4158
4159 #[test]
4160 fn test_read_lz4_hadoop_large() {
4161 let testdata = arrow::util::test_util::parquet_test_data();
4162 let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
4163 let file = File::open(path).unwrap();
4164 let expected_rows = 10000;
4165
4166 let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4167 .unwrap()
4168 .collect::<Result<Vec<_>, _>>()
4169 .unwrap();
4170 assert_eq!(batches.len(), 1);
4171 let batch = &batches[0];
4172
4173 assert_eq!(batch.num_columns(), 1);
4174 assert_eq!(batch.num_rows(), expected_rows);
4175
4176 let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
4177 let a: Vec<_> = a.iter().flatten().collect();
4178 assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
4179 assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
4180 assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
4181 assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
4182 }
4183
4184 #[test]
4185 #[cfg(feature = "snap")]
4186 fn test_read_nested_lists() {
4187 let testdata = arrow::util::test_util::parquet_test_data();
4188 let path = format!("{testdata}/nested_lists.snappy.parquet");
4189 let file = File::open(path).unwrap();
4190
4191 let f = file.try_clone().unwrap();
4192 let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
4193 let expected = reader.next().unwrap().unwrap();
4194 assert_eq!(expected.num_rows(), 3);
4195
4196 let selection = RowSelection::from(vec![
4197 RowSelector::skip(1),
4198 RowSelector::select(1),
4199 RowSelector::skip(1),
4200 ]);
4201 let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4202 .unwrap()
4203 .with_row_selection(selection)
4204 .build()
4205 .unwrap();
4206
4207 let actual = reader.next().unwrap().unwrap();
4208 assert_eq!(actual.num_rows(), 1);
4209 assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
4210 }
4211
4212 #[test]
4213 fn test_arbitrary_decimal() {
4214 let values = [1, 2, 3, 4, 5, 6, 7, 8];
4215 let decimals_19_0 = Decimal128Array::from_iter_values(values)
4216 .with_precision_and_scale(19, 0)
4217 .unwrap();
4218 let decimals_12_0 = Decimal128Array::from_iter_values(values)
4219 .with_precision_and_scale(12, 0)
4220 .unwrap();
4221 let decimals_17_10 = Decimal128Array::from_iter_values(values)
4222 .with_precision_and_scale(17, 10)
4223 .unwrap();
4224
4225 let written = RecordBatch::try_from_iter([
4226 ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
4227 ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
4228 ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
4229 ])
4230 .unwrap();
4231
4232 let mut buffer = Vec::with_capacity(1024);
4233 let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
4234 writer.write(&written).unwrap();
4235 writer.close().unwrap();
4236
4237 let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
4238 .unwrap()
4239 .collect::<Result<Vec<_>, _>>()
4240 .unwrap();
4241
4242 assert_eq!(&written.slice(0, 8), &read[0]);
4243 }
4244
4245 #[test]
4246 fn test_list_skip() {
4247 let mut list = ListBuilder::new(Int32Builder::new());
4248 list.append_value([Some(1), Some(2)]);
4249 list.append_value([Some(3)]);
4250 list.append_value([Some(4)]);
4251 let list = list.finish();
4252 let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();
4253
4254 let props = WriterProperties::builder()
4256 .set_data_page_row_count_limit(1)
4257 .set_write_batch_size(2)
4258 .build();
4259
4260 let mut buffer = Vec::with_capacity(1024);
4261 let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
4262 writer.write(&batch).unwrap();
4263 writer.close().unwrap();
4264
4265 let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
4266 let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
4267 .unwrap()
4268 .with_row_selection(selection.into())
4269 .build()
4270 .unwrap();
4271 let out = reader.next().unwrap().unwrap();
4272 assert_eq!(out.num_rows(), 1);
4273 assert_eq!(out, batch.slice(2, 1));
4274 }
4275
4276 fn test_decimal_roundtrip<T: DecimalType>() {
4277 let d = |values: Vec<usize>, p: u8| {
4282 let iter = values.into_iter().map(T::Native::usize_as);
4283 PrimitiveArray::<T>::from_iter_values(iter)
4284 .with_precision_and_scale(p, 2)
4285 .unwrap()
4286 };
4287
4288 let d1 = d(vec![1, 2, 3, 4, 5], 9);
4289 let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
4290 let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
4291 let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);
4292
4293 let batch = RecordBatch::try_from_iter([
4294 ("d1", Arc::new(d1) as ArrayRef),
4295 ("d2", Arc::new(d2) as ArrayRef),
4296 ("d3", Arc::new(d3) as ArrayRef),
4297 ("d4", Arc::new(d4) as ArrayRef),
4298 ])
4299 .unwrap();
4300
4301 let mut buffer = Vec::with_capacity(1024);
4302 let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4303 writer.write(&batch).unwrap();
4304 writer.close().unwrap();
4305
4306 let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4307 let t1 = builder.parquet_schema().columns()[0].physical_type();
4308 assert_eq!(t1, PhysicalType::INT32);
4309 let t2 = builder.parquet_schema().columns()[1].physical_type();
4310 assert_eq!(t2, PhysicalType::INT64);
4311 let t3 = builder.parquet_schema().columns()[2].physical_type();
4312 assert_eq!(t3, PhysicalType::INT64);
4313 let t4 = builder.parquet_schema().columns()[3].physical_type();
4314 assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);
4315
4316 let mut reader = builder.build().unwrap();
4317 assert_eq!(batch.schema(), reader.schema());
4318
4319 let out = reader.next().unwrap().unwrap();
4320 assert_eq!(batch, out);
4321 }
4322
4323 #[test]
4324 fn test_decimal() {
4325 test_decimal_roundtrip::<Decimal128Type>();
4326 test_decimal_roundtrip::<Decimal256Type>();
4327 }
4328
4329 #[test]
4330 fn test_list_selection() {
4331 let schema = Arc::new(Schema::new(vec![Field::new_list(
4332 "list",
4333 Field::new_list_field(ArrowDataType::Utf8, true),
4334 false,
4335 )]));
4336 let mut buf = Vec::with_capacity(1024);
4337
4338 let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
4339
4340 for i in 0..2 {
4341 let mut list_a_builder = ListBuilder::new(StringBuilder::new());
4342 for j in 0..1024 {
4343 list_a_builder.values().append_value(format!("{i} {j}"));
4344 list_a_builder.append(true);
4345 }
4346 let batch =
4347 RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
4348 .unwrap();
4349 writer.write(&batch).unwrap();
4350 }
4351 let _metadata = writer.close().unwrap();
4352
4353 let buf = Bytes::from(buf);
4354 let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
4355 .unwrap()
4356 .with_row_selection(RowSelection::from(vec![
4357 RowSelector::skip(100),
4358 RowSelector::select(924),
4359 RowSelector::skip(100),
4360 RowSelector::select(924),
4361 ]))
4362 .build()
4363 .unwrap();
4364
4365 let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4366 let batch = concat_batches(&schema, &batches).unwrap();
4367
4368 assert_eq!(batch.num_rows(), 924 * 2);
4369 let list = batch.column(0).as_list::<i32>();
4370
4371 for w in list.value_offsets().windows(2) {
4372 assert_eq!(w[0] + 1, w[1])
4373 }
4374 let mut values = list.values().as_string::<i32>().iter();
4375
4376 for i in 0..2 {
4377 for j in 100..1024 {
4378 let expected = format!("{i} {j}");
4379 assert_eq!(values.next().unwrap().unwrap(), &expected);
4380 }
4381 }
4382 }
4383
    #[test]
    fn test_list_selection_fuzz() {
        let mut rng = rng();
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list(
                Field::LIST_FIELD_DEFAULT_NAME,
                Field::new_list_field(ArrowDataType::Int32, true),
                true,
            ),
            true,
        )]));
        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));

        for _ in 0..2048 {
            if rng.random_bool(0.2) {
                list_a_builder.append(false);
                continue;
            }

            let list_a_len = rng.random_range(0..10);
            let list_b_builder = list_a_builder.values();

            for _ in 0..list_a_len {
                if rng.random_bool(0.2) {
                    list_b_builder.append(false);
                    continue;
                }

                let list_b_len = rng.random_range(0..10);
                let int_builder = list_b_builder.values();
                for _ in 0..list_b_len {
                    match rng.random_bool(0.2) {
                        true => int_builder.append_null(),
                        false => int_builder.append_value(rng.random()),
                    }
                }
                list_b_builder.append(true);
            }
            list_a_builder.append(true);
        }

        let array = Arc::new(list_a_builder.finish());
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        writer.write(&batch).unwrap();
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);

        let cases = [
            vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ],
            vec![
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
            ],
            vec![
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
            ],
            vec![
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
            ],
        ];

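        // Run every selection at several batch sizes so that batch boundaries
        // fall at different points within the selectors.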
        for batch_size in [100, 1024, 2048] {
            for selection in &cases {
                let selection = RowSelection::from(selection.clone());
                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
                    .unwrap()
                    .with_row_selection(selection.clone())
                    .with_batch_size(batch_size)
                    .build()
                    .unwrap();

                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
                assert_eq!(actual.num_rows(), selection.row_count());

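                // Walk the selectors: every selected range of the source batch
                // must match the corresponding range of the concatenated output.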
                let mut batch_offset = 0;
                let mut actual_offset = 0;
                for selector in selection.iter() {
                    if selector.skip {
                        batch_offset += selector.row_count;
                        continue;
                    }

                    assert_eq!(
                        batch.slice(batch_offset, selector.row_count),
                        actual.slice(actual_offset, selector.row_count)
                    );

                    batch_offset += selector.row_count;
                    actual_offset += selector.row_count;
                }
            }
        }
    }

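    // Reads a test file that uses the older list encoding (repeated field
    // named "array") and checks it decodes to the expected nested list values.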
    #[test]
    fn test_read_old_nested_list() {
        use arrow::datatypes::DataType;
        use arrow::datatypes::ToByteSlice;

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/old_list_structure.parquet");
        let test_file = File::open(path).unwrap();

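        // Build the expected inner value: a two-row list [[1, 2], [3, 4]]
        // assembled manually from values, offsets and ArrayData.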
        let a_values = Int32Array::from(vec![1, 2, 3, 4]);

        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());

        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
            "array",
            DataType::Int32,
            false,
        ))))
        .len(2)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

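        // The file contains a single row holding a list of lists; its first
        // value must equal the array constructed above.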
        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
        let mut reader = builder.build().unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out.num_columns(), 1);
        let c0 = out.column(0);
        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
        let r0 = c0arr.value(0);
        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
        assert_eq!(r0arr, &a);
    }

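    // map_no_value.parquet contains map columns written without a value field;
    // such maps are read back as lists of keys, so columns 1 and 2 should
    // decode to identical list arrays.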
    #[test]
    fn test_map_no_value() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/map_no_value.parquet");
        let file = File::open(path).unwrap();

        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 3);
        assert_eq!(out.num_columns(), 3);
        let c0 = out.column(1).as_list::<i32>();
        let c1 = out.column(2).as_list::<i32>();
        assert_eq!(c0.len(), c1.len());
        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
    }
}