use arrow_array::cast::AsArray;
use arrow_array::Array;
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

pub use crate::arrow::array_reader::RowGroups;
use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use crate::column::page::{PageIterator, PageReader};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;

pub(crate) use read_plan::{ReadPlan, ReadPlanBuilder};

mod filter;
mod read_plan;
mod selection;
pub mod statistics;

/// A generic builder for reading Parquet data into Arrow arrays.
///
/// See [`ParquetRecordBatchReaderBuilder`] for the synchronous reader built on this type.
pub struct ArrowReaderBuilder<T> {
    pub(crate) input: T,

    pub(crate) metadata: Arc<ParquetMetaData>,

    pub(crate) schema: SchemaRef,

    pub(crate) fields: Option<Arc<ParquetField>>,

    pub(crate) batch_size: usize,

    pub(crate) row_groups: Option<Vec<usize>>,

    pub(crate) projection: ProjectionMask,

    pub(crate) filter: Option<RowFilter>,

    pub(crate) selection: Option<RowSelection>,

    pub(crate) limit: Option<usize>,

    pub(crate) offset: Option<usize>,
}

impl<T: Debug> Debug for ArrowReaderBuilder<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowReaderBuilder<T>")
            .field("input", &self.input)
            .field("metadata", &self.metadata)
            .field("schema", &self.schema)
            .field("fields", &self.fields)
            .field("batch_size", &self.batch_size)
            .field("row_groups", &self.row_groups)
            .field("projection", &self.projection)
            .field("filter", &self.filter)
            .field("selection", &self.selection)
            .field("limit", &self.limit)
            .field("offset", &self.offset)
            .finish()
    }
}

impl<T> ArrowReaderBuilder<T> {
    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self {
            input,
            metadata: metadata.metadata,
            schema: metadata.schema,
            fields: metadata.fields,
            batch_size: 1024,
            row_groups: None,
            projection: ProjectionMask::all(),
            filter: None,
            selection: None,
            limit: None,
            offset: None,
        }
    }

    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    pub fn with_batch_size(self, batch_size: usize) -> Self {
        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
        Self { batch_size, ..self }
    }

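    /// Only read data from the provided row group indexes.
    ///
    /// A minimal sketch (the `"data.parquet"` path and the row group indexes are
    /// placeholder values for illustration):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// let file = File::open("data.parquet").unwrap();
    /// // Read only the first and third row groups of the file
    /// let reader = ParquetRecordBatchReaderBuilder::try_new(file)
    ///     .unwrap()
    ///     .with_row_groups(vec![0, 2])
    ///     .build()
    ///     .unwrap();
    /// ```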
    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
        Self {
            row_groups: Some(row_groups),
            ..self
        }
    }

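    /// Only read data from the columns selected by `mask`.
    ///
    /// A sketch projecting down to a subset of leaf columns (the file path and
    /// leaf indexes are illustrative assumptions):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// # use parquet::arrow::ProjectionMask;
    /// let file = File::open("data.parquet").unwrap();
    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    /// // Select the first and third leaf columns of the parquet schema
    /// let mask = ProjectionMask::leaves(builder.parquet_schema(), [0, 2]);
    /// let reader = builder.with_projection(mask).build().unwrap();
    /// ```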
    pub fn with_projection(self, mask: ProjectionMask) -> Self {
        Self {
            projection: mask,
            ..self
        }
    }

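    /// Scan only the rows described by `selection`, skipping the rest.
    ///
    /// A sketch, assuming the file contains at least 150 rows:
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::arrow_reader::{
    /// #     ParquetRecordBatchReaderBuilder, RowSelection, RowSelector,
    /// # };
    /// let file = File::open("data.parquet").unwrap();
    /// // Skip the first 100 rows, then read the next 50
    /// let selection = RowSelection::from(vec![
    ///     RowSelector::skip(100),
    ///     RowSelector::select(50),
    /// ]);
    /// let reader = ParquetRecordBatchReaderBuilder::try_new(file)
    ///     .unwrap()
    ///     .with_row_selection(selection)
    ///     .build()
    ///     .unwrap();
    /// ```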
    pub fn with_row_selection(self, selection: RowSelection) -> Self {
        Self {
            selection: Some(selection),
            ..self
        }
    }

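    /// Evaluate `filter` during the scan, decoding only rows for which every
    /// predicate returns `true`.
    ///
    /// A sketch with a trivial predicate that keeps every row; a real predicate
    /// would compute a `BooleanArray` from the projected columns (the path and
    /// column index here are illustrative):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use arrow_array::BooleanArray;
    /// # use parquet::arrow::arrow_reader::{
    /// #     ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter,
    /// # };
    /// # use parquet::arrow::ProjectionMask;
    /// let file = File::open("data.parquet").unwrap();
    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    /// // The predicate only needs access to the first leaf column
    /// let predicate_mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    /// let predicate = ArrowPredicateFn::new(predicate_mask, |batch| {
    ///     // Placeholder: keep every row in the batch
    ///     Ok(BooleanArray::from(vec![true; batch.num_rows()]))
    /// });
    /// let reader = builder
    ///     .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
    ///     .build()
    ///     .unwrap();
    /// ```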
    pub fn with_row_filter(self, filter: RowFilter) -> Self {
        Self {
            filter: Some(filter),
            ..self
        }
    }

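    /// Limit the number of rows returned, applied after any row selection and
    /// row filters have been evaluated.
    ///
    /// A sketch combining an offset and a limit to read a "page" of rows
    /// (illustrative values):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// let file = File::open("data.parquet").unwrap();
    /// // Skip the first 1000 remaining rows, then return at most 100
    /// let reader = ParquetRecordBatchReaderBuilder::try_new(file)
    ///     .unwrap()
    ///     .with_offset(1000)
    ///     .with_limit(100)
    ///     .build()
    ///     .unwrap();
    /// ```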
    pub fn with_limit(self, limit: usize) -> Self {
        Self {
            limit: Some(limit),
            ..self
        }
    }

    pub fn with_offset(self, offset: usize) -> Self {
        Self {
            offset: Some(offset),
            ..self
        }
    }
}

/// Options that control how Parquet data is read and decoded into Arrow arrays.
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    skip_arrow_metadata: bool,
    supplied_schema: Option<SchemaRef>,
    pub(crate) page_index: bool,
    #[cfg(feature = "encryption")]
    pub(crate) file_decryption_properties: Option<FileDecryptionProperties>,
}

impl ArrowReaderOptions {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

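    /// Provide the Arrow schema to use when decoding, instead of inferring it from
    /// the Parquet footer. This also implies skipping any Arrow schema embedded in
    /// the file metadata. The supplied schema must be compatible with the file
    /// (same column count, types, nullability and field metadata), otherwise
    /// building the reader returns an error.
    ///
    /// A sketch, assuming a file whose single column `"col"` is stored in a
    /// physical type that can be read back as a microsecond timestamp:
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use std::sync::Arc;
    /// # use arrow_schema::{DataType, Field, Schema, TimeUnit};
    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
    /// let schema = Arc::new(Schema::new(vec![Field::new(
    ///     "col",
    ///     DataType::Timestamp(TimeUnit::Microsecond, None),
    ///     true,
    /// )]));
    /// let options = ArrowReaderOptions::new().with_schema(schema);
    /// let file = File::open("data.parquet").unwrap();
    /// let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
    ///     .unwrap()
    ///     .build()
    ///     .unwrap();
    /// ```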
    pub fn with_schema(self, schema: SchemaRef) -> Self {
        Self {
            supplied_schema: Some(schema),
            skip_arrow_metadata: true,
            ..self
        }
    }

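    /// Read the Parquet page index (column index and offset index), if present,
    /// when loading metadata. This can allow more granular page skipping when a
    /// [`RowSelection`] or [`RowFilter`] is applied.
    ///
    /// A sketch (illustrative path):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
    /// let options = ArrowReaderOptions::new().with_page_index(true);
    /// let file = File::open("data.parquet").unwrap();
    /// let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
    /// ```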
    pub fn with_page_index(self, page_index: bool) -> Self {
        Self { page_index, ..self }
    }

    #[cfg(feature = "encryption")]
    pub fn with_file_decryption_properties(
        self,
        file_decryption_properties: FileDecryptionProperties,
    ) -> Self {
        Self {
            file_decryption_properties: Some(file_decryption_properties),
            ..self
        }
    }

    pub fn page_index(&self) -> bool {
        self.page_index
    }

    #[cfg(feature = "encryption")]
    pub fn file_decryption_properties(&self) -> Option<&FileDecryptionProperties> {
        self.file_decryption_properties.as_ref()
    }
}

/// The metadata required to decode Parquet data into Arrow arrays, including the
/// resolved Arrow schema and the decoded Parquet footer.
#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    pub(crate) metadata: Arc<ParquetMetaData>,
    pub(crate) schema: SchemaRef,

    pub(crate) fields: Option<Arc<ParquetField>>,
}

impl ArrowReaderMetadata {
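    /// Load [`ArrowReaderMetadata`] from the provided [`ChunkReader`], decoding the
    /// footer (and optionally the page index) so it can be inspected or reused when
    /// constructing readers.
    ///
    /// A sketch of loading the metadata eagerly from in-memory bytes; the
    /// `get_parquet_bytes` helper is hypothetical and stands in for any source of
    /// file contents:
    ///
    /// ```no_run
    /// # use bytes::Bytes;
    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
    /// # fn get_parquet_bytes() -> Bytes { unimplemented!() } // hypothetical helper
    /// let data: Bytes = get_parquet_bytes();
    /// let metadata = ArrowReaderMetadata::load(&data, ArrowReaderOptions::new()).unwrap();
    /// println!("row groups: {}", metadata.metadata().num_row_groups());
    /// println!("arrow schema: {:?}", metadata.schema());
    /// ```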
    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ParquetMetaDataReader::new().with_page_indexes(options.page_index);
        #[cfg(feature = "encryption")]
        let metadata =
            metadata.with_decryption_properties(options.file_decryption_properties.as_ref());
        let metadata = metadata.parse_and_finish(reader)?;
        Self::try_new(Arc::new(metadata), options)
    }

    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
        match options.supplied_schema {
            Some(supplied_schema) => Self::with_supplied_schema(metadata, supplied_schema.clone()),
            None => {
                let kv_metadata = match options.skip_arrow_metadata {
                    true => None,
                    false => metadata.file_metadata().key_value_metadata(),
                };

                let (schema, fields) = parquet_to_arrow_schema_and_fields(
                    metadata.file_metadata().schema_descr(),
                    ProjectionMask::all(),
                    kv_metadata,
                )?;

                Ok(Self {
                    metadata,
                    schema: Arc::new(schema),
                    fields: fields.map(Arc::new),
                })
            }
        }
    }

    fn with_supplied_schema(
        metadata: Arc<ParquetMetaData>,
        supplied_schema: SchemaRef,
    ) -> Result<Self> {
        let parquet_schema = metadata.file_metadata().schema_descr();
        let field_levels = parquet_to_arrow_field_levels(
            parquet_schema,
            ProjectionMask::all(),
            Some(supplied_schema.fields()),
        )?;
        let fields = field_levels.fields;
        let inferred_len = fields.len();
        let supplied_len = supplied_schema.fields().len();
        if inferred_len != supplied_len {
            return Err(arrow_err!(format!(
                "Incompatible supplied Arrow schema: expected {} columns received {}",
                inferred_len, supplied_len
            )));
        }

        let mut errors = Vec::new();

        let field_iter = supplied_schema.fields().iter().zip(fields.iter());

        for (field1, field2) in field_iter {
            if field1.data_type() != field2.data_type() {
                errors.push(format!(
                    "data type mismatch for field {}: requested {:?} but found {:?}",
                    field1.name(),
                    field1.data_type(),
                    field2.data_type()
                ));
            }
            if field1.is_nullable() != field2.is_nullable() {
                errors.push(format!(
                    "nullability mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.is_nullable(),
                    field2.is_nullable()
                ));
            }
            if field1.metadata() != field2.metadata() {
                errors.push(format!(
                    "metadata mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.metadata(),
                    field2.metadata()
                ));
            }
        }

        if !errors.is_empty() {
            let message = errors.join(", ");
            return Err(ParquetError::ArrowError(format!(
                "Incompatible supplied Arrow schema: {message}",
            )));
        }

        Ok(Self {
            metadata,
            schema: supplied_schema,
            fields: field_levels.levels.map(Arc::new),
        })
    }

    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}

#[doc(hidden)]
pub struct SyncReader<T: ChunkReader>(T);

impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("SyncReader").field(&self.0).finish()
    }
}

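/// A synchronous builder used to construct [`ParquetRecordBatchReader`] for a file.
///
/// An end-to-end sketch of reading a file into Arrow batches (the `"data.parquet"`
/// path is a placeholder):
///
/// ```no_run
/// # use std::fs::File;
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
/// let file = File::open("data.parquet").unwrap();
/// let reader = ParquetRecordBatchReaderBuilder::try_new(file)
///     .unwrap()
///     .with_batch_size(8192)
///     .build()
///     .unwrap();
/// for batch in reader {
///     let batch = batch.unwrap();
///     println!("Read {} rows", batch.num_rows());
/// }
/// ```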
pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;

impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
    pub fn try_new(reader: T) -> Result<Self> {
        Self::try_new_with_options(reader, Default::default())
    }

    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load(&reader, options)?;
        Ok(Self::new_with_metadata(reader, metadata))
    }

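    /// Create the builder from previously loaded [`ArrowReaderMetadata`], allowing
    /// the metadata decoding to be done once and shared across several readers.
    ///
    /// A sketch building two readers over the same in-memory file without
    /// re-parsing the footer; `get_parquet_bytes` is a hypothetical helper:
    ///
    /// ```no_run
    /// # use bytes::Bytes;
    /// # use parquet::arrow::arrow_reader::{
    /// #     ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
    /// # };
    /// # fn get_parquet_bytes() -> Bytes { unimplemented!() } // hypothetical helper
    /// let data = get_parquet_bytes();
    /// let metadata = ArrowReaderMetadata::load(&data, ArrowReaderOptions::new()).unwrap();
    /// let reader_a = ParquetRecordBatchReaderBuilder::new_with_metadata(data.clone(), metadata.clone())
    ///     .build()
    ///     .unwrap();
    /// let reader_b = ParquetRecordBatchReaderBuilder::new_with_metadata(data, metadata)
    ///     .build()
    ///     .unwrap();
    /// ```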
    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(SyncReader(input), metadata)
    }

    pub fn build(self) -> Result<ParquetRecordBatchReader> {
        let batch_size = self
            .batch_size
            .min(self.metadata.file_metadata().num_rows() as usize);

        let row_groups = self
            .row_groups
            .unwrap_or_else(|| (0..self.metadata.num_row_groups()).collect());

        let reader = ReaderRowGroups {
            reader: Arc::new(self.input.0),
            metadata: self.metadata,
            row_groups,
        };

        let mut filter = self.filter;
        let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(self.selection);

        if let Some(filter) = filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
                if !plan_builder.selects_any() {
                    break;
                }

                let array_reader = ArrayReaderBuilder::new(&reader)
                    .build_array_reader(self.fields.as_deref(), predicate.projection())?;

                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
            }
        }

        let array_reader = ArrayReaderBuilder::new(&reader)
            .build_array_reader(self.fields.as_deref(), &self.projection)?;

        let read_plan = plan_builder
            .limited(reader.num_rows())
            .with_offset(self.offset)
            .with_limit(self.limit)
            .build_limited()
            .build();

        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
    }
}

struct ReaderRowGroups<T: ChunkReader> {
    reader: Arc<T>,

    metadata: Arc<ParquetMetaData>,
    row_groups: Vec<usize>,
}

impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
    fn num_rows(&self) -> usize {
        let meta = self.metadata.row_groups();
        self.row_groups
            .iter()
            .map(|x| meta[*x].num_rows() as usize)
            .sum()
    }

    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        Ok(Box::new(ReaderPageIterator {
            column_idx: i,
            reader: self.reader.clone(),
            metadata: self.metadata.clone(),
            row_groups: self.row_groups.clone().into_iter(),
        }))
    }
}

struct ReaderPageIterator<T: ChunkReader> {
    reader: Arc<T>,
    column_idx: usize,
    row_groups: std::vec::IntoIter<usize>,
    metadata: Arc<ParquetMetaData>,
}

impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
        let rg = self.metadata.row_group(rg_idx);
        let column_chunk_metadata = rg.column(self.column_idx);
        let offset_index = self.metadata.offset_index();
        let page_locations = offset_index
            .filter(|i| !i[rg_idx].is_empty())
            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
        let total_rows = rg.num_rows() as usize;
        let reader = self.reader.clone();

        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
            .add_crypto_context(
                rg_idx,
                self.column_idx,
                self.metadata.as_ref(),
                column_chunk_metadata,
            )
    }
}

impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        let rg_idx = self.row_groups.next()?;
        let page_reader = self
            .next_page_reader(rg_idx)
            .map(|page_reader| Box::new(page_reader) as _);
        Some(page_reader)
    }
}

impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}

pub struct ParquetRecordBatchReader {
    array_reader: Box<dyn ArrayReader>,
    schema: SchemaRef,
    read_plan: ReadPlan,
}

impl Iterator for ParquetRecordBatchReader {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_inner()
            .map_err(|arrow_err| arrow_err.into())
            .transpose()
    }
}

impl ParquetRecordBatchReader {
    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
        let mut read_records = 0;
        let batch_size = self.batch_size();
        match self.read_plan.selection_mut() {
            Some(selection) => {
                while read_records < batch_size && !selection.is_empty() {
                    let front = selection.pop_front().unwrap();
                    if front.skip {
                        let skipped = self.array_reader.skip_records(front.row_count)?;

                        if skipped != front.row_count {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                front.row_count,
                                skipped
                            ));
                        }
                        continue;
                    }

                    if front.row_count == 0 {
                        continue;
                    }

                    let need_read = batch_size - read_records;
                    let to_read = match front.row_count.checked_sub(need_read) {
                        Some(remaining) if remaining != 0 => {
                            selection.push_front(RowSelector::select(remaining));
                            need_read
                        }
                        _ => front.row_count,
                    };
                    match self.array_reader.read_records(to_read)? {
                        0 => break,
                        rec => read_records += rec,
                    };
                }
            }
            None => {
                self.array_reader.read_records(batch_size)?;
            }
        };

        let array = self.array_reader.consume_batch()?;
        let struct_array = array.as_struct_opt().ok_or_else(|| {
            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
        })?;

        Ok(if struct_array.len() > 0 {
            Some(RecordBatch::from(struct_array))
        } else {
            None
        })
    }
}

impl RecordBatchReader for ParquetRecordBatchReader {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl ParquetRecordBatchReader {
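    /// Create a reader over `reader` with default options and the given batch
    /// size; a convenience for the common "just read the whole file" case.
    ///
    /// A sketch (illustrative path):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
    /// let file = File::open("data.parquet").unwrap();
    /// let reader = ParquetRecordBatchReader::try_new(file, 1024).unwrap();
    /// for batch in reader {
    ///     println!("rows: {}", batch.unwrap().num_rows());
    /// }
    /// ```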
    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
        ParquetRecordBatchReaderBuilder::try_new(reader)?
            .with_batch_size(batch_size)
            .build()
    }

    pub fn try_new_with_row_groups(
        levels: &FieldLevels,
        row_groups: &dyn RowGroups,
        batch_size: usize,
        selection: Option<RowSelection>,
    ) -> Result<Self> {
        let array_reader = ArrayReaderBuilder::new(row_groups)
            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;

        let read_plan = ReadPlanBuilder::new(batch_size)
            .with_selection(selection)
            .build();

        Ok(Self {
            array_reader,
            schema: Arc::new(Schema::new(levels.fields.clone())),
            read_plan,
        })
    }

    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
        let schema = match array_reader.get_data_type() {
            ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
            _ => unreachable!("Struct array reader's data type is not struct!"),
        };

        Self {
            array_reader,
            schema: Arc::new(schema),
            read_plan,
        }
    }

    #[inline(always)]
    pub(crate) fn batch_size(&self) -> usize {
        self.read_plan.batch_size()
    }
}

980#[cfg(test)]
981mod tests {
982 use std::cmp::min;
983 use std::collections::{HashMap, VecDeque};
984 use std::fmt::Formatter;
985 use std::fs::File;
986 use std::io::Seek;
987 use std::path::PathBuf;
988 use std::sync::Arc;
989
990 use arrow_array::builder::*;
991 use arrow_array::cast::AsArray;
992 use arrow_array::types::{
993 Date32Type, Date64Type, Decimal128Type, Decimal256Type, DecimalType, Float16Type,
994 Float32Type, Float64Type, Time32MillisecondType, Time64MicrosecondType,
995 };
996 use arrow_array::*;
997 use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
998 use arrow_data::{ArrayData, ArrayDataBuilder};
999 use arrow_schema::{
1000 ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
1001 };
1002 use arrow_select::concat::concat_batches;
1003 use bytes::Bytes;
1004 use half::f16;
1005 use num::PrimInt;
1006 use rand::{rng, Rng, RngCore};
1007 use tempfile::tempfile;
1008
1009 use crate::arrow::arrow_reader::{
1010 ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
1011 ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
1012 };
1013 use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
1014 use crate::arrow::{ArrowWriter, ProjectionMask};
1015 use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
1016 use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
1017 use crate::data_type::{
1018 BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
1019 FloatType, Int32Type, Int64Type, Int96, Int96Type,
1020 };
1021 use crate::errors::Result;
1022 use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
1023 use crate::file::writer::SerializedFileWriter;
1024 use crate::schema::parser::parse_message_type;
1025 use crate::schema::types::{Type, TypePtr};
1026 use crate::util::test_common::rand_gen::RandGen;
1027
1028 #[test]
1029 fn test_arrow_reader_all_columns() {
1030 let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1031
1032 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1033 let original_schema = Arc::clone(builder.schema());
1034 let reader = builder.build().unwrap();
1035
1036 assert_eq!(original_schema.fields(), reader.schema().fields());
1038 }
1039
1040 #[test]
1041 fn test_arrow_reader_single_column() {
1042 let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1043
1044 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1045 let original_schema = Arc::clone(builder.schema());
1046
1047 let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
1048 let reader = builder.with_projection(mask).build().unwrap();
1049
1050 assert_eq!(1, reader.schema().fields().len());
1052 assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1053 }
1054
1055 #[test]
1056 fn test_arrow_reader_single_column_by_name() {
1057 let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1058
1059 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1060 let original_schema = Arc::clone(builder.schema());
1061
1062 let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
1063 let reader = builder.with_projection(mask).build().unwrap();
1064
1065 assert_eq!(1, reader.schema().fields().len());
1067 assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1068 }
1069
1070 #[test]
1071 fn test_null_column_reader_test() {
1072 let mut file = tempfile::tempfile().unwrap();
1073
1074 let schema = "
1075 message message {
1076 OPTIONAL INT32 int32;
1077 }
1078 ";
1079 let schema = Arc::new(parse_message_type(schema).unwrap());
1080
1081 let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
1082 generate_single_column_file_with_data::<Int32Type>(
1083 &[vec![], vec![]],
1084 Some(&def_levels),
1085 file.try_clone().unwrap(), schema,
1087 Some(Field::new("int32", ArrowDataType::Null, true)),
1088 &Default::default(),
1089 )
1090 .unwrap();
1091
1092 file.rewind().unwrap();
1093
1094 let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
1095 let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();
1096
1097 assert_eq!(batches.len(), 4);
1098 for batch in &batches[0..3] {
1099 assert_eq!(batch.num_rows(), 2);
1100 assert_eq!(batch.num_columns(), 1);
1101 assert_eq!(batch.column(0).null_count(), 2);
1102 }
1103
1104 assert_eq!(batches[3].num_rows(), 1);
1105 assert_eq!(batches[3].num_columns(), 1);
1106 assert_eq!(batches[3].column(0).null_count(), 1);
1107 }
1108
1109 #[test]
1110 fn test_primitive_single_column_reader_test() {
1111 run_single_column_reader_tests::<BoolType, _, BoolType>(
1112 2,
1113 ConvertedType::NONE,
1114 None,
1115 |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
1116 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1117 );
1118 run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1119 2,
1120 ConvertedType::NONE,
1121 None,
1122 |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
1123 &[
1124 Encoding::PLAIN,
1125 Encoding::RLE_DICTIONARY,
1126 Encoding::DELTA_BINARY_PACKED,
1127 Encoding::BYTE_STREAM_SPLIT,
1128 ],
1129 );
1130 run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1131 2,
1132 ConvertedType::NONE,
1133 None,
1134 |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
1135 &[
1136 Encoding::PLAIN,
1137 Encoding::RLE_DICTIONARY,
1138 Encoding::DELTA_BINARY_PACKED,
1139 Encoding::BYTE_STREAM_SPLIT,
1140 ],
1141 );
1142 run_single_column_reader_tests::<FloatType, _, FloatType>(
1143 2,
1144 ConvertedType::NONE,
1145 None,
1146 |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
1147 &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
1148 );
1149 }
1150
1151 #[test]
1152 fn test_unsigned_primitive_single_column_reader_test() {
1153 run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1154 2,
1155 ConvertedType::UINT_32,
1156 Some(ArrowDataType::UInt32),
1157 |vals| {
1158 Arc::new(UInt32Array::from_iter(
1159 vals.iter().map(|x| x.map(|x| x as u32)),
1160 ))
1161 },
1162 &[
1163 Encoding::PLAIN,
1164 Encoding::RLE_DICTIONARY,
1165 Encoding::DELTA_BINARY_PACKED,
1166 ],
1167 );
1168 run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1169 2,
1170 ConvertedType::UINT_64,
1171 Some(ArrowDataType::UInt64),
1172 |vals| {
1173 Arc::new(UInt64Array::from_iter(
1174 vals.iter().map(|x| x.map(|x| x as u64)),
1175 ))
1176 },
1177 &[
1178 Encoding::PLAIN,
1179 Encoding::RLE_DICTIONARY,
1180 Encoding::DELTA_BINARY_PACKED,
1181 ],
1182 );
1183 }
1184
1185 #[test]
1186 fn test_unsigned_roundtrip() {
1187 let schema = Arc::new(Schema::new(vec![
1188 Field::new("uint32", ArrowDataType::UInt32, true),
1189 Field::new("uint64", ArrowDataType::UInt64, true),
1190 ]));
1191
1192 let mut buf = Vec::with_capacity(1024);
1193 let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
1194
1195 let original = RecordBatch::try_new(
1196 schema,
1197 vec![
1198 Arc::new(UInt32Array::from_iter_values([
1199 0,
1200 i32::MAX as u32,
1201 u32::MAX,
1202 ])),
1203 Arc::new(UInt64Array::from_iter_values([
1204 0,
1205 i64::MAX as u64,
1206 u64::MAX,
1207 ])),
1208 ],
1209 )
1210 .unwrap();
1211
1212 writer.write(&original).unwrap();
1213 writer.close().unwrap();
1214
1215 let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
1216 let ret = reader.next().unwrap().unwrap();
1217 assert_eq!(ret, original);
1218
1219 ret.column(0)
1221 .as_any()
1222 .downcast_ref::<UInt32Array>()
1223 .unwrap();
1224
1225 ret.column(1)
1226 .as_any()
1227 .downcast_ref::<UInt64Array>()
1228 .unwrap();
1229 }
1230
1231 #[test]
1232 fn test_float16_roundtrip() -> Result<()> {
1233 let schema = Arc::new(Schema::new(vec![
1234 Field::new("float16", ArrowDataType::Float16, false),
1235 Field::new("float16-nullable", ArrowDataType::Float16, true),
1236 ]));
1237
1238 let mut buf = Vec::with_capacity(1024);
1239 let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1240
1241 let original = RecordBatch::try_new(
1242 schema,
1243 vec![
1244 Arc::new(Float16Array::from_iter_values([
1245 f16::EPSILON,
1246 f16::MIN,
1247 f16::MAX,
1248 f16::NAN,
1249 f16::INFINITY,
1250 f16::NEG_INFINITY,
1251 f16::ONE,
1252 f16::NEG_ONE,
1253 f16::ZERO,
1254 f16::NEG_ZERO,
1255 f16::E,
1256 f16::PI,
1257 f16::FRAC_1_PI,
1258 ])),
1259 Arc::new(Float16Array::from(vec![
1260 None,
1261 None,
1262 None,
1263 Some(f16::NAN),
1264 Some(f16::INFINITY),
1265 Some(f16::NEG_INFINITY),
1266 None,
1267 None,
1268 None,
1269 None,
1270 None,
1271 None,
1272 Some(f16::FRAC_1_PI),
1273 ])),
1274 ],
1275 )?;
1276
1277 writer.write(&original)?;
1278 writer.close()?;
1279
1280 let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1281 let ret = reader.next().unwrap()?;
1282 assert_eq!(ret, original);
1283
1284 ret.column(0).as_primitive::<Float16Type>();
1286 ret.column(1).as_primitive::<Float16Type>();
1287
1288 Ok(())
1289 }
1290
1291 #[test]
1292 fn test_time_utc_roundtrip() -> Result<()> {
1293 let schema = Arc::new(Schema::new(vec![
1294 Field::new(
1295 "time_millis",
1296 ArrowDataType::Time32(TimeUnit::Millisecond),
1297 true,
1298 )
1299 .with_metadata(HashMap::from_iter(vec![(
1300 "adjusted_to_utc".to_string(),
1301 "".to_string(),
1302 )])),
1303 Field::new(
1304 "time_micros",
1305 ArrowDataType::Time64(TimeUnit::Microsecond),
1306 true,
1307 )
1308 .with_metadata(HashMap::from_iter(vec![(
1309 "adjusted_to_utc".to_string(),
1310 "".to_string(),
1311 )])),
1312 ]));
1313
1314 let mut buf = Vec::with_capacity(1024);
1315 let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1316
1317 let original = RecordBatch::try_new(
1318 schema,
1319 vec![
1320 Arc::new(Time32MillisecondArray::from(vec![
1321 Some(-1),
1322 Some(0),
1323 Some(86_399_000),
1324 Some(86_400_000),
1325 Some(86_401_000),
1326 None,
1327 ])),
1328 Arc::new(Time64MicrosecondArray::from(vec![
1329 Some(-1),
1330 Some(0),
1331 Some(86_399 * 1_000_000),
1332 Some(86_400 * 1_000_000),
1333 Some(86_401 * 1_000_000),
1334 None,
1335 ])),
1336 ],
1337 )?;
1338
1339 writer.write(&original)?;
1340 writer.close()?;
1341
1342 let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1343 let ret = reader.next().unwrap()?;
1344 assert_eq!(ret, original);
1345
1346 ret.column(0).as_primitive::<Time32MillisecondType>();
1348 ret.column(1).as_primitive::<Time64MicrosecondType>();
1349
1350 Ok(())
1351 }
1352
1353 #[test]
1354 fn test_date32_roundtrip() -> Result<()> {
1355 use arrow_array::Date32Array;
1356
1357 let schema = Arc::new(Schema::new(vec![Field::new(
1358 "date32",
1359 ArrowDataType::Date32,
1360 false,
1361 )]));
1362
1363 let mut buf = Vec::with_capacity(1024);
1364
1365 let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1366
1367 let original = RecordBatch::try_new(
1368 schema,
1369 vec![Arc::new(Date32Array::from(vec![
1370 -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
1371 ]))],
1372 )?;
1373
1374 writer.write(&original)?;
1375 writer.close()?;
1376
1377 let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1378 let ret = reader.next().unwrap()?;
1379 assert_eq!(ret, original);
1380
1381 ret.column(0).as_primitive::<Date32Type>();
1383
1384 Ok(())
1385 }
1386
1387 #[test]
1388 fn test_date64_roundtrip() -> Result<()> {
1389 use arrow_array::Date64Array;
1390
1391 let schema = Arc::new(Schema::new(vec![
1392 Field::new("small-date64", ArrowDataType::Date64, false),
1393 Field::new("big-date64", ArrowDataType::Date64, false),
1394 Field::new("invalid-date64", ArrowDataType::Date64, false),
1395 ]));
1396
1397 let mut default_buf = Vec::with_capacity(1024);
1398 let mut coerce_buf = Vec::with_capacity(1024);
1399
1400 let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
1401
1402 let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
1403 let mut coerce_writer =
1404 ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;
1405
1406 static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;
1407
1408 let original = RecordBatch::try_new(
1409 schema,
1410 vec![
1411 Arc::new(Date64Array::from(vec![
1413 -1_000_000 * NUM_MILLISECONDS_IN_DAY,
1414 -1_000 * NUM_MILLISECONDS_IN_DAY,
1415 0,
1416 1_000 * NUM_MILLISECONDS_IN_DAY,
1417 1_000_000 * NUM_MILLISECONDS_IN_DAY,
1418 ])),
1419 Arc::new(Date64Array::from(vec![
1421 -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1422 -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1423 0,
1424 1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1425 10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1426 ])),
1427 Arc::new(Date64Array::from(vec![
1429 -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1430 -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1431 1,
1432 1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1433 1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1434 ])),
1435 ],
1436 )?;
1437
1438 default_writer.write(&original)?;
1439 coerce_writer.write(&original)?;
1440
1441 default_writer.close()?;
1442 coerce_writer.close()?;
1443
1444 let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
1445 let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
1446
1447 let default_ret = default_reader.next().unwrap()?;
1448 let coerce_ret = coerce_reader.next().unwrap()?;
1449
1450 assert_eq!(default_ret, original);
1452
1453 assert_eq!(coerce_ret.column(0), original.column(0));
1455 assert_ne!(coerce_ret.column(1), original.column(1));
1456 assert_ne!(coerce_ret.column(2), original.column(2));
1457
1458 default_ret.column(0).as_primitive::<Date64Type>();
1460 coerce_ret.column(0).as_primitive::<Date64Type>();
1461
1462 Ok(())
1463 }
1464 struct RandFixedLenGen {}
1465
1466 impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
1467 fn gen(len: i32) -> FixedLenByteArray {
1468 let mut v = vec![0u8; len as usize];
1469 rng().fill_bytes(&mut v);
1470 ByteArray::from(v).into()
1471 }
1472 }
1473
1474 #[test]
1475 fn test_fixed_length_binary_column_reader() {
1476 run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1477 20,
1478 ConvertedType::NONE,
1479 None,
1480 |vals| {
1481 let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
1482 for val in vals {
1483 match val {
1484 Some(b) => builder.append_value(b).unwrap(),
1485 None => builder.append_null(),
1486 }
1487 }
1488 Arc::new(builder.finish())
1489 },
1490 &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1491 );
1492 }
1493
1494 #[test]
1495 fn test_interval_day_time_column_reader() {
1496 run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1497 12,
1498 ConvertedType::INTERVAL,
1499 None,
1500 |vals| {
1501 Arc::new(
1502 vals.iter()
1503 .map(|x| {
1504 x.as_ref().map(|b| IntervalDayTime {
1505 days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
1506 milliseconds: i32::from_le_bytes(
1507 b.as_ref()[8..12].try_into().unwrap(),
1508 ),
1509 })
1510 })
1511 .collect::<IntervalDayTimeArray>(),
1512 )
1513 },
1514 &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1515 );
1516 }
1517
1518 #[test]
1519 fn test_int96_single_column_reader_test() {
1520 let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
1521
1522 type TypeHintAndConversionFunction =
1523 (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);
1524
1525 let resolutions: Vec<TypeHintAndConversionFunction> = vec![
1526 (None, |vals: &[Option<Int96>]| {
1528 Arc::new(TimestampNanosecondArray::from_iter(
1529 vals.iter().map(|x| x.map(|x| x.to_nanos())),
1530 )) as ArrayRef
1531 }),
1532 (
1534 Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
1535 |vals: &[Option<Int96>]| {
1536 Arc::new(TimestampSecondArray::from_iter(
1537 vals.iter().map(|x| x.map(|x| x.to_seconds())),
1538 )) as ArrayRef
1539 },
1540 ),
1541 (
1542 Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
1543 |vals: &[Option<Int96>]| {
1544 Arc::new(TimestampMillisecondArray::from_iter(
1545 vals.iter().map(|x| x.map(|x| x.to_millis())),
1546 )) as ArrayRef
1547 },
1548 ),
1549 (
1550 Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
1551 |vals: &[Option<Int96>]| {
1552 Arc::new(TimestampMicrosecondArray::from_iter(
1553 vals.iter().map(|x| x.map(|x| x.to_micros())),
1554 )) as ArrayRef
1555 },
1556 ),
1557 (
1558 Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
1559 |vals: &[Option<Int96>]| {
1560 Arc::new(TimestampNanosecondArray::from_iter(
1561 vals.iter().map(|x| x.map(|x| x.to_nanos())),
1562 )) as ArrayRef
1563 },
1564 ),
1565 (
1567 Some(ArrowDataType::Timestamp(
1568 TimeUnit::Second,
1569 Some(Arc::from("-05:00")),
1570 )),
1571 |vals: &[Option<Int96>]| {
1572 Arc::new(
1573 TimestampSecondArray::from_iter(
1574 vals.iter().map(|x| x.map(|x| x.to_seconds())),
1575 )
1576 .with_timezone("-05:00"),
1577 ) as ArrayRef
1578 },
1579 ),
1580 ];
1581
1582 resolutions.iter().for_each(|(arrow_type, converter)| {
1583 run_single_column_reader_tests::<Int96Type, _, Int96Type>(
1584 2,
1585 ConvertedType::NONE,
1586 arrow_type.clone(),
1587 converter,
1588 encodings,
1589 );
1590 })
1591 }
1592
1593 #[test]
1594 fn test_int96_from_spark_file_with_provided_schema() {
1595 use arrow_schema::DataType::Timestamp;
1599 let test_data = arrow::util::test_util::parquet_test_data();
1600 let path = format!("{test_data}/int96_from_spark.parquet");
1601 let file = File::open(path).unwrap();
1602
1603 let supplied_schema = Arc::new(Schema::new(vec![Field::new(
1604 "a",
1605 Timestamp(TimeUnit::Microsecond, None),
1606 true,
1607 )]));
1608 let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
1609
1610 let mut record_reader =
1611 ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
1612 .unwrap()
1613 .build()
1614 .unwrap();
1615
1616 let batch = record_reader.next().unwrap().unwrap();
1617 assert_eq!(batch.num_columns(), 1);
1618 let column = batch.column(0);
1619 assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));
1620
1621 let expected = Arc::new(Int64Array::from(vec![
1622 Some(1704141296123456),
1623 Some(1704070800000000),
1624 Some(253402225200000000),
1625 Some(1735599600000000),
1626 None,
1627 Some(9089380393200000000),
1628 ]));
1629
1630 let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
1635 let casted_timestamps = binding.as_primitive::<types::Int64Type>();
1636
1637 assert_eq!(casted_timestamps.len(), expected.len());
1638
1639 casted_timestamps
1640 .iter()
1641 .zip(expected.iter())
1642 .for_each(|(lhs, rhs)| {
1643 assert_eq!(lhs, rhs);
1644 });
1645 }
1646
1647 #[test]
1648 fn test_int96_from_spark_file_without_provided_schema() {
1649 use arrow_schema::DataType::Timestamp;
1653 let test_data = arrow::util::test_util::parquet_test_data();
1654 let path = format!("{test_data}/int96_from_spark.parquet");
1655 let file = File::open(path).unwrap();
1656
1657 let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
1658 .unwrap()
1659 .build()
1660 .unwrap();
1661
1662 let batch = record_reader.next().unwrap().unwrap();
1663 assert_eq!(batch.num_columns(), 1);
1664 let column = batch.column(0);
1665 assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));
1666
1667 let expected = Arc::new(Int64Array::from(vec![
1668 Some(1704141296123456000), Some(1704070800000000000), Some(-4852191831933722624), Some(1735599600000000000), None,
1673 Some(-4864435138808946688), ]));
1675
1676 let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
1681 let casted_timestamps = binding.as_primitive::<types::Int64Type>();
1682
1683 assert_eq!(casted_timestamps.len(), expected.len());
1684
1685 casted_timestamps
1686 .iter()
1687 .zip(expected.iter())
1688 .for_each(|(lhs, rhs)| {
1689 assert_eq!(lhs, rhs);
1690 });
1691 }
1692
1693 struct RandUtf8Gen {}
1694
1695 impl RandGen<ByteArrayType> for RandUtf8Gen {
1696 fn gen(len: i32) -> ByteArray {
1697 Int32Type::gen(len).to_string().as_str().into()
1698 }
1699 }
1700
1701 #[test]
1702 fn test_utf8_single_column_reader_test() {
1703 fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
1704 Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
1705 x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
1706 })))
1707 }
1708
1709 let encodings = &[
1710 Encoding::PLAIN,
1711 Encoding::RLE_DICTIONARY,
1712 Encoding::DELTA_LENGTH_BYTE_ARRAY,
1713 Encoding::DELTA_BYTE_ARRAY,
1714 ];
1715
1716 run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1717 2,
1718 ConvertedType::NONE,
1719 None,
1720 |vals| {
1721 Arc::new(BinaryArray::from_iter(
1722 vals.iter().map(|x| x.as_ref().map(|x| x.data())),
1723 ))
1724 },
1725 encodings,
1726 );
1727
1728 run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1729 2,
1730 ConvertedType::UTF8,
1731 None,
1732 string_converter::<i32>,
1733 encodings,
1734 );
1735
1736 run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1737 2,
1738 ConvertedType::UTF8,
1739 Some(ArrowDataType::Utf8),
1740 string_converter::<i32>,
1741 encodings,
1742 );
1743
1744 run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1745 2,
1746 ConvertedType::UTF8,
1747 Some(ArrowDataType::LargeUtf8),
1748 string_converter::<i64>,
1749 encodings,
1750 );
1751
1752 let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
1753 for key in &small_key_types {
1754 for encoding in encodings {
1755 let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
1756 opts.encoding = *encoding;
1757
1758 let data_type =
1759 ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
1760
1761 single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
1763 opts,
1764 2,
1765 ConvertedType::UTF8,
1766 Some(data_type.clone()),
1767 move |vals| {
1768 let vals = string_converter::<i32>(vals);
1769 arrow::compute::cast(&vals, &data_type).unwrap()
1770 },
1771 );
1772 }
1773 }
1774
1775 let key_types = [
1776 ArrowDataType::Int16,
1777 ArrowDataType::UInt16,
1778 ArrowDataType::Int32,
1779 ArrowDataType::UInt32,
1780 ArrowDataType::Int64,
1781 ArrowDataType::UInt64,
1782 ];
1783
1784 for key in &key_types {
1785 let data_type =
1786 ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
1787
1788 run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1789 2,
1790 ConvertedType::UTF8,
1791 Some(data_type.clone()),
1792 move |vals| {
1793 let vals = string_converter::<i32>(vals);
1794 arrow::compute::cast(&vals, &data_type).unwrap()
1795 },
1796 encodings,
1797 );
1798
1799 }
1816 }
1817
1818 #[test]
1819 fn test_decimal_nullable_struct() {
1820 let decimals = Decimal256Array::from_iter_values(
1821 [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
1822 );
1823
1824 let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
1825 "decimals",
1826 decimals.data_type().clone(),
1827 false,
1828 )])))
1829 .len(8)
1830 .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
1831 .child_data(vec![decimals.into_data()])
1832 .build()
1833 .unwrap();
1834
1835 let written =
1836 RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
1837 .unwrap();
1838
1839 let mut buffer = Vec::with_capacity(1024);
1840 let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
1841 writer.write(&written).unwrap();
1842 writer.close().unwrap();
1843
1844 let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
1845 .unwrap()
1846 .collect::<Result<Vec<_>, _>>()
1847 .unwrap();
1848
1849 assert_eq!(&written.slice(0, 3), &read[0]);
1850 assert_eq!(&written.slice(3, 3), &read[1]);
1851 assert_eq!(&written.slice(6, 2), &read[2]);
1852 }
1853
1854 #[test]
1855 fn test_int32_nullable_struct() {
1856 let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
1857 let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
1858 "int32",
1859 int32.data_type().clone(),
1860 false,
1861 )])))
1862 .len(8)
1863 .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
1864 .child_data(vec![int32.into_data()])
1865 .build()
1866 .unwrap();
1867
1868 let written =
1869 RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
1870 .unwrap();
1871
1872 let mut buffer = Vec::with_capacity(1024);
1873 let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
1874 writer.write(&written).unwrap();
1875 writer.close().unwrap();
1876
1877 let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
1878 .unwrap()
1879 .collect::<Result<Vec<_>, _>>()
1880 .unwrap();
1881
1882 assert_eq!(&written.slice(0, 3), &read[0]);
1883 assert_eq!(&written.slice(3, 3), &read[1]);
1884 assert_eq!(&written.slice(6, 2), &read[2]);
1885 }
1886
1887 #[test]
1888 fn test_decimal_list() {
1889 let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
1890
1891 let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
1893 decimals.data_type().clone(),
1894 false,
1895 ))))
1896 .len(7)
1897 .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
1898 .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
1899 .child_data(vec![decimals.into_data()])
1900 .build()
1901 .unwrap();
1902
1903 let written =
1904 RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
1905 .unwrap();
1906
1907 let mut buffer = Vec::with_capacity(1024);
1908 let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
1909 writer.write(&written).unwrap();
1910 writer.close().unwrap();
1911
1912 let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
1913 .unwrap()
1914 .collect::<Result<Vec<_>, _>>()
1915 .unwrap();
1916
1917 assert_eq!(&written.slice(0, 3), &read[0]);
1918 assert_eq!(&written.slice(3, 3), &read[1]);
1919 assert_eq!(&written.slice(6, 1), &read[2]);
1920 }
1921
1922 #[test]
1923 fn test_read_decimal_file() {
1924 use arrow_array::Decimal128Array;
1925 let testdata = arrow::util::test_util::parquet_test_data();
1926 let file_variants = vec![
1927 ("byte_array", 4),
1928 ("fixed_length", 25),
1929 ("int32", 4),
1930 ("int64", 10),
1931 ];
1932 for (prefix, target_precision) in file_variants {
1933 let path = format!("{testdata}/{prefix}_decimal.parquet");
1934 let file = File::open(path).unwrap();
1935 let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
1936
1937 let batch = record_reader.next().unwrap().unwrap();
1938 assert_eq!(batch.num_rows(), 24);
1939 let col = batch
1940 .column(0)
1941 .as_any()
1942 .downcast_ref::<Decimal128Array>()
1943 .unwrap();
1944
1945 let expected = 1..25;
1946
1947 assert_eq!(col.precision(), target_precision);
1948 assert_eq!(col.scale(), 2);
1949
1950 for (i, v) in expected.enumerate() {
1951 assert_eq!(col.value(i), v * 100_i128);
1952 }
1953 }
1954 }
1955
1956 #[test]
1957 fn test_read_float16_nonzeros_file() {
1958 use arrow_array::Float16Array;
1959 let testdata = arrow::util::test_util::parquet_test_data();
1960 let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
1962 let file = File::open(path).unwrap();
1963 let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
1964
1965 let batch = record_reader.next().unwrap().unwrap();
1966 assert_eq!(batch.num_rows(), 8);
1967 let col = batch
1968 .column(0)
1969 .as_any()
1970 .downcast_ref::<Float16Array>()
1971 .unwrap();
1972
1973 let f16_two = f16::ONE + f16::ONE;
1974
1975 assert_eq!(col.null_count(), 1);
1976 assert!(col.is_null(0));
1977 assert_eq!(col.value(1), f16::ONE);
1978 assert_eq!(col.value(2), -f16_two);
1979 assert!(col.value(3).is_nan());
1980 assert_eq!(col.value(4), f16::ZERO);
1981 assert!(col.value(4).is_sign_positive());
1982 assert_eq!(col.value(5), f16::NEG_ONE);
1983 assert_eq!(col.value(6), f16::NEG_ZERO);
1984 assert!(col.value(6).is_sign_negative());
1985 assert_eq!(col.value(7), f16_two);
1986 }
1987
1988 #[test]
1989 fn test_read_float16_zeros_file() {
1990 use arrow_array::Float16Array;
1991 let testdata = arrow::util::test_util::parquet_test_data();
1992 let path = format!("{testdata}/float16_zeros_and_nans.parquet");
1994 let file = File::open(path).unwrap();
1995 let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
1996
1997 let batch = record_reader.next().unwrap().unwrap();
1998 assert_eq!(batch.num_rows(), 3);
1999 let col = batch
2000 .column(0)
2001 .as_any()
2002 .downcast_ref::<Float16Array>()
2003 .unwrap();
2004
2005 assert_eq!(col.null_count(), 1);
2006 assert!(col.is_null(0));
2007 assert_eq!(col.value(1), f16::ZERO);
2008 assert!(col.value(1).is_sign_positive());
2009 assert!(col.value(2).is_nan());
2010 }
2011
2012 #[test]
2013 fn test_read_float32_float64_byte_stream_split() {
2014 let path = format!(
2015 "{}/byte_stream_split.zstd.parquet",
2016 arrow::util::test_util::parquet_test_data(),
2017 );
2018 let file = File::open(path).unwrap();
2019 let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2020
2021 let mut row_count = 0;
2022 for batch in record_reader {
2023 let batch = batch.unwrap();
2024 row_count += batch.num_rows();
2025 let f32_col = batch.column(0).as_primitive::<Float32Type>();
2026 let f64_col = batch.column(1).as_primitive::<Float64Type>();
2027
2028 for &x in f32_col.values() {
2030 assert!(x > -10.0);
2031 assert!(x < 10.0);
2032 }
2033 for &x in f64_col.values() {
2034 assert!(x > -10.0);
2035 assert!(x < 10.0);
2036 }
2037 }
2038 assert_eq!(row_count, 300);
2039 }
2040
2041 #[test]
2042 fn test_read_extended_byte_stream_split() {
2043 let path = format!(
2044 "{}/byte_stream_split_extended.gzip.parquet",
2045 arrow::util::test_util::parquet_test_data(),
2046 );
2047 let file = File::open(path).unwrap();
2048 let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2049
2050 let mut row_count = 0;
2051 for batch in record_reader {
2052 let batch = batch.unwrap();
2053 row_count += batch.num_rows();
2054
2055 let f16_col = batch.column(0).as_primitive::<Float16Type>();
2057 let f16_bss = batch.column(1).as_primitive::<Float16Type>();
2058 assert_eq!(f16_col.len(), f16_bss.len());
2059 f16_col
2060 .iter()
2061 .zip(f16_bss.iter())
2062 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2063
2064 let f32_col = batch.column(2).as_primitive::<Float32Type>();
2066 let f32_bss = batch.column(3).as_primitive::<Float32Type>();
2067 assert_eq!(f32_col.len(), f32_bss.len());
2068 f32_col
2069 .iter()
2070 .zip(f32_bss.iter())
2071 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2072
2073 let f64_col = batch.column(4).as_primitive::<Float64Type>();
2075 let f64_bss = batch.column(5).as_primitive::<Float64Type>();
2076 assert_eq!(f64_col.len(), f64_bss.len());
2077 f64_col
2078 .iter()
2079 .zip(f64_bss.iter())
2080 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2081
2082 let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
2084 let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
2085 assert_eq!(i32_col.len(), i32_bss.len());
2086 i32_col
2087 .iter()
2088 .zip(i32_bss.iter())
2089 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2090
2091 let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
2093 let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
2094 assert_eq!(i64_col.len(), i64_bss.len());
2095 i64_col
2096 .iter()
2097 .zip(i64_bss.iter())
2098 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2099
2100 let flba_col = batch.column(10).as_fixed_size_binary();
2102 let flba_bss = batch.column(11).as_fixed_size_binary();
2103 assert_eq!(flba_col.len(), flba_bss.len());
2104 flba_col
2105 .iter()
2106 .zip(flba_bss.iter())
2107 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2108
2109 let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
2111 let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
2112 assert_eq!(dec_col.len(), dec_bss.len());
2113 dec_col
2114 .iter()
2115 .zip(dec_bss.iter())
2116 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2117 }
2118 assert_eq!(row_count, 200);
2119 }
2120
2121 #[test]
2122 fn test_read_incorrect_map_schema_file() {
2123 let testdata = arrow::util::test_util::parquet_test_data();
2124 let path = format!("{testdata}/incorrect_map_schema.parquet");
2126 let file = File::open(path).unwrap();
2127 let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2128
2129 let batch = record_reader.next().unwrap().unwrap();
2130 assert_eq!(batch.num_rows(), 1);
2131
2132 let expected_schema = Schema::new(Fields::from(vec![Field::new(
2133 "my_map",
2134 ArrowDataType::Map(
2135 Arc::new(Field::new(
2136 "key_value",
2137 ArrowDataType::Struct(Fields::from(vec![
2138 Field::new("key", ArrowDataType::Utf8, false),
2139 Field::new("value", ArrowDataType::Utf8, true),
2140 ])),
2141 false,
2142 )),
2143 false,
2144 ),
2145 true,
2146 )]));
2147 assert_eq!(batch.schema().as_ref(), &expected_schema);
2148
2149 assert_eq!(batch.num_rows(), 1);
2150 assert_eq!(batch.column(0).null_count(), 0);
2151 assert_eq!(
2152 batch.column(0).as_map().keys().as_ref(),
2153 &StringArray::from(vec!["parent", "name"])
2154 );
2155 assert_eq!(
2156 batch.column(0).as_map().values().as_ref(),
2157 &StringArray::from(vec!["another", "report"])
2158 );
2159 }
2160
2161 #[test]
2162 fn test_read_dict_fixed_size_binary() {
2163 let schema = Arc::new(Schema::new(vec![Field::new(
2164 "a",
2165 ArrowDataType::Dictionary(
2166 Box::new(ArrowDataType::UInt8),
2167 Box::new(ArrowDataType::FixedSizeBinary(8)),
2168 ),
2169 true,
2170 )]));
2171 let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
2172 let values = FixedSizeBinaryArray::try_from_iter(
2173 vec![
2174 (0u8..8u8).collect::<Vec<u8>>(),
2175 (24u8..32u8).collect::<Vec<u8>>(),
2176 ]
2177 .into_iter(),
2178 )
2179 .unwrap();
2180 let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
2181 let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();
2182
2183 let mut buffer = Vec::with_capacity(1024);
2184 let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2185 writer.write(&batch).unwrap();
2186 writer.close().unwrap();
2187 let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2188 .unwrap()
2189 .collect::<Result<Vec<_>, _>>()
2190 .unwrap();
2191
2192 assert_eq!(read.len(), 1);
2193 assert_eq!(&batch, &read[0])
2194 }
2195
2196 #[derive(Clone)]
2198 struct TestOptions {
2199 num_row_groups: usize,
2202 num_rows: usize,
2204 record_batch_size: usize,
2206 null_percent: Option<usize>,
2208 write_batch_size: usize,
2213 max_data_page_size: usize,
2215 max_dict_page_size: usize,
2217 writer_version: WriterVersion,
2219 enabled_statistics: EnabledStatistics,
2221 encoding: Encoding,
2223 row_selections: Option<(RowSelection, usize)>,
2225 row_filter: Option<Vec<bool>>,
2227 limit: Option<usize>,
2229 offset: Option<usize>,
2231 }
2232
2233 impl std::fmt::Debug for TestOptions {
2235 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2236 f.debug_struct("TestOptions")
2237 .field("num_row_groups", &self.num_row_groups)
2238 .field("num_rows", &self.num_rows)
2239 .field("record_batch_size", &self.record_batch_size)
2240 .field("null_percent", &self.null_percent)
2241 .field("write_batch_size", &self.write_batch_size)
2242 .field("max_data_page_size", &self.max_data_page_size)
2243 .field("max_dict_page_size", &self.max_dict_page_size)
2244 .field("writer_version", &self.writer_version)
2245 .field("enabled_statistics", &self.enabled_statistics)
2246 .field("encoding", &self.encoding)
2247 .field("row_selections", &self.row_selections.is_some())
2248 .field("row_filter", &self.row_filter.is_some())
2249 .field("limit", &self.limit)
2250 .field("offset", &self.offset)
2251 .finish()
2252 }
2253 }
2254
2255 impl Default for TestOptions {
2256 fn default() -> Self {
2257 Self {
2258 num_row_groups: 2,
2259 num_rows: 100,
2260 record_batch_size: 15,
2261 null_percent: None,
2262 write_batch_size: 64,
2263 max_data_page_size: 1024 * 1024,
2264 max_dict_page_size: 1024 * 1024,
2265 writer_version: WriterVersion::PARQUET_1_0,
2266 enabled_statistics: EnabledStatistics::Page,
2267 encoding: Encoding::PLAIN,
2268 row_selections: None,
2269 row_filter: None,
2270 limit: None,
2271 offset: None,
2272 }
2273 }
2274 }
2275
2276 impl TestOptions {
2277 fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
2278 Self {
2279 num_row_groups,
2280 num_rows,
2281 record_batch_size,
2282 ..Default::default()
2283 }
2284 }
2285
2286 fn with_null_percent(self, null_percent: usize) -> Self {
2287 Self {
2288 null_percent: Some(null_percent),
2289 ..self
2290 }
2291 }
2292
2293 fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
2294 Self {
2295 max_data_page_size,
2296 ..self
2297 }
2298 }
2299
2300 fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
2301 Self {
2302 max_dict_page_size,
2303 ..self
2304 }
2305 }
2306
2307 fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
2308 Self {
2309 enabled_statistics,
2310 ..self
2311 }
2312 }
2313
2314 fn with_row_selections(self) -> Self {
2315 assert!(self.row_filter.is_none(), "Must set row selection first");
2316
2317 let mut rng = rng();
2318 let step = rng.random_range(self.record_batch_size..self.num_rows);
2319 let row_selections = create_test_selection(
2320 step,
2321 self.num_row_groups * self.num_rows,
2322 rng.random::<bool>(),
2323 );
2324 Self {
2325 row_selections: Some(row_selections),
2326 ..self
2327 }
2328 }
2329
2330 fn with_row_filter(self) -> Self {
2331 let row_count = match &self.row_selections {
2332 Some((_, count)) => *count,
2333 None => self.num_row_groups * self.num_rows,
2334 };
2335
2336 let mut rng = rng();
2337 Self {
2338 row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
2339 ..self
2340 }
2341 }
2342
2343 fn with_limit(self, limit: usize) -> Self {
2344 Self {
2345 limit: Some(limit),
2346 ..self
2347 }
2348 }
2349
2350 fn with_offset(self, offset: usize) -> Self {
2351 Self {
2352 offset: Some(offset),
2353 ..self
2354 }
2355 }
2356
2357 fn writer_props(&self) -> WriterProperties {
2358 let builder = WriterProperties::builder()
2359 .set_data_page_size_limit(self.max_data_page_size)
2360 .set_write_batch_size(self.write_batch_size)
2361 .set_writer_version(self.writer_version)
2362 .set_statistics_enabled(self.enabled_statistics);
2363
2364 let builder = match self.encoding {
2365 Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
2366 .set_dictionary_enabled(true)
2367 .set_dictionary_page_size_limit(self.max_dict_page_size),
2368 _ => builder
2369 .set_dictionary_enabled(false)
2370 .set_encoding(self.encoding),
2371 };
2372
2373 builder.build()
2374 }
2375 }
2376
2377 fn run_single_column_reader_tests<T, F, G>(
2384 rand_max: i32,
2385 converted_type: ConvertedType,
2386 arrow_type: Option<ArrowDataType>,
2387 converter: F,
2388 encodings: &[Encoding],
2389 ) where
2390 T: DataType,
2391 G: RandGen<T>,
2392 F: Fn(&[Option<T::T>]) -> ArrayRef,
2393 {
2394 let all_options = vec![
2395 TestOptions::new(2, 100, 15),
2398 TestOptions::new(3, 25, 5),
2403 TestOptions::new(4, 100, 25),
2407 TestOptions::new(3, 256, 73).with_max_data_page_size(128),
2409 TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
2411 TestOptions::new(2, 256, 127).with_null_percent(0),
2413 TestOptions::new(2, 256, 93).with_null_percent(25),
2415 TestOptions::new(4, 100, 25).with_limit(0),
2417 TestOptions::new(4, 100, 25).with_limit(50),
2419 TestOptions::new(4, 100, 25).with_limit(10),
2421 TestOptions::new(4, 100, 25).with_limit(101),
2423 TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
2425 TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
2427 TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
2429 TestOptions::new(2, 256, 91)
2431 .with_null_percent(25)
2432 .with_enabled_statistics(EnabledStatistics::Chunk),
2433 TestOptions::new(2, 256, 91)
2435 .with_null_percent(25)
2436 .with_enabled_statistics(EnabledStatistics::None),
2437 TestOptions::new(2, 128, 91)
2439 .with_null_percent(100)
2440 .with_enabled_statistics(EnabledStatistics::None),
2441 TestOptions::new(2, 100, 15).with_row_selections(),
2446 TestOptions::new(3, 25, 5).with_row_selections(),
2451 TestOptions::new(4, 100, 25).with_row_selections(),
2455 TestOptions::new(3, 256, 73)
2457 .with_max_data_page_size(128)
2458 .with_row_selections(),
2459 TestOptions::new(3, 256, 57)
2461 .with_max_dict_page_size(128)
2462 .with_row_selections(),
2463 TestOptions::new(2, 256, 127)
2465 .with_null_percent(0)
2466 .with_row_selections(),
2467 TestOptions::new(2, 256, 93)
2469 .with_null_percent(25)
2470 .with_row_selections(),
2471 TestOptions::new(2, 256, 93)
2473 .with_null_percent(25)
2474 .with_row_selections()
2475 .with_limit(10),
2476 TestOptions::new(2, 256, 93)
2478 .with_null_percent(25)
2479 .with_row_selections()
2480 .with_offset(20)
2481 .with_limit(10),
2482 TestOptions::new(4, 100, 25).with_row_filter(),
2486 TestOptions::new(4, 100, 25)
2488 .with_row_selections()
2489 .with_row_filter(),
2490 TestOptions::new(2, 256, 93)
2492 .with_null_percent(25)
2493 .with_max_data_page_size(10)
2494 .with_row_filter(),
2495 TestOptions::new(2, 256, 93)
2497 .with_null_percent(25)
2498 .with_max_data_page_size(10)
2499 .with_row_selections()
2500 .with_row_filter(),
2501 TestOptions::new(2, 256, 93)
2503 .with_enabled_statistics(EnabledStatistics::None)
2504 .with_max_data_page_size(10)
2505 .with_row_selections(),
2506 ];
2507
2508 all_options.into_iter().for_each(|opts| {
2509 for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2510 for encoding in encodings {
2511 let opts = TestOptions {
2512 writer_version,
2513 encoding: *encoding,
2514 ..opts.clone()
2515 };
2516
2517 single_column_reader_test::<T, _, G>(
2518 opts,
2519 rand_max,
2520 converted_type,
2521 arrow_type.clone(),
2522 &converter,
2523 )
2524 }
2525 }
2526 });
2527 }
2528
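/// Writes a single column to a temporary parquet file according to `opts`, then reads
/// it back and checks every returned batch against the expected data after applying
/// any row selection, row filter, offset and limit.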
2529 fn single_column_reader_test<T, F, G>(
2533 opts: TestOptions,
2534 rand_max: i32,
2535 converted_type: ConvertedType,
2536 arrow_type: Option<ArrowDataType>,
2537 converter: F,
2538 ) where
2539 T: DataType,
2540 G: RandGen<T>,
2541 F: Fn(&[Option<T::T>]) -> ArrayRef,
2542 {
2543 println!(
2545 "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
2546 T::get_physical_type(), converted_type, arrow_type, opts
2547 );
2548
2549 let (repetition, def_levels) = match opts.null_percent.as_ref() {
2551 Some(null_percent) => {
2552 let mut rng = rng();
2553
2554 let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
2555 .map(|_| {
2556 std::iter::from_fn(|| {
2557 Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
2558 })
2559 .take(opts.num_rows)
2560 .collect()
2561 })
2562 .collect();
2563 (Repetition::OPTIONAL, Some(def_levels))
2564 }
2565 None => (Repetition::REQUIRED, None),
2566 };
2567
2568 let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
2570 .map(|idx| {
2571 let null_count = match def_levels.as_ref() {
2572 Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
2573 None => 0,
2574 };
2575 G::gen_vec(rand_max, opts.num_rows - null_count)
2576 })
2577 .collect();
2578
2579 let len = match T::get_physical_type() {
2580 crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
2581 crate::basic::Type::INT96 => 12,
2582 _ => -1,
2583 };
2584
2585 let fields = vec![Arc::new(
2586 Type::primitive_type_builder("leaf", T::get_physical_type())
2587 .with_repetition(repetition)
2588 .with_converted_type(converted_type)
2589 .with_length(len)
2590 .build()
2591 .unwrap(),
2592 )];
2593
2594 let schema = Arc::new(
2595 Type::group_type_builder("test_schema")
2596 .with_fields(fields)
2597 .build()
2598 .unwrap(),
2599 );
2600
2601 let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
2602
2603 let mut file = tempfile::tempfile().unwrap();
2604
2605 generate_single_column_file_with_data::<T>(
2606 &values,
2607 def_levels.as_ref(),
2608 file.try_clone().unwrap(),
2609 schema,
2610 arrow_field,
2611 &opts,
2612 )
2613 .unwrap();
2614
2615 file.rewind().unwrap();
2616
2617 let options = ArrowReaderOptions::new()
2618 .with_page_index(opts.enabled_statistics == EnabledStatistics::Page);
2619
2620 let mut builder =
2621 ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
2622
2623 let expected_data = match opts.row_selections {
2624 Some((selections, row_count)) => {
2625 let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2626
2627 let mut skip_data: Vec<Option<T::T>> = vec![];
2628 let dequeue: VecDeque<RowSelector> = selections.clone().into();
2629 for select in dequeue {
2630 if select.skip {
2631 without_skip_data.drain(0..select.row_count);
2632 } else {
2633 skip_data.extend(without_skip_data.drain(0..select.row_count));
2634 }
2635 }
2636 builder = builder.with_row_selection(selections);
2637
2638 assert_eq!(skip_data.len(), row_count);
2639 skip_data
2640 }
2641 None => {
2642 let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2644 assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
2645 expected_data
2646 }
2647 };
2648
2649 let mut expected_data = match opts.row_filter {
2650 Some(filter) => {
2651 let expected_data = expected_data
2652 .into_iter()
2653 .zip(filter.iter())
2654 .filter_map(|(d, f)| f.then_some(d))
2655 .collect();
2656
2657 let mut filter_offset = 0;
2658 let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
2659 ProjectionMask::all(),
2660 move |b| {
2661 let array = BooleanArray::from_iter(
2662 filter
2663 .iter()
2664 .skip(filter_offset)
2665 .take(b.num_rows())
2666 .map(|x| Some(*x)),
2667 );
2668 filter_offset += b.num_rows();
2669 Ok(array)
2670 },
2671 ))]);
2672
2673 builder = builder.with_row_filter(filter);
2674 expected_data
2675 }
2676 None => expected_data,
2677 };
2678
2679 if let Some(offset) = opts.offset {
2680 builder = builder.with_offset(offset);
2681 expected_data = expected_data.into_iter().skip(offset).collect();
2682 }
2683
2684 if let Some(limit) = opts.limit {
2685 builder = builder.with_limit(limit);
2686 expected_data = expected_data.into_iter().take(limit).collect();
2687 }
2688
2689 let mut record_reader = builder
2690 .with_batch_size(opts.record_batch_size)
2691 .build()
2692 .unwrap();
2693
2694 let mut total_read = 0;
2695 loop {
2696 let maybe_batch = record_reader.next();
2697 if total_read < expected_data.len() {
2698 let end = min(total_read + opts.record_batch_size, expected_data.len());
2699 let batch = maybe_batch.unwrap().unwrap();
2700 assert_eq!(end - total_read, batch.num_rows());
2701
2702 let a = converter(&expected_data[total_read..end]);
2703 let b = Arc::clone(batch.column(0));
2704
2705 assert_eq!(a.data_type(), b.data_type());
2706 assert_eq!(a.to_data(), b.to_data());
2707 assert_eq!(
2708 a.as_any().type_id(),
2709 b.as_any().type_id(),
2710 "incorrect type ids"
2711 );
2712
2713 total_read = end;
2714 } else {
2715 assert!(maybe_batch.is_none());
2716 break;
2717 }
2718 }
2719 }
2720
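/// Flattens all row groups into a single `Vec`, interleaving the generated values and
/// nulls according to the definition levels (1 = value, 0 = null).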
2721 fn gen_expected_data<T: DataType>(
2722 def_levels: Option<&Vec<Vec<i16>>>,
2723 values: &[Vec<T::T>],
2724 ) -> Vec<Option<T::T>> {
2725 let data: Vec<Option<T::T>> = match def_levels {
2726 Some(levels) => {
2727 let mut values_iter = values.iter().flatten();
2728 levels
2729 .iter()
2730 .flatten()
2731 .map(|d| match d {
2732 1 => Some(values_iter.next().cloned().unwrap()),
2733 0 => None,
2734 _ => unreachable!(),
2735 })
2736 .collect()
2737 }
2738 None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
2739 };
2740 data
2741 }
2742
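/// Writes one row group per entry in `values` using the supplied schema and the writer
/// properties from `opts`, optionally embedding the Arrow schema of `field` in the
/// file metadata so it round-trips through the reader.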
2743 fn generate_single_column_file_with_data<T: DataType>(
2744 values: &[Vec<T::T>],
2745 def_levels: Option<&Vec<Vec<i16>>>,
2746 file: File,
2747 schema: TypePtr,
2748 field: Option<Field>,
2749 opts: &TestOptions,
2750 ) -> Result<crate::format::FileMetaData> {
2751 let mut writer_props = opts.writer_props();
2752 if let Some(field) = field {
2753 let arrow_schema = Schema::new(vec![field]);
2754 add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
2755 }
2756
2757 let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
2758
2759 for (idx, v) in values.iter().enumerate() {
2760 let def_levels = def_levels.map(|d| d[idx].as_slice());
2761 let mut row_group_writer = writer.next_row_group()?;
2762 {
2763 let mut column_writer = row_group_writer
2764 .next_column()?
2765 .expect("Column writer is none!");
2766
2767 column_writer
2768 .typed::<T>()
2769 .write_batch(v, def_levels, None)?;
2770
2771 column_writer.close()?;
2772 }
2773 row_group_writer.close()?;
2774 }
2775
2776 writer.close()
2777 }
2778
2779 fn get_test_file(file_name: &str) -> File {
2780 let mut path = PathBuf::new();
2781 path.push(arrow::util::test_util::arrow_test_data());
2782 path.push(file_name);
2783
2784 File::open(path.as_path()).expect("File not found!")
2785 }
2786
2787 #[test]
2788 fn test_read_structs() {
2789 let testdata = arrow::util::test_util::parquet_test_data();
2793 let path = format!("{testdata}/nested_structs.rust.parquet");
2794 let file = File::open(&path).unwrap();
2795 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2796
2797 for batch in record_batch_reader {
2798 batch.unwrap();
2799 }
2800
2801 let file = File::open(&path).unwrap();
2802 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2803
2804 let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
2805 let projected_reader = builder
2806 .with_projection(mask)
2807 .with_batch_size(60)
2808 .build()
2809 .unwrap();
2810
2811 let expected_schema = Schema::new(vec![
2812 Field::new(
2813 "roll_num",
2814 ArrowDataType::Struct(Fields::from(vec![Field::new(
2815 "count",
2816 ArrowDataType::UInt64,
2817 false,
2818 )])),
2819 false,
2820 ),
2821 Field::new(
2822 "PC_CUR",
2823 ArrowDataType::Struct(Fields::from(vec![
2824 Field::new("mean", ArrowDataType::Int64, false),
2825 Field::new("sum", ArrowDataType::Int64, false),
2826 ])),
2827 false,
2828 ),
2829 ]);
2830
2831 assert_eq!(&expected_schema, projected_reader.schema().as_ref());
2833
2834 for batch in projected_reader {
2835 let batch = batch.unwrap();
2836 assert_eq!(batch.schema().as_ref(), &expected_schema);
2837 }
2838 }
2839
2840 #[test]
2841 fn test_read_structs_by_name() {
2843 let testdata = arrow::util::test_util::parquet_test_data();
2844 let path = format!("{testdata}/nested_structs.rust.parquet");
2845 let file = File::open(&path).unwrap();
2846 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2847
2848 for batch in record_batch_reader {
2849 batch.unwrap();
2850 }
2851
2852 let file = File::open(&path).unwrap();
2853 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2854
2855 let mask = ProjectionMask::columns(
2856 builder.parquet_schema(),
2857 ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
2858 );
2859 let projected_reader = builder
2860 .with_projection(mask)
2861 .with_batch_size(60)
2862 .build()
2863 .unwrap();
2864
2865 let expected_schema = Schema::new(vec![
2866 Field::new(
2867 "roll_num",
2868 ArrowDataType::Struct(Fields::from(vec![Field::new(
2869 "count",
2870 ArrowDataType::UInt64,
2871 false,
2872 )])),
2873 false,
2874 ),
2875 Field::new(
2876 "PC_CUR",
2877 ArrowDataType::Struct(Fields::from(vec![
2878 Field::new("mean", ArrowDataType::Int64, false),
2879 Field::new("sum", ArrowDataType::Int64, false),
2880 ])),
2881 false,
2882 ),
2883 ]);
2884
2885 assert_eq!(&expected_schema, projected_reader.schema().as_ref());
2886
2887 for batch in projected_reader {
2888 let batch = batch.unwrap();
2889 assert_eq!(batch.schema().as_ref(), &expected_schema);
2890 }
2891 }
2892
2893 #[test]
2894 fn test_read_maps() {
2895 let testdata = arrow::util::test_util::parquet_test_data();
2896 let path = format!("{testdata}/nested_maps.snappy.parquet");
2897 let file = File::open(path).unwrap();
2898 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2899
2900 for batch in record_batch_reader {
2901 batch.unwrap();
2902 }
2903 }
2904
2905 #[test]
2906 fn test_nested_nullability() {
2907 let message_type = "message nested {
2908 OPTIONAL Group group {
2909 REQUIRED INT32 leaf;
2910 }
2911 }";
2912
2913 let file = tempfile::tempfile().unwrap();
2914 let schema = Arc::new(parse_message_type(message_type).unwrap());
2915
2916 {
2917 let mut writer =
2919 SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
2920 .unwrap();
2921
2922 {
2923 let mut row_group_writer = writer.next_row_group().unwrap();
2924 let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
2925
2926 column_writer
2927 .typed::<Int32Type>()
2928 .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
2929 .unwrap();
2930
2931 column_writer.close().unwrap();
2932 row_group_writer.close().unwrap();
2933 }
2934
2935 writer.close().unwrap();
2936 }
2937
2938 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2939 let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
2940
2941 let reader = builder.with_projection(mask).build().unwrap();
2942
2943 let expected_schema = Schema::new(Fields::from(vec![Field::new(
2944 "group",
2945 ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
2946 true,
2947 )]));
2948
2949 let batch = reader.into_iter().next().unwrap().unwrap();
2950 assert_eq!(batch.schema().as_ref(), &expected_schema);
2951 assert_eq!(batch.num_rows(), 4);
2952 assert_eq!(batch.column(0).null_count(), 2);
2953 }
2954
2955 #[test]
2956 fn test_invalid_utf8() {
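// A small hand-crafted parquet file whose single string column appears to contain the
// bytes "he\xFFlo", which are not valid UTF-8.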
2957 let data = vec![
2959 80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
2960 18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
2961 3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
2962 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
2963 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
2964 21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
2965 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
2966 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
2967 0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
2968 118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
2969 116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
2970 82, 49,
2971 ];
2972
2973 let file = Bytes::from(data);
2974 let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();
2975
2976 let error = record_batch_reader.next().unwrap().unwrap_err();
2977
2978 assert!(
2979 error.to_string().contains("invalid utf-8 sequence"),
2980 "{}",
2981 error
2982 );
2983 }
2984
2985 #[test]
2986 fn test_invalid_utf8_string_array() {
2987 test_invalid_utf8_string_array_inner::<i32>();
2988 }
2989
2990 #[test]
2991 fn test_invalid_utf8_large_string_array() {
2992 test_invalid_utf8_string_array_inner::<i64>();
2993 }
2994
2995 fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
2996 let cases = [
2997 invalid_utf8_first_char::<O>(),
2998 invalid_utf8_first_char_long_strings::<O>(),
2999 invalid_utf8_later_char::<O>(),
3000 invalid_utf8_later_char_long_strings::<O>(),
3001 invalid_utf8_later_char_really_long_strings::<O>(),
3002 invalid_utf8_later_char_really_long_strings2::<O>(),
3003 ];
3004 for array in &cases {
3005 for encoding in STRING_ENCODINGS {
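// SAFETY: deliberately bypass UTF-8 validation so the invalid bytes reach the parquet
// writer; the reader is expected to reject them on read.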
3006 let array = unsafe {
3009 GenericStringArray::<O>::new_unchecked(
3010 array.offsets().clone(),
3011 array.values().clone(),
3012 array.nulls().cloned(),
3013 )
3014 };
3015 let data_type = array.data_type().clone();
3016 let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3017 let err = read_from_parquet(data).unwrap_err();
3018 let expected_err =
3019 "Parquet argument error: Parquet error: encountered non UTF-8 data";
3020 assert!(
3021 err.to_string().contains(expected_err),
3022 "data type: {data_type:?}, expected: {expected_err}, got: {err}"
3023 );
3024 }
3025 }
3026 }
3027
3028 #[test]
3029 fn test_invalid_utf8_string_view_array() {
3030 let cases = [
3031 invalid_utf8_first_char::<i32>(),
3032 invalid_utf8_first_char_long_strings::<i32>(),
3033 invalid_utf8_later_char::<i32>(),
3034 invalid_utf8_later_char_long_strings::<i32>(),
3035 invalid_utf8_later_char_really_long_strings::<i32>(),
3036 invalid_utf8_later_char_really_long_strings2::<i32>(),
3037 ];
3038
3039 for encoding in STRING_ENCODINGS {
3040 for array in &cases {
3041 let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
3042 let array = array.as_binary_view();
3043
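// SAFETY: as above, construct a StringViewArray over invalid UTF-8 without validation
// so the error surfaces on read rather than on construction.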
3044 let array = unsafe {
3047 StringViewArray::new_unchecked(
3048 array.views().clone(),
3049 array.data_buffers().to_vec(),
3050 array.nulls().cloned(),
3051 )
3052 };
3053
3054 let data_type = array.data_type().clone();
3055 let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3056 let err = read_from_parquet(data).unwrap_err();
3057 let expected_err =
3058 "Parquet argument error: Parquet error: encountered non UTF-8 data";
3059 assert!(
3060 err.to_string().contains(expected_err),
3061 "data type: {data_type:?}, expected: {expected_err}, got: {err}"
3062 );
3063 }
3064 }
3065 }
3066
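// Encodings exercised for string data: `None` keeps the writer defaults (dictionary
// encoded), the others disable the dictionary and force the named encoding.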
3067 const STRING_ENCODINGS: &[Option<Encoding>] = &[
3069 None,
3070 Some(Encoding::PLAIN),
3071 Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
3072 Some(Encoding::DELTA_BYTE_ARRAY),
3073 ];
3074
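// Invalid UTF-8 in the first bytes of the value, followed by valid spaces.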
3075 const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3078
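// Valid spaces followed by invalid UTF-8 part way through the value.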
3079 const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];
3082
3083 fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3085 let valid: &[u8] = b" ";
3086 let invalid = INVALID_UTF8_FIRST_CHAR;
3087 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3088 }
3089
3090 fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3094 let valid: &[u8] = b" ";
3095 let mut invalid = vec![];
3096 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3097 invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3098 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3099 }
3100
3101 fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3104 let valid: &[u8] = b" ";
3105 let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
3106 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3107 }
3108
3109 fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3113 let valid: &[u8] = b" ";
3114 let mut invalid = vec![];
3115 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3116 invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3117 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3118 }
3119
3120 fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3124 let valid: &[u8] = b" ";
3125 let mut invalid = vec![];
3126 for _ in 0..10 {
3127 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3129 }
3130 invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3131 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3132 }
3133
3134 fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3137 let valid: &[u8] = b" ";
3138 let mut valid_long = vec![];
3139 for _ in 0..10 {
3140 valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3142 }
3143 let invalid = INVALID_UTF8_LATER_CHAR;
3144 GenericBinaryArray::<O>::from_iter(vec![
3145 None,
3146 Some(valid),
3147 Some(invalid),
3148 None,
3149 Some(&valid_long),
3150 Some(valid),
3151 ])
3152 }
3153
3154 fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
3159 let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
3160 let mut data = vec![];
3161 let schema = batch.schema();
3162 let props = encoding.map(|encoding| {
3163 WriterProperties::builder()
3164 .set_dictionary_enabled(false)
3166 .set_encoding(encoding)
3167 .build()
3168 });
3169
3170 {
3171 let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
3172 writer.write(&batch).unwrap();
3173 writer.flush().unwrap();
3174 writer.close().unwrap();
3175 };
3176 data
3177 }
3178
3179 fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
3181 let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
3182 .unwrap()
3183 .build()
3184 .unwrap();
3185
3186 reader.collect()
3187 }
3188
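// Checks which output batches share dictionary data: batches read entirely from one
// row group reuse its dictionary, while the batch spanning the row-group boundary
// gets its own.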
3189 #[test]
3190 fn test_dictionary_preservation() {
3191 let fields = vec![Arc::new(
3192 Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
3193 .with_repetition(Repetition::OPTIONAL)
3194 .with_converted_type(ConvertedType::UTF8)
3195 .build()
3196 .unwrap(),
3197 )];
3198
3199 let schema = Arc::new(
3200 Type::group_type_builder("test_schema")
3201 .with_fields(fields)
3202 .build()
3203 .unwrap(),
3204 );
3205
3206 let dict_type = ArrowDataType::Dictionary(
3207 Box::new(ArrowDataType::Int32),
3208 Box::new(ArrowDataType::Utf8),
3209 );
3210
3211 let arrow_field = Field::new("leaf", dict_type, true);
3212
3213 let mut file = tempfile::tempfile().unwrap();
3214
3215 let values = vec![
3216 vec![
3217 ByteArray::from("hello"),
3218 ByteArray::from("a"),
3219 ByteArray::from("b"),
3220 ByteArray::from("d"),
3221 ],
3222 vec![
3223 ByteArray::from("c"),
3224 ByteArray::from("a"),
3225 ByteArray::from("b"),
3226 ],
3227 ];
3228
3229 let def_levels = vec![
3230 vec![1, 0, 0, 1, 0, 0, 1, 1],
3231 vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
3232 ];
3233
3234 let opts = TestOptions {
3235 encoding: Encoding::RLE_DICTIONARY,
3236 ..Default::default()
3237 };
3238
3239 generate_single_column_file_with_data::<ByteArrayType>(
3240 &values,
3241 Some(&def_levels),
3242 file.try_clone().unwrap(),
3243 schema,
3244 Some(arrow_field),
3245 &opts,
3246 )
3247 .unwrap();
3248
3249 file.rewind().unwrap();
3250
3251 let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();
3252
3253 let batches = record_reader
3254 .collect::<Result<Vec<RecordBatch>, _>>()
3255 .unwrap();
3256
3257 assert_eq!(batches.len(), 6);
3258 assert!(batches.iter().all(|x| x.num_columns() == 1));
3259
3260 let row_counts = batches
3261 .iter()
3262 .map(|x| (x.num_rows(), x.column(0).null_count()))
3263 .collect::<Vec<_>>();
3264
3265 assert_eq!(
3266 row_counts,
3267 vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
3268 );
3269
3270 let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();
3271
3272 assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
3274 assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
3276 assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
3277 assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
3279 assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
3280 }
3281
3282 #[test]
3283 fn test_read_null_list() {
3284 let testdata = arrow::util::test_util::parquet_test_data();
3285 let path = format!("{testdata}/null_list.parquet");
3286 let file = File::open(path).unwrap();
3287 let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3288
3289 let batch = record_batch_reader.next().unwrap().unwrap();
3290 assert_eq!(batch.num_rows(), 1);
3291 assert_eq!(batch.num_columns(), 1);
3292 assert_eq!(batch.column(0).len(), 1);
3293
3294 let list = batch
3295 .column(0)
3296 .as_any()
3297 .downcast_ref::<ListArray>()
3298 .unwrap();
3299 assert_eq!(list.len(), 1);
3300 assert!(list.is_valid(0));
3301
3302 let val = list.value(0);
3303 assert_eq!(val.len(), 0);
3304 }
3305
3306 #[test]
3307 fn test_null_schema_inference() {
3308 let testdata = arrow::util::test_util::parquet_test_data();
3309 let path = format!("{testdata}/null_list.parquet");
3310 let file = File::open(path).unwrap();
3311
3312 let arrow_field = Field::new(
3313 "emptylist",
3314 ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
3315 true,
3316 );
3317
3318 let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3319 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3320 let schema = builder.schema();
3321 assert_eq!(schema.fields().len(), 1);
3322 assert_eq!(schema.field(0), &arrow_field);
3323 }
3324
3325 #[test]
3326 fn test_skip_metadata() {
3327 let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
3328 let field = Field::new("col", col.data_type().clone(), true);
3329
3330 let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
3331
3332 let metadata = [("key".to_string(), "value".to_string())]
3333 .into_iter()
3334 .collect();
3335
3336 let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
3337
3338 assert_ne!(schema_with_metadata, schema_without_metadata);
3339
3340 let batch =
3341 RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
3342
3343 let file = |version: WriterVersion| {
3344 let props = WriterProperties::builder()
3345 .set_writer_version(version)
3346 .build();
3347
3348 let file = tempfile().unwrap();
3349 let mut writer =
3350 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
3351 .unwrap();
3352 writer.write(&batch).unwrap();
3353 writer.close().unwrap();
3354 file
3355 };
3356
3357 let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3358
3359 let v1_reader = file(WriterVersion::PARQUET_1_0);
3360 let v2_reader = file(WriterVersion::PARQUET_2_0);
3361
3362 let arrow_reader =
3363 ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
3364 assert_eq!(arrow_reader.schema(), schema_with_metadata);
3365
3366 let reader =
3367 ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
3368 .unwrap()
3369 .build()
3370 .unwrap();
3371 assert_eq!(reader.schema(), schema_without_metadata);
3372
3373 let arrow_reader =
3374 ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
3375 assert_eq!(arrow_reader.schema(), schema_with_metadata);
3376
3377 let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
3378 .unwrap()
3379 .build()
3380 .unwrap();
3381 assert_eq!(reader.schema(), schema_without_metadata);
3382 }
3383
3384 fn write_parquet_from_iter<I, F>(value: I) -> File
3385 where
3386 I: IntoIterator<Item = (F, ArrayRef)>,
3387 F: AsRef<str>,
3388 {
3389 let batch = RecordBatch::try_from_iter(value).unwrap();
3390 let file = tempfile().unwrap();
3391 let mut writer =
3392 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
3393 writer.write(&batch).unwrap();
3394 writer.close().unwrap();
3395 file
3396 }
3397
3398 fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
3399 where
3400 I: IntoIterator<Item = (F, ArrayRef)>,
3401 F: AsRef<str>,
3402 {
3403 let file = write_parquet_from_iter(value);
3404 let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
3405 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3406 file.try_clone().unwrap(),
3407 options_with_schema,
3408 );
3409 assert_eq!(builder.err().unwrap().to_string(), expected_error);
3410 }
3411
3412 #[test]
3413 fn test_schema_too_few_columns() {
3414 run_schema_test_with_error(
3415 vec![
3416 ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3417 ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3418 ],
3419 Arc::new(Schema::new(vec![Field::new(
3420 "int64",
3421 ArrowDataType::Int64,
3422 false,
3423 )])),
3424 "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
3425 );
3426 }
3427
3428 #[test]
3429 fn test_schema_too_many_columns() {
3430 run_schema_test_with_error(
3431 vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3432 Arc::new(Schema::new(vec![
3433 Field::new("int64", ArrowDataType::Int64, false),
3434 Field::new("int32", ArrowDataType::Int32, false),
3435 ])),
3436 "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
3437 );
3438 }
3439
3440 #[test]
3441 fn test_schema_mismatched_column_names() {
3442 run_schema_test_with_error(
3443 vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3444 Arc::new(Schema::new(vec![Field::new(
3445 "other",
3446 ArrowDataType::Int64,
3447 false,
3448 )])),
3449 "Arrow: incompatible arrow schema, expected field named int64 got other",
3450 );
3451 }
3452
3453 #[test]
3454 fn test_schema_incompatible_columns() {
3455 run_schema_test_with_error(
3456 vec![
3457 (
3458 "col1_invalid",
3459 Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3460 ),
3461 (
3462 "col2_valid",
3463 Arc::new(Int32Array::from(vec![0])) as ArrayRef,
3464 ),
3465 (
3466 "col3_invalid",
3467 Arc::new(Date64Array::from(vec![0])) as ArrayRef,
3468 ),
3469 ],
3470 Arc::new(Schema::new(vec![
3471 Field::new("col1_invalid", ArrowDataType::Int32, false),
3472 Field::new("col2_valid", ArrowDataType::Int32, false),
3473 Field::new("col3_invalid", ArrowDataType::Int32, false),
3474 ])),
3475 "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
3476 );
3477 }
3478
3479 #[test]
3480 fn test_one_incompatible_nested_column() {
3481 let nested_fields = Fields::from(vec![
3482 Field::new("nested1_valid", ArrowDataType::Utf8, false),
3483 Field::new("nested1_invalid", ArrowDataType::Int64, false),
3484 ]);
3485 let nested = StructArray::try_new(
3486 nested_fields,
3487 vec![
3488 Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
3489 Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3490 ],
3491 None,
3492 )
3493 .expect("struct array");
3494 let supplied_nested_fields = Fields::from(vec![
3495 Field::new("nested1_valid", ArrowDataType::Utf8, false),
3496 Field::new("nested1_invalid", ArrowDataType::Int32, false),
3497 ]);
3498 run_schema_test_with_error(
3499 vec![
3500 ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3501 ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3502 ("nested", Arc::new(nested) as ArrayRef),
3503 ],
3504 Arc::new(Schema::new(vec![
3505 Field::new("col1", ArrowDataType::Int64, false),
3506 Field::new("col2", ArrowDataType::Int32, false),
3507 Field::new(
3508 "nested",
3509 ArrowDataType::Struct(supplied_nested_fields),
3510 false,
3511 ),
3512 ])),
3513 "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
3514 requested Struct([Field { name: \"nested1_valid\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"nested1_invalid\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]) \
3515 but found Struct([Field { name: \"nested1_valid\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"nested1_invalid\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }])",
3516 );
3517 }
3518
3519 fn utf8_parquet() -> Bytes {
3521 let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
3522 let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
3523 let props = None;
3524 let mut parquet_data = vec![];
3526 let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
3527 writer.write(&batch).unwrap();
3528 writer.close().unwrap();
3529 Bytes::from(parquet_data)
3530 }
3531
3532 #[test]
3533 fn test_schema_error_bad_types() {
3534 let parquet_data = utf8_parquet();
3536
3537 let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
3539 "column1",
3540 arrow::datatypes::DataType::Int32,
3541 false,
3542 )]));
3543
3544 let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
3546 let err =
3547 ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
3548 .unwrap_err();
3549 assert_eq!(err.to_string(), "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8")
3550 }
3551
3552 #[test]
3553 fn test_schema_error_bad_nullability() {
3554 let parquet_data = utf8_parquet();
3556
3557 let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
3559 "column1",
3560 arrow::datatypes::DataType::Utf8,
3561 true,
3562 )]));
3563
3564 let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
3566 let err =
3567 ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
3568 .unwrap_err();
3569 assert_eq!(err.to_string(), "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false")
3570 }
3571
3572 #[test]
3573 fn test_read_binary_as_utf8() {
3574 let file = write_parquet_from_iter(vec![
3575 (
3576 "binary_to_utf8",
3577 Arc::new(BinaryArray::from(vec![
3578 b"one".as_ref(),
3579 b"two".as_ref(),
3580 b"three".as_ref(),
3581 ])) as ArrayRef,
3582 ),
3583 (
3584 "large_binary_to_large_utf8",
3585 Arc::new(LargeBinaryArray::from(vec![
3586 b"one".as_ref(),
3587 b"two".as_ref(),
3588 b"three".as_ref(),
3589 ])) as ArrayRef,
3590 ),
3591 (
3592 "binary_view_to_utf8_view",
3593 Arc::new(BinaryViewArray::from(vec![
3594 b"one".as_ref(),
3595 b"two".as_ref(),
3596 b"three".as_ref(),
3597 ])) as ArrayRef,
3598 ),
3599 ]);
3600 let supplied_fields = Fields::from(vec![
3601 Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
3602 Field::new(
3603 "large_binary_to_large_utf8",
3604 ArrowDataType::LargeUtf8,
3605 false,
3606 ),
3607 Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
3608 ]);
3609
3610 let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3611 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3612 file.try_clone().unwrap(),
3613 options,
3614 )
3615 .expect("reader builder with schema")
3616 .build()
3617 .expect("reader with schema");
3618
3619 let batch = arrow_reader.next().unwrap().unwrap();
3620 assert_eq!(batch.num_columns(), 3);
3621 assert_eq!(batch.num_rows(), 3);
3622 assert_eq!(
3623 batch
3624 .column(0)
3625 .as_string::<i32>()
3626 .iter()
3627 .collect::<Vec<_>>(),
3628 vec![Some("one"), Some("two"), Some("three")]
3629 );
3630
3631 assert_eq!(
3632 batch
3633 .column(1)
3634 .as_string::<i64>()
3635 .iter()
3636 .collect::<Vec<_>>(),
3637 vec![Some("one"), Some("two"), Some("three")]
3638 );
3639
3640 assert_eq!(
3641 batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
3642 vec![Some("one"), Some("two"), Some("three")]
3643 );
3644 }
3645
3646 #[test]
3647 #[should_panic(expected = "Invalid UTF8 sequence at")]
3648 fn test_read_non_utf8_binary_as_utf8() {
3649 let file = write_parquet_from_iter(vec![(
3650 "non_utf8_binary",
3651 Arc::new(BinaryArray::from(vec![
3652 b"\xDE\x00\xFF".as_ref(),
3653 b"\xDE\x01\xAA".as_ref(),
3654 b"\xDE\x02\xFF".as_ref(),
3655 ])) as ArrayRef,
3656 )]);
3657 let supplied_fields = Fields::from(vec![Field::new(
3658 "non_utf8_binary",
3659 ArrowDataType::Utf8,
3660 false,
3661 )]);
3662
3663 let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3664 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3665 file.try_clone().unwrap(),
3666 options,
3667 )
3668 .expect("reader builder with schema")
3669 .build()
3670 .expect("reader with schema");
3671 arrow_reader.next().unwrap().unwrap_err();
3672 }
3673
3674 #[test]
3675 fn test_with_schema() {
3676 let nested_fields = Fields::from(vec![
3677 Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
3678 Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
3679 ]);
3680
3681 let nested_arrays: Vec<ArrayRef> = vec![
3682 Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
3683 Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
3684 ];
3685
3686 let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();
3687
3688 let file = write_parquet_from_iter(vec![
3689 (
3690 "int32_to_ts_second",
3691 Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
3692 ),
3693 (
3694 "date32_to_date64",
3695 Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
3696 ),
3697 ("nested", Arc::new(nested) as ArrayRef),
3698 ]);
3699
3700 let supplied_nested_fields = Fields::from(vec![
3701 Field::new(
3702 "utf8_to_dict",
3703 ArrowDataType::Dictionary(
3704 Box::new(ArrowDataType::Int32),
3705 Box::new(ArrowDataType::Utf8),
3706 ),
3707 false,
3708 ),
3709 Field::new(
3710 "int64_to_ts_nano",
3711 ArrowDataType::Timestamp(
3712 arrow::datatypes::TimeUnit::Nanosecond,
3713 Some("+10:00".into()),
3714 ),
3715 false,
3716 ),
3717 ]);
3718
3719 let supplied_schema = Arc::new(Schema::new(vec![
3720 Field::new(
3721 "int32_to_ts_second",
3722 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
3723 false,
3724 ),
3725 Field::new("date32_to_date64", ArrowDataType::Date64, false),
3726 Field::new(
3727 "nested",
3728 ArrowDataType::Struct(supplied_nested_fields),
3729 false,
3730 ),
3731 ]));
3732
3733 let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
3734 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3735 file.try_clone().unwrap(),
3736 options,
3737 )
3738 .expect("reader builder with schema")
3739 .build()
3740 .expect("reader with schema");
3741
3742 assert_eq!(arrow_reader.schema(), supplied_schema);
3743 let batch = arrow_reader.next().unwrap().unwrap();
3744 assert_eq!(batch.num_columns(), 3);
3745 assert_eq!(batch.num_rows(), 4);
3746 assert_eq!(
3747 batch
3748 .column(0)
3749 .as_any()
3750 .downcast_ref::<TimestampSecondArray>()
3751 .expect("downcast to timestamp second")
3752 .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
3753 .map(|v| v.to_string())
3754 .expect("value as datetime"),
3755 "1970-01-01 01:00:00 +01:00"
3756 );
3757 assert_eq!(
3758 batch
3759 .column(1)
3760 .as_any()
3761 .downcast_ref::<Date64Array>()
3762 .expect("downcast to date64")
3763 .value_as_date(0)
3764 .map(|v| v.to_string())
3765 .expect("value as date"),
3766 "1970-01-01"
3767 );
3768
3769 let nested = batch
3770 .column(2)
3771 .as_any()
3772 .downcast_ref::<StructArray>()
3773 .expect("downcast to struct");
3774
3775 let nested_dict = nested
3776 .column(0)
3777 .as_any()
3778 .downcast_ref::<Int32DictionaryArray>()
3779 .expect("downcast to dictionary");
3780
3781 assert_eq!(
3782 nested_dict
3783 .values()
3784 .as_any()
3785 .downcast_ref::<StringArray>()
3786 .expect("downcast to string")
3787 .iter()
3788 .collect::<Vec<_>>(),
3789 vec![Some("a"), Some("b")]
3790 );
3791
3792 assert_eq!(
3793 nested_dict.keys().iter().collect::<Vec<_>>(),
3794 vec![Some(0), Some(0), Some(0), Some(1)]
3795 );
3796
3797 assert_eq!(
3798 nested
3799 .column(1)
3800 .as_any()
3801 .downcast_ref::<TimestampNanosecondArray>()
3802 .expect("downcast to timestamp nanosecond")
3803 .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
3804 .map(|v| v.to_string())
3805 .expect("value as datetime"),
3806 "1970-01-01 10:00:00.000000001 +10:00"
3807 );
3808 }
3809
3810 #[test]
3811 fn test_empty_projection() {
3812 let testdata = arrow::util::test_util::parquet_test_data();
3813 let path = format!("{testdata}/alltypes_plain.parquet");
3814 let file = File::open(path).unwrap();
3815
3816 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3817 let file_metadata = builder.metadata().file_metadata();
3818 let expected_rows = file_metadata.num_rows() as usize;
3819
3820 let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
3821 let batch_reader = builder
3822 .with_projection(mask)
3823 .with_batch_size(2)
3824 .build()
3825 .unwrap();
3826
3827 let mut total_rows = 0;
3828 for maybe_batch in batch_reader {
3829 let batch = maybe_batch.unwrap();
3830 total_rows += batch.num_rows();
3831 assert_eq!(batch.num_columns(), 0);
3832 assert!(batch.num_rows() <= 2);
3833 }
3834
3835 assert_eq!(total_rows, expected_rows);
3836 }
3837
3838 fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
3839 let schema = Arc::new(Schema::new(vec![Field::new(
3840 "list",
3841 ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
3842 true,
3843 )]));
3844
3845 let mut buf = Vec::with_capacity(1024);
3846
3847 let mut writer = ArrowWriter::try_new(
3848 &mut buf,
3849 schema.clone(),
3850 Some(
3851 WriterProperties::builder()
3852 .set_max_row_group_size(row_group_size)
3853 .build(),
3854 ),
3855 )
3856 .unwrap();
3857 for _ in 0..2 {
3858 let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
3859 for _ in 0..(batch_size) {
3860 list_builder.append(true);
3861 }
3862 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
3863 .unwrap();
3864 writer.write(&batch).unwrap();
3865 }
3866 writer.close().unwrap();
3867
3868 let mut record_reader =
3869 ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
3870 assert_eq!(
3871 batch_size,
3872 record_reader.next().unwrap().unwrap().num_rows()
3873 );
3874 assert_eq!(
3875 batch_size,
3876 record_reader.next().unwrap().unwrap().num_rows()
3877 );
3878 }
3879
3880 #[test]
3881 fn test_row_group_exact_multiple() {
3882 const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
3883 test_row_group_batch(8, 8);
3884 test_row_group_batch(10, 8);
3885 test_row_group_batch(8, 10);
3886 test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
3887 test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
3888 test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
3889 test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
3890 test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
3891 }
3892
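/// Computes the `RecordBatch`es a reader with the given `batch_size` should produce
/// for `column` under `selection`, by slicing out the selected runs.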
3893 fn get_expected_batches(
3896 column: &RecordBatch,
3897 selection: &RowSelection,
3898 batch_size: usize,
3899 ) -> Vec<RecordBatch> {
3900 let mut expected_batches = vec![];
3901
3902 let mut selection: VecDeque<_> = selection.clone().into();
3903 let mut row_offset = 0;
3904 let mut last_start = None;
3905 while row_offset < column.num_rows() && !selection.is_empty() {
3906 let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
3907 while batch_remaining > 0 && !selection.is_empty() {
3908 let (to_read, skip) = match selection.front_mut() {
3909 Some(selection) if selection.row_count > batch_remaining => {
3910 selection.row_count -= batch_remaining;
3911 (batch_remaining, selection.skip)
3912 }
3913 Some(_) => {
3914 let select = selection.pop_front().unwrap();
3915 (select.row_count, select.skip)
3916 }
3917 None => break,
3918 };
3919
3920 batch_remaining -= to_read;
3921
3922 match skip {
3923 true => {
3924 if let Some(last_start) = last_start.take() {
3925 expected_batches.push(column.slice(last_start, row_offset - last_start))
3926 }
3927 row_offset += to_read
3928 }
3929 false => {
3930 last_start.get_or_insert(row_offset);
3931 row_offset += to_read
3932 }
3933 }
3934 }
3935 }
3936
3937 if let Some(last_start) = last_start.take() {
3938 expected_batches.push(column.slice(last_start, row_offset - last_start))
3939 }
3940
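// Every batch except the final one should contain exactly `batch_size` rows.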
3941 for batch in &expected_batches[..expected_batches.len() - 1] {
3943 assert_eq!(batch.num_rows(), batch_size);
3944 }
3945
3946 expected_batches
3947 }
3948
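/// Builds alternating select/skip runs of `step_len` rows covering `total_len` rows in
/// total, starting with a skip when `skip_first` is set, and returns the selection
/// together with the number of selected rows.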
3949 fn create_test_selection(
3950 step_len: usize,
3951 total_len: usize,
3952 skip_first: bool,
3953 ) -> (RowSelection, usize) {
3954 let mut remaining = total_len;
3955 let mut skip = skip_first;
3956 let mut vec = vec![];
3957 let mut selected_count = 0;
3958 while remaining != 0 {
3959 let step = if remaining > step_len {
3960 step_len
3961 } else {
3962 remaining
3963 };
3964 vec.push(RowSelector {
3965 row_count: step,
3966 skip,
3967 });
3968 remaining -= step;
3969 if !skip {
3970 selected_count += step;
3971 }
3972 skip = !skip;
3973 }
3974 (vec.into(), selected_count)
3975 }
3976
3977 #[test]
3978 fn test_scan_row_with_selection() {
3979 let testdata = arrow::util::test_util::parquet_test_data();
3980 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
3981 let test_file = File::open(&path).unwrap();
3982
3983 let mut serial_reader =
3984 ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
3985 let data = serial_reader.next().unwrap().unwrap();
3986
3987 let do_test = |batch_size: usize, selection_len: usize| {
3988 for skip_first in [false, true] {
3989 let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;
3990
3991 let expected = get_expected_batches(&data, &selections, batch_size);
3992 let skip_reader = create_skip_reader(&test_file, batch_size, selections);
3993 assert_eq!(
3994 skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
3995 expected,
3996 "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
3997 );
3998 }
3999 };
4000
4001 do_test(1000, 1000);
4004
4005 do_test(20, 20);
4007
4008 do_test(20, 5);
4010
4011 do_test(20, 5);
4014
4015 fn create_skip_reader(
4016 test_file: &File,
4017 batch_size: usize,
4018 selections: RowSelection,
4019 ) -> ParquetRecordBatchReader {
4020 let options = ArrowReaderOptions::new().with_page_index(true);
4021 let file = test_file.try_clone().unwrap();
4022 ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
4023 .unwrap()
4024 .with_batch_size(batch_size)
4025 .with_row_selection(selections)
4026 .build()
4027 .unwrap()
4028 }
4029 }
4030
4031 #[test]
4032 fn test_batch_size_overallocate() {
4033 let testdata = arrow::util::test_util::parquet_test_data();
4034 let path = format!("{testdata}/alltypes_plain.parquet");
4036 let test_file = File::open(path).unwrap();
4037
4038 let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4039 let num_rows = builder.metadata.file_metadata().num_rows();
4040 let reader = builder
4041 .with_batch_size(1024)
4042 .with_projection(ProjectionMask::all())
4043 .build()
4044 .unwrap();
4045 assert_ne!(1024, num_rows);
4046 assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
4047 }
4048
4049 #[test]
4050 fn test_read_with_page_index_enabled() {
4051 let testdata = arrow::util::test_util::parquet_test_data();
4052
4053 {
4054 let path = format!("{testdata}/alltypes_tiny_pages.parquet");
4056 let test_file = File::open(path).unwrap();
4057 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4058 test_file,
4059 ArrowReaderOptions::new().with_page_index(true),
4060 )
4061 .unwrap();
4062 assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
4063 let reader = builder.build().unwrap();
4064 let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4065 assert_eq!(batches.len(), 8);
4066 }
4067
4068 {
4069 let path = format!("{testdata}/alltypes_plain.parquet");
4071 let test_file = File::open(path).unwrap();
4072 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4073 test_file,
4074 ArrowReaderOptions::new().with_page_index(true),
4075 )
4076 .unwrap();
4077 assert!(builder.metadata().offset_index().is_none());
4080 let reader = builder.build().unwrap();
4081 let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4082 assert_eq!(batches.len(), 1);
4083 }
4084 }
4085
4086 #[test]
4087 fn test_raw_repetition() {
4088 const MESSAGE_TYPE: &str = "
4089 message Log {
4090 OPTIONAL INT32 eventType;
4091 REPEATED INT32 category;
4092 REPEATED group filter {
4093 OPTIONAL INT32 error;
4094 }
4095 }
4096 ";
4097 let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
4098 let props = Default::default();
4099
4100 let mut buf = Vec::with_capacity(1024);
4101 let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
4102 let mut row_group_writer = writer.next_row_group().unwrap();
4103
4104 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4106 col_writer
4107 .typed::<Int32Type>()
4108 .write_batch(&[1], Some(&[1]), None)
4109 .unwrap();
4110 col_writer.close().unwrap();
4111 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4113 col_writer
4114 .typed::<Int32Type>()
4115 .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
4116 .unwrap();
4117 col_writer.close().unwrap();
4118 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4120 col_writer
4121 .typed::<Int32Type>()
4122 .write_batch(&[1], Some(&[1]), Some(&[0]))
4123 .unwrap();
4124 col_writer.close().unwrap();
4125
4126 let rg_md = row_group_writer.close().unwrap();
4127 assert_eq!(rg_md.num_rows(), 1);
4128 writer.close().unwrap();
4129
4130 let bytes = Bytes::from(buf);
4131
4132 let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
4133 let full = no_mask.next().unwrap().unwrap();
4134
4135 assert_eq!(full.num_columns(), 3);
4136
4137 for idx in 0..3 {
4138 let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
4139 let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
4140 let mut reader = b.with_projection(mask).build().unwrap();
4141 let projected = reader.next().unwrap().unwrap();
4142
4143 assert_eq!(projected.num_columns(), 1);
4144 assert_eq!(full.column(idx), projected.column(0));
4145 }
4146 }
4147
4148 #[test]
4149 fn test_read_lz4_raw() {
4150 let testdata = arrow::util::test_util::parquet_test_data();
4151 let path = format!("{testdata}/lz4_raw_compressed.parquet");
4152 let file = File::open(path).unwrap();
4153
4154 let batches = ParquetRecordBatchReader::try_new(file, 1024)
4155 .unwrap()
4156 .collect::<Result<Vec<_>, _>>()
4157 .unwrap();
4158 assert_eq!(batches.len(), 1);
4159 let batch = &batches[0];
4160
4161 assert_eq!(batch.num_columns(), 3);
4162 assert_eq!(batch.num_rows(), 4);
4163
4164 let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4166 assert_eq!(
4167 a.values(),
4168 &[1593604800, 1593604800, 1593604801, 1593604801]
4169 );
4170
4171 let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4172 let a: Vec<_> = a.iter().flatten().collect();
4173 assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
4174
4175 let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4176 assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
4177 }
4178
4179 #[test]
4189 fn test_read_lz4_hadoop_fallback() {
4190 for file in [
4191 "hadoop_lz4_compressed.parquet",
4192 "non_hadoop_lz4_compressed.parquet",
4193 ] {
4194 let testdata = arrow::util::test_util::parquet_test_data();
4195 let path = format!("{testdata}/{file}");
4196 let file = File::open(path).unwrap();
4197 let expected_rows = 4;
4198
4199 let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4200 .unwrap()
4201 .collect::<Result<Vec<_>, _>>()
4202 .unwrap();
4203 assert_eq!(batches.len(), 1);
4204 let batch = &batches[0];
4205
4206 assert_eq!(batch.num_columns(), 3);
4207 assert_eq!(batch.num_rows(), expected_rows);
4208
4209 let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4210 assert_eq!(
4211 a.values(),
4212 &[1593604800, 1593604800, 1593604801, 1593604801]
4213 );
4214
4215 let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4216 let b: Vec<_> = b.iter().flatten().collect();
4217 assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
4218
4219 let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4220 assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
4221 }
4222 }
4223
4224 #[test]
4225 fn test_read_lz4_hadoop_large() {
4226 let testdata = arrow::util::test_util::parquet_test_data();
4227 let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
4228 let file = File::open(path).unwrap();
4229 let expected_rows = 10000;
4230
4231 let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4232 .unwrap()
4233 .collect::<Result<Vec<_>, _>>()
4234 .unwrap();
4235 assert_eq!(batches.len(), 1);
4236 let batch = &batches[0];
4237
4238 assert_eq!(batch.num_columns(), 1);
4239 assert_eq!(batch.num_rows(), expected_rows);
4240
4241 let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
4242 let a: Vec<_> = a.iter().flatten().collect();
4243 assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
4244 assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
4245 assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
4246 assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
4247 }
4248
4249 #[test]
4250 #[cfg(feature = "snap")]
4251 fn test_read_nested_lists() {
4252 let testdata = arrow::util::test_util::parquet_test_data();
4253 let path = format!("{testdata}/nested_lists.snappy.parquet");
4254 let file = File::open(path).unwrap();
4255
4256 let f = file.try_clone().unwrap();
4257 let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
4258 let expected = reader.next().unwrap().unwrap();
4259 assert_eq!(expected.num_rows(), 3);
4260
4261 let selection = RowSelection::from(vec![
4262 RowSelector::skip(1),
4263 RowSelector::select(1),
4264 RowSelector::skip(1),
4265 ]);
4266 let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4267 .unwrap()
4268 .with_row_selection(selection)
4269 .build()
4270 .unwrap();
4271
4272 let actual = reader.next().unwrap().unwrap();
4273 assert_eq!(actual.num_rows(), 1);
4274 assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
4275 }
4276
4277 #[test]
4278 fn test_arbitrary_decimal() {
4279 let values = [1, 2, 3, 4, 5, 6, 7, 8];
4280 let decimals_19_0 = Decimal128Array::from_iter_values(values)
4281 .with_precision_and_scale(19, 0)
4282 .unwrap();
4283 let decimals_12_0 = Decimal128Array::from_iter_values(values)
4284 .with_precision_and_scale(12, 0)
4285 .unwrap();
4286 let decimals_17_10 = Decimal128Array::from_iter_values(values)
4287 .with_precision_and_scale(17, 10)
4288 .unwrap();
4289
4290 let written = RecordBatch::try_from_iter([
4291 ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
4292 ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
4293 ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
4294 ])
4295 .unwrap();
4296
4297 let mut buffer = Vec::with_capacity(1024);
4298 let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
4299 writer.write(&written).unwrap();
4300 writer.close().unwrap();
4301
4302 let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
4303 .unwrap()
4304 .collect::<Result<Vec<_>, _>>()
4305 .unwrap();
4306
4307 assert_eq!(&written.slice(0, 8), &read[0]);
4308 }
4309
4310 #[test]
4311 fn test_list_skip() {
4312 let mut list = ListBuilder::new(Int32Builder::new());
4313 list.append_value([Some(1), Some(2)]);
4314 list.append_value([Some(3)]);
4315 list.append_value([Some(4)]);
4316 let list = list.finish();
4317 let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();
4318
4319 let props = WriterProperties::builder()
4321 .set_data_page_row_count_limit(1)
4322 .set_write_batch_size(2)
4323 .build();
4324
4325 let mut buffer = Vec::with_capacity(1024);
4326 let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
4327 writer.write(&batch).unwrap();
4328 writer.close().unwrap();
4329
4330 let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
4331 let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
4332 .unwrap()
4333 .with_row_selection(selection.into())
4334 .build()
4335 .unwrap();
4336 let out = reader.next().unwrap().unwrap();
4337 assert_eq!(out.num_rows(), 1);
4338 assert_eq!(out, batch.slice(2, 1));
4339 }
4340
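// Round-trips decimal arrays and checks the physical type chosen by the writer:
// precision <= 9 maps to INT32, precision <= 18 to INT64, and larger precisions to
// FIXED_LEN_BYTE_ARRAY.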
    fn test_decimal_roundtrip<T: DecimalType>() {
        // The writer picks the Parquet physical type from the decimal precision,
        // as asserted below:
        //   precision <= 9  -> INT32
        //   precision <= 18 -> INT64
        //   precision >  18 -> FIXED_LEN_BYTE_ARRAY
        let d = |values: Vec<usize>, p: u8| {
            let iter = values.into_iter().map(T::Native::usize_as);
            PrimitiveArray::<T>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
            ("d4", Arc::new(d4) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);
        let t4 = builder.parquet_schema().columns()[3].physical_type();
        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    #[test]
    fn test_decimal() {
        test_decimal_roundtrip::<Decimal128Type>();
        test_decimal_roundtrip::<Decimal256Type>();
    }

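    // Illustrative sketch (not part of the original tests): the precision to
    // physical type mapping asserted by `test_decimal_roundtrip` above, written
    // out as a reference helper.
    #[allow(dead_code)]
    fn expected_decimal_physical_type(precision: u8) -> PhysicalType {
        match precision {
            // Small decimals fit in a 32-bit integer column
            0..=9 => PhysicalType::INT32,
            // Up to 18 digits fit in a 64-bit integer column
            10..=18 => PhysicalType::INT64,
            // Anything larger is stored as a fixed length byte array
            _ => PhysicalType::FIXED_LEN_BYTE_ARRAY,
        }
    }
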
    // Writes two 1024 row batches of single element string lists and verifies
    // that a selection skipping the first 100 rows of each written batch
    // returns the expected values.
    #[test]
    fn test_list_selection() {
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list_field(ArrowDataType::Utf8, true),
            false,
        )]));
        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        for i in 0..2 {
            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
            for j in 0..1024 {
                list_a_builder.values().append_value(format!("{i} {j}"));
                list_a_builder.append(true);
            }
            let batch =
                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
                    .unwrap();
            writer.write(&batch).unwrap();
        }
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);
        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
            .unwrap()
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ]))
            .build()
            .unwrap();

        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        let batch = concat_batches(&schema, &batches).unwrap();

        assert_eq!(batch.num_rows(), 924 * 2);
        let list = batch.column(0).as_list::<i32>();

        // Every list in the output should contain exactly one element
        for w in list.value_offsets().windows(2) {
            assert_eq!(w[0] + 1, w[1]);
        }
        let mut values = list.values().as_string::<i32>().iter();

        // The skipped rows are "{i} 0" through "{i} 99" of each written batch
        for i in 0..2 {
            for j in 100..1024 {
                let expected = format!("{i} {j}");
                assert_eq!(values.next().unwrap().unwrap(), &expected);
            }
        }
    }

    // Builds a random list<list<int32>> column with nulls at every level, then
    // checks several skip/select patterns against slices of the written batch
    // for a range of batch sizes.
    #[test]
    fn test_list_selection_fuzz() {
        let mut rng = rng();
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list(
                Field::LIST_FIELD_DEFAULT_NAME,
                Field::new_list_field(ArrowDataType::Int32, true),
                true,
            ),
            true,
        )]));
        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));

        for _ in 0..2048 {
            if rng.random_bool(0.2) {
                list_a_builder.append(false);
                continue;
            }

            let list_a_len = rng.random_range(0..10);
            let list_b_builder = list_a_builder.values();

            for _ in 0..list_a_len {
                if rng.random_bool(0.2) {
                    list_b_builder.append(false);
                    continue;
                }

                let list_b_len = rng.random_range(0..10);
                let int_builder = list_b_builder.values();
                for _ in 0..list_b_len {
                    match rng.random_bool(0.2) {
                        true => int_builder.append_null(),
                        false => int_builder.append_value(rng.random()),
                    }
                }
                list_b_builder.append(true);
            }
            list_a_builder.append(true);
        }

        let array = Arc::new(list_a_builder.finish());
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        writer.write(&batch).unwrap();
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);

        let cases = [
            vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ],
            vec![
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
            ],
            vec![
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
            ],
            vec![
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
            ],
        ];

        for batch_size in [100, 1024, 2048] {
            for selection in &cases {
                let selection = RowSelection::from(selection.clone());
                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
                    .unwrap()
                    .with_row_selection(selection.clone())
                    .with_batch_size(batch_size)
                    .build()
                    .unwrap();

                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
                assert_eq!(actual.num_rows(), selection.row_count());

                // Compare each selected run of rows against the corresponding
                // slice of the written batch
                let mut batch_offset = 0;
                let mut actual_offset = 0;
                for selector in selection.iter() {
                    if selector.skip {
                        batch_offset += selector.row_count;
                        continue;
                    }

                    assert_eq!(
                        batch.slice(batch_offset, selector.row_count),
                        actual.slice(actual_offset, selector.row_count)
                    );

                    batch_offset += selector.row_count;
                    actual_offset += selector.row_count;
                }
            }
        }
    }

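    // Illustrative sketch (not part of the original tests): the bookkeeping the
    // fuzz test above performs inline, pulled out as a helper that expands a
    // `RowSelection` into the (offset, length) pairs it selects from the source.
    #[allow(dead_code)]
    fn selected_slices(selection: &RowSelection) -> Vec<(usize, usize)> {
        let mut offset = 0;
        let mut slices = Vec::new();
        for selector in selection.iter() {
            if !selector.skip {
                slices.push((offset, selector.row_count));
            }
            offset += selector.row_count;
        }
        slices
    }
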
    // Reads a file written with the legacy ("old") list structure and checks
    // that it is decoded as list<list<int32>> containing [[1, 2], [3, 4]].
    #[test]
    fn test_read_old_nested_list() {
        use arrow::datatypes::DataType;
        use arrow::datatypes::ToByteSlice;

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/old_list_structure.parquet");
        let test_file = File::open(path).unwrap();

        // Build the expected inner value: the nested list [[1, 2], [3, 4]]
        let a_values = Int32Array::from(vec![1, 2, 3, 4]);

        // Offsets splitting the four values into two inner lists
        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());

        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
            "array",
            DataType::Int32,
            false,
        ))))
        .len(2)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
        let mut reader = builder.build().unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out.num_columns(), 1);
        // The single row of the single column should be the expected nested list
        let c0 = out.column(0);
        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
        let r0 = c0arr.value(0);
        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
        assert_eq!(r0arr, &a);
    }

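    // Illustrative sketch (not part of the original tests): the Arrow schema the
    // reader infers for the legacy list file above can be inspected on the
    // builder before calling `build()`, which is a convenient way to see how the
    // old structure is mapped to Arrow types.
    #[allow(dead_code)]
    fn inspect_old_list_schema() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/old_list_structure.parquet");
        let file = File::open(path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        // `schema()` is the Arrow schema derived from the Parquet file metadata
        println!("{:#?}", builder.schema());
    }
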
    // Reads a file containing a map column whose values are omitted; such a
    // column is read back as a list of keys, which should match the adjacent
    // list column in the same file element for element.
    #[test]
    fn test_map_no_value() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/map_no_value.parquet");
        let file = File::open(path).unwrap();

        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 3);
        assert_eq!(out.num_columns(), 3);
        // The map-without-values column and the list column should be identical
        let c0 = out.column(1).as_list::<i32>();
        let c1 = out.column(2).as_list::<i32>();
        assert_eq!(c0.len(), c1.len());
        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
    }
}