use arrow_array::Array;
use arrow_array::cast::AsArray;
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

pub use crate::arrow::array_reader::RowGroups;
use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
use crate::arrow::schema::{ParquetField, parquet_to_arrow_schema_and_fields};
use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
use crate::bloom_filter::{
    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
};
use crate::column::page::{PageIterator, PageReader};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{
    PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
};
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;

use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
pub use read_plan::{ReadPlan, ReadPlanBuilder};

mod filter;
pub mod metrics;
mod read_plan;
mod selection;
pub mod statistics;

pub struct ArrowReaderBuilder<T> {
    pub(crate) input: T,

    pub(crate) metadata: Arc<ParquetMetaData>,

    pub(crate) schema: SchemaRef,

    pub(crate) fields: Option<Arc<ParquetField>>,

    pub(crate) batch_size: usize,

    pub(crate) row_groups: Option<Vec<usize>>,

    pub(crate) projection: ProjectionMask,

    pub(crate) filter: Option<RowFilter>,

    pub(crate) selection: Option<RowSelection>,

    pub(crate) limit: Option<usize>,

    pub(crate) offset: Option<usize>,

    pub(crate) metrics: ArrowReaderMetrics,

    pub(crate) max_predicate_cache_size: usize,
}

impl<T: Debug> Debug for ArrowReaderBuilder<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowReaderBuilder<T>")
            .field("input", &self.input)
            .field("metadata", &self.metadata)
            .field("schema", &self.schema)
            .field("fields", &self.fields)
            .field("batch_size", &self.batch_size)
            .field("row_groups", &self.row_groups)
            .field("projection", &self.projection)
            .field("filter", &self.filter)
            .field("selection", &self.selection)
            .field("limit", &self.limit)
            .field("offset", &self.offset)
            .field("metrics", &self.metrics)
            .finish()
    }
}

impl<T> ArrowReaderBuilder<T> {
    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self {
            input,
            metadata: metadata.metadata,
            schema: metadata.schema,
            fields: metadata.fields,
            batch_size: 1024,
            row_groups: None,
            projection: ProjectionMask::all(),
            filter: None,
            selection: None,
            limit: None,
            offset: None,
            metrics: ArrowReaderMetrics::Disabled,
            // 100 MiB default predicate cache size
            max_predicate_cache_size: 100 * 1024 * 1024,
        }
    }

    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    pub fn with_batch_size(self, batch_size: usize) -> Self {
        // Clamp the batch size to the number of rows in the file to avoid
        // allocating needlessly large buffers
        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
        Self { batch_size, ..self }
    }

    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
        Self {
            row_groups: Some(row_groups),
            ..self
        }
    }

    pub fn with_projection(self, mask: ProjectionMask) -> Self {
        Self {
            projection: mask,
            ..self
        }
    }

    pub fn with_row_selection(self, selection: RowSelection) -> Self {
        Self {
            selection: Some(selection),
            ..self
        }
    }

    pub fn with_row_filter(self, filter: RowFilter) -> Self {
        Self {
            filter: Some(filter),
            ..self
        }
    }
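
    // A minimal sketch of using `with_row_filter` above, assuming the first
    // leaf column of the file is an Int64 column (the column index, threshold,
    // and `builder` variable are illustrative only):
    //
    //     use arrow::compute::kernels::cmp::gt;
    //     use arrow_array::Int64Array;
    //
    //     let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    //     let predicate = ArrowPredicateFn::new(mask, |batch| {
    //         gt(batch.column(0), &Int64Array::new_scalar(42))
    //     });
    //     let builder = builder.with_row_filter(RowFilter::new(vec![Box::new(predicate)]));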

    pub fn with_limit(self, limit: usize) -> Self {
        Self {
            limit: Some(limit),
            ..self
        }
    }

    pub fn with_offset(self, offset: usize) -> Self {
        Self {
            offset: Some(offset),
            ..self
        }
    }

    pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
        Self { metrics, ..self }
    }

    pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
        Self {
            max_predicate_cache_size,
            ..self
        }
    }
}
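
// A minimal end-to-end sketch of the builder API above (the file name, batch
// size, and row group selection are illustrative only):
//
//     let file = std::fs::File::open("data.parquet")?;
//     let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
//         .with_batch_size(8192)
//         .with_row_groups(vec![0])
//         .build()?;
//     for batch in reader {
//         println!("read {} rows", batch?.num_rows());
//     }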

#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    skip_arrow_metadata: bool,
    supplied_schema: Option<SchemaRef>,
    pub(crate) page_index_policy: PageIndexPolicy,
    metadata_options: ParquetMetaDataOptions,
    #[cfg(feature = "encryption")]
    pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
}

impl ArrowReaderOptions {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

    pub fn with_schema(self, schema: SchemaRef) -> Self {
        Self {
            supplied_schema: Some(schema),
            skip_arrow_metadata: true,
            ..self
        }
    }

    pub fn with_page_index(self, page_index: bool) -> Self {
        let page_index_policy = PageIndexPolicy::from(page_index);

        Self {
            page_index_policy,
            ..self
        }
    }

    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
        Self {
            page_index_policy: policy,
            ..self
        }
    }

    pub fn with_parquet_schema(mut self, schema: Arc<SchemaDescriptor>) -> Self {
        self.metadata_options.set_schema(schema);
        self
    }

    #[cfg(feature = "encryption")]
    pub fn with_file_decryption_properties(
        self,
        file_decryption_properties: Arc<FileDecryptionProperties>,
    ) -> Self {
        Self {
            file_decryption_properties: Some(file_decryption_properties),
            ..self
        }
    }

    pub fn page_index(&self) -> bool {
        self.page_index_policy != PageIndexPolicy::Skip
    }

    pub fn metadata_options(&self) -> &ParquetMetaDataOptions {
        &self.metadata_options
    }

    #[cfg(feature = "encryption")]
    pub fn file_decryption_properties(&self) -> Option<&Arc<FileDecryptionProperties>> {
        self.file_decryption_properties.as_ref()
    }
}
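
// A sketch of configuring the options above, for a reader that wants the page
// index and ignores any embedded Arrow schema (illustrative only):
//
//     let options = ArrowReaderOptions::new()
//         .with_page_index(true)
//         .with_skip_arrow_metadata(true);
//     let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?;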

#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    pub(crate) metadata: Arc<ParquetMetaData>,
    pub(crate) schema: SchemaRef,
    pub(crate) fields: Option<Arc<ParquetField>>,
}

impl ArrowReaderMetadata {
    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ParquetMetaDataReader::new()
            .with_page_index_policy(options.page_index_policy)
            .with_metadata_options(Some(options.metadata_options.clone()));
        #[cfg(feature = "encryption")]
        let metadata = metadata.with_decryption_properties(
            options.file_decryption_properties.as_ref().map(Arc::clone),
        );
        let metadata = metadata.parse_and_finish(reader)?;
        Self::try_new(Arc::new(metadata), options)
    }

    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
        match options.supplied_schema {
            Some(supplied_schema) => Self::with_supplied_schema(metadata, supplied_schema.clone()),
            None => {
                let kv_metadata = match options.skip_arrow_metadata {
                    true => None,
                    false => metadata.file_metadata().key_value_metadata(),
                };

                let (schema, fields) = parquet_to_arrow_schema_and_fields(
                    metadata.file_metadata().schema_descr(),
                    ProjectionMask::all(),
                    kv_metadata,
                )?;

                Ok(Self {
                    metadata,
                    schema: Arc::new(schema),
                    fields: fields.map(Arc::new),
                })
            }
        }
    }

    fn with_supplied_schema(
        metadata: Arc<ParquetMetaData>,
        supplied_schema: SchemaRef,
    ) -> Result<Self> {
        let parquet_schema = metadata.file_metadata().schema_descr();
        let field_levels = parquet_to_arrow_field_levels(
            parquet_schema,
            ProjectionMask::all(),
            Some(supplied_schema.fields()),
        )?;
        let fields = field_levels.fields;
        let inferred_len = fields.len();
        let supplied_len = supplied_schema.fields().len();
        if inferred_len != supplied_len {
            return Err(arrow_err!(format!(
                "Incompatible supplied Arrow schema: expected {} columns, received {}",
                inferred_len, supplied_len
            )));
        }

        let mut errors = Vec::new();

        let field_iter = supplied_schema.fields().iter().zip(fields.iter());

        for (field1, field2) in field_iter {
            if field1.data_type() != field2.data_type() {
                errors.push(format!(
                    "data type mismatch for field {}: requested {} but found {}",
                    field1.name(),
                    field1.data_type(),
                    field2.data_type()
                ));
            }
            if field1.is_nullable() != field2.is_nullable() {
                errors.push(format!(
                    "nullability mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.is_nullable(),
                    field2.is_nullable()
                ));
            }
            if field1.metadata() != field2.metadata() {
                errors.push(format!(
                    "metadata mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.metadata(),
                    field2.metadata()
                ));
            }
        }

        if !errors.is_empty() {
            let message = errors.join(", ");
            return Err(ParquetError::ArrowError(format!(
                "Incompatible supplied Arrow schema: {message}",
            )));
        }

        Ok(Self {
            metadata,
            schema: supplied_schema,
            fields: field_levels.levels.map(Arc::new),
        })
    }

    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}
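
// `ArrowReaderMetadata::load` allows parsing the footer once and sharing the
// result across several readers. A sketch, assuming `file` is a `std::fs::File`:
//
//     let metadata = ArrowReaderMetadata::load(&file, ArrowReaderOptions::new())?;
//     let reader_a = ParquetRecordBatchReaderBuilder::new_with_metadata(
//         file.try_clone()?,
//         metadata.clone(),
//     )
//     .build()?;
//     let reader_b =
//         ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata).build()?;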

#[doc(hidden)]
pub struct SyncReader<T: ChunkReader>(T);

impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("SyncReader").field(&self.0).finish()
    }
}

pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;

impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
    pub fn try_new(reader: T) -> Result<Self> {
        Self::try_new_with_options(reader, Default::default())
    }

    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load(&reader, options)?;
        Ok(Self::new_with_metadata(reader, metadata))
    }

    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(SyncReader(input), metadata)
    }

    pub fn get_row_group_column_bloom_filter(
        &self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> Result<Option<Sbbf>> {
        let metadata = self.metadata.row_group(row_group_idx);
        let column_metadata = metadata.column(column_idx);

        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => self.input.0.get_bytes(offset, length as usize),
            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
        }?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        match header.algorithm {
            BloomFilterAlgorithm::BLOCK => {
                // this match exists to future-proof the singleton algorithm enum
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED => {
                // this match exists to future-proof the singleton compression enum
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH => {
                // this match exists to future-proof the singleton hash enum
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            Some(_) => buffer.slice(
                (TryInto::<usize>::try_into(bitset_offset).unwrap()
                    - TryInto::<usize>::try_into(offset).unwrap())..,
            ),
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                self.input.0.get_bytes(bitset_offset, bitset_length)?
            }
        };
        Ok(Some(Sbbf::new(&bitset)))
    }
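
    // Sketch: pruning with the bloom filter read above, assuming row group 0,
    // column 0, and a candidate value of `123i64` (all illustrative):
    //
    //     if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0)? {
    //         if !sbbf.check(&123i64) {
    //             // the value is definitely not present in this row group
    //         }
    //     }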

    pub fn build(self) -> Result<ParquetRecordBatchReader> {
        let Self {
            input,
            metadata,
            schema: _,
            fields,
            batch_size,
            row_groups,
            projection,
            mut filter,
            selection,
            limit,
            offset,
            metrics,
            max_predicate_cache_size: _,
        } = self;

        // Clamp the batch size to the number of rows in the file to avoid
        // allocating needlessly large buffers
        let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize);

        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());

        let reader = ReaderRowGroups {
            reader: Arc::new(input.0),
            metadata,
            row_groups,
        };

        let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);

        // Evaluate any pushed-down predicates first, narrowing the row
        // selection before the main projection is decoded
        if let Some(filter) = filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
                if !plan_builder.selects_any() {
                    break;
                }

                let mut cache_projection = predicate.projection().clone();
                cache_projection.intersect(&projection);

                let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
                    .build_array_reader(fields.as_deref(), predicate.projection())?;

                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
            }
        }

        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
            .build_array_reader(fields.as_deref(), &projection)?;

        let read_plan = plan_builder
            .limited(reader.num_rows())
            .with_offset(offset)
            .with_limit(limit)
            .build_limited()
            .build();

        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
    }
}

struct ReaderRowGroups<T: ChunkReader> {
    reader: Arc<T>,

    metadata: Arc<ParquetMetaData>,
    row_groups: Vec<usize>,
}

impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
    fn num_rows(&self) -> usize {
        let meta = self.metadata.row_groups();
        self.row_groups
            .iter()
            .map(|x| meta[*x].num_rows() as usize)
            .sum()
    }

    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        Ok(Box::new(ReaderPageIterator {
            column_idx: i,
            reader: self.reader.clone(),
            metadata: self.metadata.clone(),
            row_groups: self.row_groups.clone().into_iter(),
        }))
    }
}

struct ReaderPageIterator<T: ChunkReader> {
    reader: Arc<T>,
    column_idx: usize,
    row_groups: std::vec::IntoIter<usize>,
    metadata: Arc<ParquetMetaData>,
}

impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
        let rg = self.metadata.row_group(rg_idx);
        let column_chunk_metadata = rg.column(self.column_idx);
        let offset_index = self.metadata.offset_index();
        // Only use the offset index if it is present and non-empty for this row group
        let page_locations = offset_index
            .filter(|i| !i[rg_idx].is_empty())
            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
        let total_rows = rg.num_rows() as usize;
        let reader = self.reader.clone();

        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
            .add_crypto_context(
                rg_idx,
                self.column_idx,
                self.metadata.as_ref(),
                column_chunk_metadata,
            )
    }
}

impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        let rg_idx = self.row_groups.next()?;
        let page_reader = self
            .next_page_reader(rg_idx)
            .map(|page_reader| Box::new(page_reader) as _);
        Some(page_reader)
    }
}

impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}

pub struct ParquetRecordBatchReader {
    array_reader: Box<dyn ArrayReader>,
    schema: SchemaRef,
    read_plan: ReadPlan,
}

impl Debug for ParquetRecordBatchReader {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ParquetRecordBatchReader")
            .field("array_reader", &"...")
            .field("schema", &self.schema)
            .field("read_plan", &self.read_plan)
            .finish()
    }
}

impl Iterator for ParquetRecordBatchReader {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_inner()
            .map_err(|arrow_err| arrow_err.into())
            .transpose()
    }
}

impl ParquetRecordBatchReader {
    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
        let mut read_records = 0;
        let batch_size = self.batch_size();
        match self.read_plan.selection_mut() {
            Some(selection) => {
                while read_records < batch_size && !selection.is_empty() {
                    let front = selection.pop_front().unwrap();
                    if front.skip {
                        let skipped = self.array_reader.skip_records(front.row_count)?;

                        if skipped != front.row_count {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                front.row_count,
                                skipped
                            ));
                        }
                        continue;
                    }

                    // Zero-length selectors select no rows; skip them
                    if front.row_count == 0 {
                        continue;
                    }

                    // If the selector is larger than what is still needed to
                    // fill this batch, split it and push the remainder back
                    // onto the front of the selection
                    let need_read = batch_size - read_records;
                    let to_read = match front.row_count.checked_sub(need_read) {
                        Some(remaining) if remaining != 0 => {
                            selection.push_front(RowSelector::select(remaining));
                            need_read
                        }
                        _ => front.row_count,
                    };
                    match self.array_reader.read_records(to_read)? {
                        0 => break,
                        rec => read_records += rec,
                    };
                }
            }
            None => {
                self.array_reader.read_records(batch_size)?;
            }
        };

        let array = self.array_reader.consume_batch()?;
        let struct_array = array.as_struct_opt().ok_or_else(|| {
            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
        })?;

        Ok(if struct_array.len() > 0 {
            Some(RecordBatch::from(struct_array))
        } else {
            None
        })
    }
}

impl RecordBatchReader for ParquetRecordBatchReader {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl ParquetRecordBatchReader {
    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
        ParquetRecordBatchReaderBuilder::try_new(reader)?
            .with_batch_size(batch_size)
            .build()
    }

    pub fn try_new_with_row_groups(
        levels: &FieldLevels,
        row_groups: &dyn RowGroups,
        batch_size: usize,
        selection: Option<RowSelection>,
    ) -> Result<Self> {
        let metrics = ArrowReaderMetrics::disabled();
        let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;

        let read_plan = ReadPlanBuilder::new(batch_size)
            .with_selection(selection)
            .build();

        Ok(Self {
            array_reader,
            schema: Arc::new(Schema::new(levels.fields.clone())),
            read_plan,
        })
    }

    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
        let schema = match array_reader.get_data_type() {
            ArrowType::Struct(fields) => Schema::new(fields.clone()),
            _ => unreachable!("Struct array reader's data type is not struct!"),
        };

        Self {
            array_reader,
            schema: Arc::new(schema),
            read_plan,
        }
    }

    #[inline(always)]
    pub(crate) fn batch_size(&self) -> usize {
        self.read_plan.batch_size()
    }
}
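
// `ParquetRecordBatchReader` implements both `Iterator<Item = Result<RecordBatch,
// ArrowError>>` and `RecordBatchReader`; a minimal consumption sketch (the file
// and batch size are illustrative):
//
//     let reader = ParquetRecordBatchReader::try_new(file, 1024)?;
//     let schema = reader.schema();
//     let batches = reader.collect::<Result<Vec<_>, _>>()?;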

#[cfg(test)]
mod tests {
    use std::cmp::min;
    use std::collections::{HashMap, VecDeque};
    use std::fmt::Formatter;
    use std::fs::File;
    use std::io::Seek;
    use std::path::PathBuf;
    use std::sync::Arc;

    use arrow_array::builder::*;
    use arrow_array::cast::AsArray;
    use arrow_array::types::{
        Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
        DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
        Time64MicrosecondType,
    };
    use arrow_array::*;
    use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256};
    use arrow_data::{ArrayData, ArrayDataBuilder};
    use arrow_schema::{
        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
    };
    use arrow_select::concat::concat_batches;
    use bytes::Bytes;
    use half::f16;
    use num_traits::PrimInt;
    use rand::{Rng, RngCore, rng};
    use tempfile::tempfile;

    use crate::arrow::arrow_reader::{
        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
    };
    use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
    use crate::arrow::{ArrowWriter, ProjectionMask};
    use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType};
    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
    use crate::data_type::{
        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
        FloatType, Int32Type, Int64Type, Int96, Int96Type,
    };
    use crate::errors::Result;
    use crate::file::metadata::ParquetMetaData;
    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
    use crate::file::writer::SerializedFileWriter;
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::{Type, TypePtr};
    use crate::util::test_common::rand_gen::RandGen;

    #[test]
    fn test_arrow_reader_all_columns() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());
        let reader = builder.build().unwrap();

        assert_eq!(original_schema.fields(), reader.schema().fields());
    }

    #[test]
    fn test_reuse_schema() {
        let file = get_test_file("parquet/alltypes-java.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
        let expected = builder.metadata;
        let schema = expected.file_metadata().schema_descr_ptr();

        let arrow_options = ArrowReaderOptions::new().with_parquet_schema(schema.clone());
        let builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();

        assert_eq!(expected.as_ref(), builder.metadata.as_ref());
    }

    #[test]
    fn test_arrow_reader_single_column() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
        let reader = builder.with_projection(mask).build().unwrap();

        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_arrow_reader_single_column_by_name() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
        let reader = builder.with_projection(mask).build().unwrap();

        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_null_column_reader_test() {
        let mut file = tempfile::tempfile().unwrap();

        let schema = "
            message message {
                OPTIONAL INT32 int32;
            }
        ";
        let schema = Arc::new(parse_message_type(schema).unwrap());

        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
        generate_single_column_file_with_data::<Int32Type>(
            &[vec![], vec![]],
            Some(&def_levels),
            file.try_clone().unwrap(),
            schema,
            Some(Field::new("int32", ArrowDataType::Null, true)),
            &Default::default(),
        )
        .unwrap();

        file.rewind().unwrap();

        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();

        assert_eq!(batches.len(), 4);
        for batch in &batches[0..3] {
            assert_eq!(batch.num_rows(), 2);
            assert_eq!(batch.num_columns(), 1);
            assert_eq!(batch.column(0).null_count(), 2);
        }

        assert_eq!(batches[3].num_rows(), 1);
        assert_eq!(batches[3].num_columns(), 1);
        assert_eq!(batches[3].column(0).null_count(), 1);
    }

    #[test]
    fn test_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<BoolType, _, BoolType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
        );
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<FloatType, _, FloatType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
        );
    }

    #[test]
    fn test_unsigned_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::UINT_32,
            Some(ArrowDataType::UInt32),
            |vals| {
                Arc::new(UInt32Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u32)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::UINT_64,
            Some(ArrowDataType::UInt64),
            |vals| {
                Arc::new(UInt64Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u64)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
    }

    #[test]
    fn test_unsigned_roundtrip() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("uint32", ArrowDataType::UInt32, true),
            Field::new("uint64", ArrowDataType::UInt64, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt32Array::from_iter_values([
                    0,
                    i32::MAX as u32,
                    u32::MAX,
                ])),
                Arc::new(UInt64Array::from_iter_values([
                    0,
                    i64::MAX as u64,
                    u64::MAX,
                ])),
            ],
        )
        .unwrap();

        writer.write(&original).unwrap();
        writer.close().unwrap();

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
        let ret = reader.next().unwrap().unwrap();
        assert_eq!(ret, original);

        ret.column(0)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();

        ret.column(1)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
    }

    #[test]
    fn test_float16_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("float16", ArrowDataType::Float16, false),
            Field::new("float16-nullable", ArrowDataType::Float16, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Float16Array::from_iter_values([
                    f16::EPSILON,
                    f16::MIN,
                    f16::MAX,
                    f16::NAN,
                    f16::INFINITY,
                    f16::NEG_INFINITY,
                    f16::ONE,
                    f16::NEG_ONE,
                    f16::ZERO,
                    f16::NEG_ZERO,
                    f16::E,
                    f16::PI,
                    f16::FRAC_1_PI,
                ])),
                Arc::new(Float16Array::from(vec![
                    None,
                    None,
                    None,
                    Some(f16::NAN),
                    Some(f16::INFINITY),
                    Some(f16::NEG_INFINITY),
                    None,
                    None,
                    None,
                    None,
                    None,
                    None,
                    Some(f16::FRAC_1_PI),
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Float16Type>();
        ret.column(1).as_primitive::<Float16Type>();

        Ok(())
    }

    #[test]
    fn test_time_utc_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "time_millis",
                ArrowDataType::Time32(TimeUnit::Millisecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
            Field::new(
                "time_micros",
                ArrowDataType::Time64(TimeUnit::Microsecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Time32MillisecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399_000),
                    Some(86_400_000),
                    Some(86_401_000),
                    None,
                ])),
                Arc::new(Time64MicrosecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399 * 1_000_000),
                    Some(86_400 * 1_000_000),
                    Some(86_401 * 1_000_000),
                    None,
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Time32MillisecondType>();
        ret.column(1).as_primitive::<Time64MicrosecondType>();

        Ok(())
    }

    #[test]
    fn test_date32_roundtrip() -> Result<()> {
        use arrow_array::Date32Array;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "date32",
            ArrowDataType::Date32,
            false,
        )]));

        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![Arc::new(Date32Array::from(vec![
                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
            ]))],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Date32Type>();

        Ok(())
    }

    #[test]
    fn test_date64_roundtrip() -> Result<()> {
        use arrow_array::Date64Array;

        let schema = Arc::new(Schema::new(vec![
            Field::new("small-date64", ArrowDataType::Date64, false),
            Field::new("big-date64", ArrowDataType::Date64, false),
            Field::new("invalid-date64", ArrowDataType::Date64, false),
        ]));

        let mut default_buf = Vec::with_capacity(1024);
        let mut coerce_buf = Vec::with_capacity(1024);

        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();

        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
        let mut coerce_writer =
            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;

        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000 * NUM_MILLISECONDS_IN_DAY,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                Arc::new(Date64Array::from(vec![
                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1,
                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                ])),
            ],
        )?;

        default_writer.write(&original)?;
        coerce_writer.write(&original)?;

        default_writer.close()?;
        coerce_writer.close()?;

        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;

        let default_ret = default_reader.next().unwrap()?;
        let coerce_ret = coerce_reader.next().unwrap()?;

        // Without coercion the Date64 values round-trip unchanged
        assert_eq!(default_ret, original);

        // With coercion, only the in-range, day-aligned values survive
        assert_eq!(coerce_ret.column(0), original.column(0));
        assert_ne!(coerce_ret.column(1), original.column(1));
        assert_ne!(coerce_ret.column(2), original.column(2));

        default_ret.column(0).as_primitive::<Date64Type>();
        coerce_ret.column(0).as_primitive::<Date64Type>();

        Ok(())
    }

    struct RandFixedLenGen {}

    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
        fn r#gen(len: i32) -> FixedLenByteArray {
            let mut v = vec![0u8; len as usize];
            rng().fill_bytes(&mut v);
            ByteArray::from(v).into()
        }
    }

    #[test]
    fn test_fixed_length_binary_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            20,
            ConvertedType::NONE,
            None,
            |vals| {
                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
                for val in vals {
                    match val {
                        Some(b) => builder.append_value(b).unwrap(),
                        None => builder.append_null(),
                    }
                }
                Arc::new(builder.finish())
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_interval_day_time_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            12,
            ConvertedType::INTERVAL,
            None,
            |vals| {
                Arc::new(
                    vals.iter()
                        .map(|x| {
                            x.as_ref().map(|b| IntervalDayTime {
                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
                                milliseconds: i32::from_le_bytes(
                                    b.as_ref()[8..12].try_into().unwrap(),
                                ),
                            })
                        })
                        .collect::<IntervalDayTimeArray>(),
                )
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_int96_single_column_reader_test() {
        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];

        type TypeHintAndConversionFunction =
            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);

        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
            // No type hint: defaults to nanosecond resolution
            (None, |vals: &[Option<Int96>]| {
                Arc::new(TimestampNanosecondArray::from_iter(
                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
                )) as ArrayRef
            }),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampSecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMillisecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_millis())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMicrosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_micros())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampNanosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
                    )) as ArrayRef
                },
            ),
            // Type hint that carries a timezone
            (
                Some(ArrowDataType::Timestamp(
                    TimeUnit::Second,
                    Some(Arc::from("-05:00")),
                )),
                |vals: &[Option<Int96>]| {
                    Arc::new(
                        TimestampSecondArray::from_iter(
                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
                        )
                        .with_timezone("-05:00"),
                    ) as ArrayRef
                },
            ),
        ];

        resolutions.iter().for_each(|(arrow_type, converter)| {
            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
                2,
                ConvertedType::NONE,
                arrow_type.clone(),
                converter,
                encodings,
            );
        })
    }

    #[test]
    fn test_int96_from_spark_file_with_provided_schema() {
        // `int96_from_spark.parquet` stores timestamps as INT96; supplying a
        // microsecond schema reads them back at microsecond resolution
        use arrow_schema::DataType::Timestamp;
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            Timestamp(TimeUnit::Microsecond, None),
            true,
        )]));
        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());

        let mut record_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                .unwrap()
                .build()
                .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));

        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456),
            Some(1704070800000000),
            Some(253402225200000000),
            Some(1735599600000000),
            None,
            Some(9089380393200000000),
        ]));

        // Cast the timestamp column to i64 so the raw values can be compared
        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }

    #[test]
    fn test_int96_from_spark_file_without_provided_schema() {
        // Without a supplied schema the INT96 values are read as nanosecond
        // timestamps, which wraps around for out-of-range values
        use arrow_schema::DataType::Timestamp;
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));

        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456000),
            Some(1704070800000000000),
            // overflows at nanosecond resolution
            Some(-4852191831933722624),
            Some(1735599600000000000),
            None,
            // overflows at nanosecond resolution
            Some(-4864435138808946688),
        ]));

        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }

    struct RandUtf8Gen {}

    impl RandGen<ByteArrayType> for RandUtf8Gen {
        fn r#gen(len: i32) -> ByteArray {
            Int32Type::r#gen(len).to_string().as_str().into()
        }
    }

    #[test]
    fn test_utf8_single_column_reader_test() {
        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
            })))
        }

        let encodings = &[
            Encoding::PLAIN,
            Encoding::RLE_DICTIONARY,
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            Encoding::DELTA_BYTE_ARRAY,
        ];

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::NONE,
            None,
            |vals| {
                Arc::new(BinaryArray::from_iter(
                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
                ))
            },
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            None,
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::Utf8),
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::LargeUtf8),
            string_converter::<i64>,
            encodings,
        );

        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
        for key in &small_key_types {
            for encoding in encodings {
                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
                opts.encoding = *encoding;

                let data_type =
                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
                    opts,
                    2,
                    ConvertedType::UTF8,
                    Some(data_type.clone()),
                    move |vals| {
                        let vals = string_converter::<i32>(vals);
                        arrow::compute::cast(&vals, &data_type).unwrap()
                    },
                );
            }
        }

        let key_types = [
            ArrowDataType::Int16,
            ArrowDataType::UInt16,
            ArrowDataType::Int32,
            ArrowDataType::UInt32,
            ArrowDataType::Int64,
            ArrowDataType::UInt64,
        ];

        for key in &key_types {
            let data_type =
                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let vals = string_converter::<i32>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );
        }
    }

    #[test]
    fn test_decimal_nullable_struct() {
        let decimals = Decimal256Array::from_iter_values(
            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
        );

        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "decimals",
            decimals.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_int32_nullable_struct() {
        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "int32",
            int32.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![int32.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_decimal_list() {
        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);

        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
            decimals.data_type().clone(),
            false,
        ))))
        .len(7)
        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 1), &read[2]);
    }

    #[test]
    fn test_read_decimal_file() {
        use arrow_array::Decimal128Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let file_variants = vec![
            ("byte_array", 4),
            ("fixed_length", 25),
            ("int32", 4),
            ("int64", 10),
        ];
        for (prefix, target_precision) in file_variants {
            let path = format!("{testdata}/{prefix}_decimal.parquet");
            let file = File::open(path).unwrap();
            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

            let batch = record_reader.next().unwrap().unwrap();
            assert_eq!(batch.num_rows(), 24);
            let col = batch
                .column(0)
                .as_any()
                .downcast_ref::<Decimal128Array>()
                .unwrap();

            let expected = 1..25;

            assert_eq!(col.precision(), target_precision);
            assert_eq!(col.scale(), 2);

            for (i, v) in expected.enumerate() {
                assert_eq!(col.value(i), v * 100_i128);
            }
        }
    }

    #[test]
    fn test_read_float16_nonzeros_file() {
        use arrow_array::Float16Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 8);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float16Array>()
            .unwrap();

        let f16_two = f16::ONE + f16::ONE;

        assert_eq!(col.null_count(), 1);
        assert!(col.is_null(0));
        assert_eq!(col.value(1), f16::ONE);
        assert_eq!(col.value(2), -f16_two);
        assert!(col.value(3).is_nan());
        assert_eq!(col.value(4), f16::ZERO);
        assert!(col.value(4).is_sign_positive());
        assert_eq!(col.value(5), f16::NEG_ONE);
        assert_eq!(col.value(6), f16::NEG_ZERO);
        assert!(col.value(6).is_sign_negative());
        assert_eq!(col.value(7), f16_two);
    }

    #[test]
    fn test_read_float16_zeros_file() {
        use arrow_array::Float16Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 3);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float16Array>()
            .unwrap();

        assert_eq!(col.null_count(), 1);
        assert!(col.is_null(0));
        assert_eq!(col.value(1), f16::ZERO);
        assert!(col.value(1).is_sign_positive());
        assert!(col.value(2).is_nan());
    }

    #[test]
    fn test_read_float32_float64_byte_stream_split() {
        let path = format!(
            "{}/byte_stream_split.zstd.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();
        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

        let mut row_count = 0;
        for batch in record_reader {
            let batch = batch.unwrap();
            row_count += batch.num_rows();
            let f32_col = batch.column(0).as_primitive::<Float32Type>();
            let f64_col = batch.column(1).as_primitive::<Float64Type>();

            for &x in f32_col.values() {
                assert!(x > -10.0);
                assert!(x < 10.0);
            }
            for &x in f64_col.values() {
                assert!(x > -10.0);
                assert!(x < 10.0);
            }
        }
        assert_eq!(row_count, 300);
    }

    #[test]
    fn test_read_extended_byte_stream_split() {
        let path = format!(
            "{}/byte_stream_split_extended.gzip.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();
        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

        let mut row_count = 0;
        for batch in record_reader {
            let batch = batch.unwrap();
            row_count += batch.num_rows();

            let f16_col = batch.column(0).as_primitive::<Float16Type>();
            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
            assert_eq!(f16_col.len(), f16_bss.len());
            f16_col
                .iter()
                .zip(f16_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let f32_col = batch.column(2).as_primitive::<Float32Type>();
            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
            assert_eq!(f32_col.len(), f32_bss.len());
            f32_col
                .iter()
                .zip(f32_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let f64_col = batch.column(4).as_primitive::<Float64Type>();
            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
            assert_eq!(f64_col.len(), f64_bss.len());
            f64_col
                .iter()
                .zip(f64_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
            assert_eq!(i32_col.len(), i32_bss.len());
            i32_col
                .iter()
                .zip(i32_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
            assert_eq!(i64_col.len(), i64_bss.len());
            i64_col
                .iter()
                .zip(i64_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let flba_col = batch.column(10).as_fixed_size_binary();
            let flba_bss = batch.column(11).as_fixed_size_binary();
            assert_eq!(flba_col.len(), flba_bss.len());
            flba_col
                .iter()
                .zip(flba_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
            assert_eq!(dec_col.len(), dec_bss.len());
            dec_col
                .iter()
                .zip(dec_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
        }
        assert_eq!(row_count, 200);
    }

    #[test]
    fn test_read_incorrect_map_schema_file() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/incorrect_map_schema.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 1);

        let expected_schema = Schema::new(vec![Field::new(
            "my_map",
            ArrowDataType::Map(
                Arc::new(Field::new(
                    "key_value",
                    ArrowDataType::Struct(Fields::from(vec![
                        Field::new("key", ArrowDataType::Utf8, false),
                        Field::new("value", ArrowDataType::Utf8, true),
                    ])),
                    false,
                )),
                false,
            ),
            true,
        )]);
        assert_eq!(batch.schema().as_ref(), &expected_schema);

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.column(0).null_count(), 0);
        assert_eq!(
            batch.column(0).as_map().keys().as_ref(),
            &StringArray::from(vec!["parent", "name"])
        );
        assert_eq!(
            batch.column(0).as_map().values().as_ref(),
            &StringArray::from(vec!["another", "report"])
        );
    }

    #[test]
    fn test_read_dict_fixed_size_binary() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            ArrowDataType::Dictionary(
                Box::new(ArrowDataType::UInt8),
                Box::new(ArrowDataType::FixedSizeBinary(8)),
            ),
            true,
        )]));
        let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
        let values = FixedSizeBinaryArray::try_from_iter(
            vec![
                (0u8..8u8).collect::<Vec<u8>>(),
                (24u8..32u8).collect::<Vec<u8>>(),
            ]
            .into_iter(),
        )
        .unwrap();
        let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
        let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(read.len(), 1);
        assert_eq!(&batch, &read[0])
    }

    #[test]
    fn test_read_nullable_structs_with_binary_dict_as_first_child_column() {
        let struct_fields = Fields::from(vec![
            Field::new(
                "city",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt8),
                    Box::new(ArrowDataType::Utf8),
                ),
                true,
            ),
            Field::new("name", ArrowDataType::Utf8, true),
        ]);
        let schema = Arc::new(Schema::new(vec![Field::new(
            "items",
            ArrowDataType::Struct(struct_fields.clone()),
            true,
        )]));

        let items_arr = StructArray::new(
            struct_fields,
            vec![
                Arc::new(DictionaryArray::new(
                    UInt8Array::from_iter_values(vec![0, 1, 1, 0, 2]),
                    Arc::new(StringArray::from_iter_values(vec![
                        "quebec",
                        "fredericton",
                        "halifax",
                    ])),
                )),
                Arc::new(StringArray::from_iter_values(vec![
                    "albert", "terry", "lance", "", "tim",
                ])),
            ],
            Some(NullBuffer::from_iter(vec![true, true, true, false, true])),
        );

        let batch = RecordBatch::try_new(schema, vec![Arc::new(items_arr)]).unwrap();
        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(read.len(), 1);
        assert_eq!(&batch, &read[0])
    }
2502
    /// Options for a round-trip single-column test
    #[derive(Clone)]
    struct TestOptions {
        /// Number of row groups to write
        num_row_groups: usize,
        /// Number of rows in each row group
        num_rows: usize,
        /// Batch size to use when reading back
        record_batch_size: usize,
        /// Percentage of nulls in the column, if any
        null_percent: Option<usize>,
        /// Writer batch size
        write_batch_size: usize,
        /// Maximum size of a data page in bytes
        max_data_page_size: usize,
        /// Maximum size of a dictionary page in bytes
        max_dict_page_size: usize,
        /// Writer version
        writer_version: WriterVersion,
        /// Which statistics the writer should produce
        enabled_statistics: EnabledStatistics,
        /// Encoding to use for the column
        encoding: Encoding,
        /// Row selection to apply, with the number of selected rows
        row_selections: Option<(RowSelection, usize)>,
        /// Row filter mask to apply
        row_filter: Option<Vec<bool>>,
        /// Limit on the number of rows returned
        limit: Option<usize>,
        /// Offset to skip before returning rows
        offset: Option<usize>,
    }
2539
2540 impl std::fmt::Debug for TestOptions {
2542 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2543 f.debug_struct("TestOptions")
2544 .field("num_row_groups", &self.num_row_groups)
2545 .field("num_rows", &self.num_rows)
2546 .field("record_batch_size", &self.record_batch_size)
2547 .field("null_percent", &self.null_percent)
2548 .field("write_batch_size", &self.write_batch_size)
2549 .field("max_data_page_size", &self.max_data_page_size)
2550 .field("max_dict_page_size", &self.max_dict_page_size)
2551 .field("writer_version", &self.writer_version)
2552 .field("enabled_statistics", &self.enabled_statistics)
2553 .field("encoding", &self.encoding)
2554 .field("row_selections", &self.row_selections.is_some())
2555 .field("row_filter", &self.row_filter.is_some())
2556 .field("limit", &self.limit)
2557 .field("offset", &self.offset)
2558 .finish()
2559 }
2560 }
2561
2562 impl Default for TestOptions {
2563 fn default() -> Self {
2564 Self {
2565 num_row_groups: 2,
2566 num_rows: 100,
2567 record_batch_size: 15,
2568 null_percent: None,
2569 write_batch_size: 64,
2570 max_data_page_size: 1024 * 1024,
2571 max_dict_page_size: 1024 * 1024,
2572 writer_version: WriterVersion::PARQUET_1_0,
2573 enabled_statistics: EnabledStatistics::Page,
2574 encoding: Encoding::PLAIN,
2575 row_selections: None,
2576 row_filter: None,
2577 limit: None,
2578 offset: None,
2579 }
2580 }
2581 }
2582
2583 impl TestOptions {
2584 fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
2585 Self {
2586 num_row_groups,
2587 num_rows,
2588 record_batch_size,
2589 ..Default::default()
2590 }
2591 }
2592
2593 fn with_null_percent(self, null_percent: usize) -> Self {
2594 Self {
2595 null_percent: Some(null_percent),
2596 ..self
2597 }
2598 }
2599
2600 fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
2601 Self {
2602 max_data_page_size,
2603 ..self
2604 }
2605 }
2606
2607 fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
2608 Self {
2609 max_dict_page_size,
2610 ..self
2611 }
2612 }
2613
2614 fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
2615 Self {
2616 enabled_statistics,
2617 ..self
2618 }
2619 }
2620
2621 fn with_row_selections(self) -> Self {
2622 assert!(self.row_filter.is_none(), "Must set row selection first");
2623
2624 let mut rng = rng();
2625 let step = rng.random_range(self.record_batch_size..self.num_rows);
2626 let row_selections = create_test_selection(
2627 step,
2628 self.num_row_groups * self.num_rows,
2629 rng.random::<bool>(),
2630 );
2631 Self {
2632 row_selections: Some(row_selections),
2633 ..self
2634 }
2635 }
2636
2637 fn with_row_filter(self) -> Self {
2638 let row_count = match &self.row_selections {
2639 Some((_, count)) => *count,
2640 None => self.num_row_groups * self.num_rows,
2641 };
2642
2643 let mut rng = rng();
2644 Self {
2645 row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
2646 ..self
2647 }
2648 }
2649
2650 fn with_limit(self, limit: usize) -> Self {
2651 Self {
2652 limit: Some(limit),
2653 ..self
2654 }
2655 }
2656
2657 fn with_offset(self, offset: usize) -> Self {
2658 Self {
2659 offset: Some(offset),
2660 ..self
2661 }
2662 }
2663
2664 fn writer_props(&self) -> WriterProperties {
2665 let builder = WriterProperties::builder()
2666 .set_data_page_size_limit(self.max_data_page_size)
2667 .set_write_batch_size(self.write_batch_size)
2668 .set_writer_version(self.writer_version)
2669 .set_statistics_enabled(self.enabled_statistics);
2670
2671 let builder = match self.encoding {
2672 Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
2673 .set_dictionary_enabled(true)
2674 .set_dictionary_page_size_limit(self.max_dict_page_size),
2675 _ => builder
2676 .set_dictionary_enabled(false)
2677 .set_encoding(self.encoding),
2678 };
2679
2680 builder.build()
2681 }
2682 }
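
    // A typical combined configuration,
    //   TestOptions::new(2, 256, 93).with_null_percent(25).with_row_selections().with_limit(10),
    // writes two row groups of 256 rows each and reads them back in batches of 93
    // through a random row selection with a limit applied on top.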
2683
    /// Strategy: for each set of `TestOptions`, and for every writer version and
    /// requested encoding, write random data of physical type `T` and read it
    /// back, comparing against the Arrow array produced by `converter`
    fn run_single_column_reader_tests<T, F, G>(
        rand_max: i32,
        converted_type: ConvertedType,
        arrow_type: Option<ArrowDataType>,
        converter: F,
        encodings: &[Encoding],
    ) where
        T: DataType,
        G: RandGen<T>,
        F: Fn(&[Option<T::T>]) -> ArrayRef,
    {
        // Each entry exercises a different combination of row-group count, page
        // sizes, null density, statistics, row selections, filters, limits and offsets
        let all_options = vec![
2702 TestOptions::new(2, 100, 15),
2705 TestOptions::new(3, 25, 5),
2710 TestOptions::new(4, 100, 25),
2714 TestOptions::new(3, 256, 73).with_max_data_page_size(128),
2716 TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
2718 TestOptions::new(2, 256, 127).with_null_percent(0),
2720 TestOptions::new(2, 256, 93).with_null_percent(25),
2722 TestOptions::new(4, 100, 25).with_limit(0),
2724 TestOptions::new(4, 100, 25).with_limit(50),
2726 TestOptions::new(4, 100, 25).with_limit(10),
2728 TestOptions::new(4, 100, 25).with_limit(101),
2730 TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
2732 TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
2734 TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
2736 TestOptions::new(2, 256, 91)
2738 .with_null_percent(25)
2739 .with_enabled_statistics(EnabledStatistics::Chunk),
2740 TestOptions::new(2, 256, 91)
2742 .with_null_percent(25)
2743 .with_enabled_statistics(EnabledStatistics::None),
2744 TestOptions::new(2, 128, 91)
2746 .with_null_percent(100)
2747 .with_enabled_statistics(EnabledStatistics::None),
2748 TestOptions::new(2, 100, 15).with_row_selections(),
2753 TestOptions::new(3, 25, 5).with_row_selections(),
2758 TestOptions::new(4, 100, 25).with_row_selections(),
2762 TestOptions::new(3, 256, 73)
2764 .with_max_data_page_size(128)
2765 .with_row_selections(),
2766 TestOptions::new(3, 256, 57)
2768 .with_max_dict_page_size(128)
2769 .with_row_selections(),
2770 TestOptions::new(2, 256, 127)
2772 .with_null_percent(0)
2773 .with_row_selections(),
2774 TestOptions::new(2, 256, 93)
2776 .with_null_percent(25)
2777 .with_row_selections(),
2778 TestOptions::new(2, 256, 93)
2780 .with_null_percent(25)
2781 .with_row_selections()
2782 .with_limit(10),
2783 TestOptions::new(2, 256, 93)
2785 .with_null_percent(25)
2786 .with_row_selections()
2787 .with_offset(20)
2788 .with_limit(10),
2789 TestOptions::new(4, 100, 25).with_row_filter(),
2793 TestOptions::new(4, 100, 25)
2795 .with_row_selections()
2796 .with_row_filter(),
2797 TestOptions::new(2, 256, 93)
2799 .with_null_percent(25)
2800 .with_max_data_page_size(10)
2801 .with_row_filter(),
2802 TestOptions::new(2, 256, 93)
2804 .with_null_percent(25)
2805 .with_max_data_page_size(10)
2806 .with_row_selections()
2807 .with_row_filter(),
2808 TestOptions::new(2, 256, 93)
2810 .with_enabled_statistics(EnabledStatistics::None)
2811 .with_max_data_page_size(10)
2812 .with_row_selections(),
2813 ];
2814
2815 all_options.into_iter().for_each(|opts| {
2816 for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2817 for encoding in encodings {
2818 let opts = TestOptions {
2819 writer_version,
2820 encoding: *encoding,
2821 ..opts.clone()
2822 };
2823
2824 single_column_reader_test::<T, _, G>(
2825 opts,
2826 rand_max,
2827 converted_type,
2828 arrow_type.clone(),
2829 &converter,
2830 )
2831 }
2832 }
2833 });
2834 }
2835
    /// Writes a single leaf column of randomly generated `T` values using `opts`,
    /// then reads it back and verifies the decoded data
    fn single_column_reader_test<T, F, G>(
        opts: TestOptions,
        rand_max: i32,
        converted_type: ConvertedType,
        arrow_type: Option<ArrowDataType>,
        converter: F,
    ) where
        T: DataType,
        G: RandGen<T>,
        F: Fn(&[Option<T::T>]) -> ArrayRef,
    {
2850 println!(
2852 "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
2853 T::get_physical_type(),
2854 converted_type,
2855 arrow_type,
2856 opts
2857 );
2858
2859 let (repetition, def_levels) = match opts.null_percent.as_ref() {
2861 Some(null_percent) => {
2862 let mut rng = rng();
2863
2864 let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
2865 .map(|_| {
2866 std::iter::from_fn(|| {
2867 Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
2868 })
2869 .take(opts.num_rows)
2870 .collect()
2871 })
2872 .collect();
2873 (Repetition::OPTIONAL, Some(def_levels))
2874 }
2875 None => (Repetition::REQUIRED, None),
2876 };
2877
2878 let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
2880 .map(|idx| {
2881 let null_count = match def_levels.as_ref() {
2882 Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
2883 None => 0,
2884 };
2885 G::gen_vec(rand_max, opts.num_rows - null_count)
2886 })
2887 .collect();
2888
2889 let len = match T::get_physical_type() {
2890 crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
2891 crate::basic::Type::INT96 => 12,
2892 _ => -1,
2893 };
2894
2895 let fields = vec![Arc::new(
2896 Type::primitive_type_builder("leaf", T::get_physical_type())
2897 .with_repetition(repetition)
2898 .with_converted_type(converted_type)
2899 .with_length(len)
2900 .build()
2901 .unwrap(),
2902 )];
2903
2904 let schema = Arc::new(
2905 Type::group_type_builder("test_schema")
2906 .with_fields(fields)
2907 .build()
2908 .unwrap(),
2909 );
2910
2911 let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
2912
2913 let mut file = tempfile::tempfile().unwrap();
2914
2915 generate_single_column_file_with_data::<T>(
2916 &values,
2917 def_levels.as_ref(),
            file.try_clone().unwrap(),
            schema,
2920 arrow_field,
2921 &opts,
2922 )
2923 .unwrap();
2924
2925 file.rewind().unwrap();
2926
2927 let options = ArrowReaderOptions::new()
2928 .with_page_index(opts.enabled_statistics == EnabledStatistics::Page);
2929
2930 let mut builder =
2931 ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
2932
2933 let expected_data = match opts.row_selections {
2934 Some((selections, row_count)) => {
2935 let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2936
                // Note: despite the name, this accumulates the rows *kept* by the selection
                let mut skip_data: Vec<Option<T::T>> = vec![];
2938 let dequeue: VecDeque<RowSelector> = selections.clone().into();
2939 for select in dequeue {
2940 if select.skip {
2941 without_skip_data.drain(0..select.row_count);
2942 } else {
2943 skip_data.extend(without_skip_data.drain(0..select.row_count));
2944 }
2945 }
2946 builder = builder.with_row_selection(selections);
2947
2948 assert_eq!(skip_data.len(), row_count);
2949 skip_data
2950 }
2951 None => {
2952 let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2954 assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
2955 expected_data
2956 }
2957 };
2958
2959 let mut expected_data = match opts.row_filter {
2960 Some(filter) => {
2961 let expected_data = expected_data
2962 .into_iter()
2963 .zip(filter.iter())
                    .filter_map(|(d, f)| f.then_some(d))
2965 .collect();
2966
2967 let mut filter_offset = 0;
2968 let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
2969 ProjectionMask::all(),
2970 move |b| {
2971 let array = BooleanArray::from_iter(
2972 filter
2973 .iter()
2974 .skip(filter_offset)
2975 .take(b.num_rows())
2976 .map(|x| Some(*x)),
2977 );
2978 filter_offset += b.num_rows();
2979 Ok(array)
2980 },
2981 ))]);
2982
2983 builder = builder.with_row_filter(filter);
2984 expected_data
2985 }
2986 None => expected_data,
2987 };
2988
2989 if let Some(offset) = opts.offset {
2990 builder = builder.with_offset(offset);
2991 expected_data = expected_data.into_iter().skip(offset).collect();
2992 }
2993
2994 if let Some(limit) = opts.limit {
2995 builder = builder.with_limit(limit);
2996 expected_data = expected_data.into_iter().take(limit).collect();
2997 }
2998
2999 let mut record_reader = builder
3000 .with_batch_size(opts.record_batch_size)
3001 .build()
3002 .unwrap();
3003
3004 let mut total_read = 0;
3005 loop {
3006 let maybe_batch = record_reader.next();
3007 if total_read < expected_data.len() {
3008 let end = min(total_read + opts.record_batch_size, expected_data.len());
3009 let batch = maybe_batch.unwrap().unwrap();
3010 assert_eq!(end - total_read, batch.num_rows());
3011
3012 let a = converter(&expected_data[total_read..end]);
3013 let b = Arc::clone(batch.column(0));
3014
3015 assert_eq!(a.data_type(), b.data_type());
3016 assert_eq!(a.to_data(), b.to_data());
3017 assert_eq!(
3018 a.as_any().type_id(),
3019 b.as_any().type_id(),
3020 "incorrect type ids"
3021 );
3022
3023 total_read = end;
3024 } else {
3025 assert!(maybe_batch.is_none());
3026 break;
3027 }
3028 }
3029 }
3030
3031 fn gen_expected_data<T: DataType>(
3032 def_levels: Option<&Vec<Vec<i16>>>,
3033 values: &[Vec<T::T>],
3034 ) -> Vec<Option<T::T>> {
3035 let data: Vec<Option<T::T>> = match def_levels {
3036 Some(levels) => {
3037 let mut values_iter = values.iter().flatten();
3038 levels
3039 .iter()
3040 .flatten()
3041 .map(|d| match d {
3042 1 => Some(values_iter.next().cloned().unwrap()),
3043 0 => None,
3044 _ => unreachable!(),
3045 })
3046 .collect()
3047 }
3048 None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
3049 };
3050 data
3051 }
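
    // For example, def_levels [[1, 0, 1]] with values [["a", "b"]] yields
    // [Some("a"), None, Some("b")].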
3052
3053 fn generate_single_column_file_with_data<T: DataType>(
3054 values: &[Vec<T::T>],
3055 def_levels: Option<&Vec<Vec<i16>>>,
3056 file: File,
3057 schema: TypePtr,
3058 field: Option<Field>,
3059 opts: &TestOptions,
3060 ) -> Result<ParquetMetaData> {
3061 let mut writer_props = opts.writer_props();
3062 if let Some(field) = field {
3063 let arrow_schema = Schema::new(vec![field]);
3064 add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
3065 }
3066
3067 let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
3068
3069 for (idx, v) in values.iter().enumerate() {
3070 let def_levels = def_levels.map(|d| d[idx].as_slice());
3071 let mut row_group_writer = writer.next_row_group()?;
3072 {
3073 let mut column_writer = row_group_writer
3074 .next_column()?
3075 .expect("Column writer is none!");
3076
3077 column_writer
3078 .typed::<T>()
3079 .write_batch(v, def_levels, None)?;
3080
3081 column_writer.close()?;
3082 }
3083 row_group_writer.close()?;
3084 }
3085
3086 writer.close()
3087 }
3088
3089 fn get_test_file(file_name: &str) -> File {
3090 let mut path = PathBuf::new();
3091 path.push(arrow::util::test_util::arrow_test_data());
3092 path.push(file_name);
3093
3094 File::open(path.as_path()).expect("File not found!")
3095 }
3096
3097 #[test]
3098 fn test_read_structs() {
3099 let testdata = arrow::util::test_util::parquet_test_data();
3103 let path = format!("{testdata}/nested_structs.rust.parquet");
3104 let file = File::open(&path).unwrap();
3105 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3106
3107 for batch in record_batch_reader {
3108 batch.unwrap();
3109 }
3110
3111 let file = File::open(&path).unwrap();
3112 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3113
3114 let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
3115 let projected_reader = builder
3116 .with_projection(mask)
3117 .with_batch_size(60)
3118 .build()
3119 .unwrap();
3120
3121 let expected_schema = Schema::new(vec![
3122 Field::new(
3123 "roll_num",
3124 ArrowDataType::Struct(Fields::from(vec![Field::new(
3125 "count",
3126 ArrowDataType::UInt64,
3127 false,
3128 )])),
3129 false,
3130 ),
3131 Field::new(
3132 "PC_CUR",
3133 ArrowDataType::Struct(Fields::from(vec![
3134 Field::new("mean", ArrowDataType::Int64, false),
3135 Field::new("sum", ArrowDataType::Int64, false),
3136 ])),
3137 false,
3138 ),
3139 ]);
3140
3141 assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3143
3144 for batch in projected_reader {
3145 let batch = batch.unwrap();
3146 assert_eq!(batch.schema().as_ref(), &expected_schema);
3147 }
3148 }
3149
3150 #[test]
3151 fn test_read_structs_by_name() {
3153 let testdata = arrow::util::test_util::parquet_test_data();
3154 let path = format!("{testdata}/nested_structs.rust.parquet");
3155 let file = File::open(&path).unwrap();
3156 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3157
3158 for batch in record_batch_reader {
3159 batch.unwrap();
3160 }
3161
3162 let file = File::open(&path).unwrap();
3163 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3164
3165 let mask = ProjectionMask::columns(
3166 builder.parquet_schema(),
3167 ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
3168 );
3169 let projected_reader = builder
3170 .with_projection(mask)
3171 .with_batch_size(60)
3172 .build()
3173 .unwrap();
3174
3175 let expected_schema = Schema::new(vec![
3176 Field::new(
3177 "roll_num",
3178 ArrowDataType::Struct(Fields::from(vec![Field::new(
3179 "count",
3180 ArrowDataType::UInt64,
3181 false,
3182 )])),
3183 false,
3184 ),
3185 Field::new(
3186 "PC_CUR",
3187 ArrowDataType::Struct(Fields::from(vec![
3188 Field::new("mean", ArrowDataType::Int64, false),
3189 Field::new("sum", ArrowDataType::Int64, false),
3190 ])),
3191 false,
3192 ),
3193 ]);
3194
3195 assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3196
3197 for batch in projected_reader {
3198 let batch = batch.unwrap();
3199 assert_eq!(batch.schema().as_ref(), &expected_schema);
3200 }
3201 }
3202
3203 #[test]
3204 fn test_read_maps() {
3205 let testdata = arrow::util::test_util::parquet_test_data();
3206 let path = format!("{testdata}/nested_maps.snappy.parquet");
3207 let file = File::open(path).unwrap();
3208 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3209
3210 for batch in record_batch_reader {
3211 batch.unwrap();
3212 }
3213 }
3214
3215 #[test]
3216 fn test_nested_nullability() {
3217 let message_type = "message nested {
3218 OPTIONAL Group group {
3219 REQUIRED INT32 leaf;
3220 }
3221 }";
3222
3223 let file = tempfile::tempfile().unwrap();
3224 let schema = Arc::new(parse_message_type(message_type).unwrap());
3225
3226 {
3227 let mut writer =
3229 SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
3230 .unwrap();
3231
3232 {
3233 let mut row_group_writer = writer.next_row_group().unwrap();
3234 let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
3235
3236 column_writer
3237 .typed::<Int32Type>()
3238 .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
3239 .unwrap();
3240
3241 column_writer.close().unwrap();
3242 row_group_writer.close().unwrap();
3243 }
3244
3245 writer.close().unwrap();
3246 }
3247
3248 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3249 let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
3250
3251 let reader = builder.with_projection(mask).build().unwrap();
3252
3253 let expected_schema = Schema::new(vec![Field::new(
3254 "group",
3255 ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
3256 true,
3257 )]);
3258
3259 let batch = reader.into_iter().next().unwrap().unwrap();
3260 assert_eq!(batch.schema().as_ref(), &expected_schema);
3261 assert_eq!(batch.num_rows(), 4);
3262 assert_eq!(batch.column(0).null_count(), 2);
3263 }
3264
3265 #[test]
3266 fn test_invalid_utf8() {
3267 let data = vec![
3269 80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
3270 18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
3271 3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
3272 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
3273 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
3274 21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
3275 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
3276 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
3277 0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
3278 118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
3279 116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
3280 82, 49,
3281 ];
3282
3283 let file = Bytes::from(data);
3284 let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();
3285
3286 let error = record_batch_reader.next().unwrap().unwrap_err();
3287
3288 assert!(
3289 error.to_string().contains("invalid utf-8 sequence"),
3290 "{}",
3291 error
3292 );
3293 }
3294
3295 #[test]
3296 fn test_invalid_utf8_string_array() {
3297 test_invalid_utf8_string_array_inner::<i32>();
3298 }
3299
3300 #[test]
3301 fn test_invalid_utf8_large_string_array() {
3302 test_invalid_utf8_string_array_inner::<i64>();
3303 }
3304
3305 fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
3306 let cases = [
3307 invalid_utf8_first_char::<O>(),
3308 invalid_utf8_first_char_long_strings::<O>(),
3309 invalid_utf8_later_char::<O>(),
3310 invalid_utf8_later_char_long_strings::<O>(),
3311 invalid_utf8_later_char_really_long_strings::<O>(),
3312 invalid_utf8_later_char_really_long_strings2::<O>(),
3313 ];
3314 for array in &cases {
3315 for encoding in STRING_ENCODINGS {
                // Rebuild as a string array with `new_unchecked`, bypassing UTF-8
                // validation so the invalid bytes actually reach the writer
                let array = unsafe {
3319 GenericStringArray::<O>::new_unchecked(
3320 array.offsets().clone(),
3321 array.values().clone(),
3322 array.nulls().cloned(),
3323 )
3324 };
3325 let data_type = array.data_type().clone();
3326 let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3327 let err = read_from_parquet(data).unwrap_err();
3328 let expected_err =
3329 "Parquet argument error: Parquet error: encountered non UTF-8 data";
3330 assert!(
3331 err.to_string().contains(expected_err),
3332 "data type: {data_type}, expected: {expected_err}, got: {err}"
3333 );
3334 }
3335 }
3336 }
3337
3338 #[test]
3339 fn test_invalid_utf8_string_view_array() {
3340 let cases = [
3341 invalid_utf8_first_char::<i32>(),
3342 invalid_utf8_first_char_long_strings::<i32>(),
3343 invalid_utf8_later_char::<i32>(),
3344 invalid_utf8_later_char_long_strings::<i32>(),
3345 invalid_utf8_later_char_really_long_strings::<i32>(),
3346 invalid_utf8_later_char_really_long_strings2::<i32>(),
3347 ];
3348
3349 for encoding in STRING_ENCODINGS {
3350 for array in &cases {
3351 let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
3352 let array = array.as_binary_view();
3353
                // Rebuild as a StringViewArray with `new_unchecked`, bypassing UTF-8
                // validation so the invalid bytes actually reach the writer
                let array = unsafe {
3357 StringViewArray::new_unchecked(
3358 array.views().clone(),
3359 array.data_buffers().to_vec(),
3360 array.nulls().cloned(),
3361 )
3362 };
3363
3364 let data_type = array.data_type().clone();
3365 let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3366 let err = read_from_parquet(data).unwrap_err();
3367 let expected_err =
3368 "Parquet argument error: Parquet error: encountered non UTF-8 data";
3369 assert!(
3370 err.to_string().contains(expected_err),
3371 "data type: {data_type}, expected: {expected_err}, got: {err}"
3372 );
3373 }
3374 }
3375 }
3376
    /// String encodings to exercise when writing test data
    const STRING_ENCODINGS: &[Option<Encoding>] = &[
        // `None` uses the writer default, which is dictionary encoding
        None,
3380 Some(Encoding::PLAIN),
3381 Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
3382 Some(Encoding::DELTA_BYTE_ARRAY),
3383 ];
3384
    /// Invalid UTF-8: 0xa0 and 0xa1 are continuation bytes and cannot start a character
    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3388
    /// An invalid UTF-8 sequence occurring after several valid ASCII bytes
    const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];
3392
    /// Returns an array with invalid UTF-8 in the first character of a string
    fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3395 let valid: &[u8] = b" ";
3396 let invalid = INVALID_UTF8_FIRST_CHAR;
3397 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3398 }
3399
    /// Returns an array with invalid UTF-8 at the end of a string longer than 12 bytes
    fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3404 let valid: &[u8] = b" ";
3405 let mut invalid = vec![];
3406 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3407 invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3408 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3409 }
3410
    /// Returns an array with invalid UTF-8 after the first character of a string
    fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3414 let valid: &[u8] = b" ";
3415 let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
3416 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3417 }
3418
    /// Returns an array with invalid UTF-8 after the first character of a long string
    fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3423 let valid: &[u8] = b" ";
3424 let mut invalid = vec![];
3425 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3426 invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3427 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3428 }
3429
    /// Returns an array with invalid UTF-8 at the end of a several-hundred-byte string
    fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3434 let valid: &[u8] = b" ";
3435 let mut invalid = vec![];
3436 for _ in 0..10 {
3437 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3439 }
3440 invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3441 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3442 }
3443
    /// Returns an array mixing a very long valid string with an invalid one
    fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3447 let valid: &[u8] = b" ";
3448 let mut valid_long = vec![];
3449 for _ in 0..10 {
3450 valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3452 }
3453 let invalid = INVALID_UTF8_LATER_CHAR;
3454 GenericBinaryArray::<O>::from_iter(vec![
3455 None,
3456 Some(valid),
3457 Some(invalid),
3458 None,
3459 Some(&valid_long),
3460 Some(valid),
3461 ])
3462 }
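
    // The "longer than 12 bytes" cases above matter for StringViewArray: views
    // keep strings of at most 12 bytes fully inline, while longer values live in
    // data buffers with only a 4-byte prefix stored inline.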
3463
    /// Writes `array` as a single column "c" to an in-memory Parquet file, using
    /// the given encoding if provided and the writer defaults otherwise
    fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
3469 let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
3470 let mut data = vec![];
3471 let schema = batch.schema();
3472 let props = encoding.map(|encoding| {
3473 WriterProperties::builder()
3474 .set_dictionary_enabled(false)
3476 .set_encoding(encoding)
3477 .build()
3478 });
3479
3480 {
3481 let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
3482 writer.write(&batch).unwrap();
3483 writer.flush().unwrap();
3484 writer.close().unwrap();
3485 };
3486 data
3487 }
3488
    /// Reads all record batches from an in-memory Parquet file
    fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
3491 let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
3492 .unwrap()
3493 .build()
3494 .unwrap();
3495
3496 reader.collect()
3497 }
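
    // A minimal sketch (not part of the upstream suite) showing how the two
    // helpers above compose: write one string column with an explicit encoding,
    // then read every batch back. Only names already defined in this module are used.
    #[test]
    fn test_utf8_roundtrip_sketch() {
        let array = Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef;
        let data = write_to_parquet_with_encoding(array, Some(Encoding::PLAIN));
        let batches = read_from_parquet(data).unwrap();
        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
    }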
3498
3499 #[test]
3500 fn test_dictionary_preservation() {
3501 let fields = vec![Arc::new(
3502 Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
3503 .with_repetition(Repetition::OPTIONAL)
3504 .with_converted_type(ConvertedType::UTF8)
3505 .build()
3506 .unwrap(),
3507 )];
3508
3509 let schema = Arc::new(
3510 Type::group_type_builder("test_schema")
3511 .with_fields(fields)
3512 .build()
3513 .unwrap(),
3514 );
3515
3516 let dict_type = ArrowDataType::Dictionary(
3517 Box::new(ArrowDataType::Int32),
3518 Box::new(ArrowDataType::Utf8),
3519 );
3520
3521 let arrow_field = Field::new("leaf", dict_type, true);
3522
3523 let mut file = tempfile::tempfile().unwrap();
3524
3525 let values = vec![
3526 vec![
3527 ByteArray::from("hello"),
3528 ByteArray::from("a"),
3529 ByteArray::from("b"),
3530 ByteArray::from("d"),
3531 ],
3532 vec![
3533 ByteArray::from("c"),
3534 ByteArray::from("a"),
3535 ByteArray::from("b"),
3536 ],
3537 ];
3538
3539 let def_levels = vec![
3540 vec![1, 0, 0, 1, 0, 0, 1, 1],
3541 vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
3542 ];
3543
3544 let opts = TestOptions {
3545 encoding: Encoding::RLE_DICTIONARY,
3546 ..Default::default()
3547 };
3548
3549 generate_single_column_file_with_data::<ByteArrayType>(
3550 &values,
3551 Some(&def_levels),
            file.try_clone().unwrap(),
            schema,
3554 Some(arrow_field),
3555 &opts,
3556 )
3557 .unwrap();
3558
3559 file.rewind().unwrap();
3560
3561 let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();
3562
3563 let batches = record_reader
3564 .collect::<Result<Vec<RecordBatch>, _>>()
3565 .unwrap();
3566
3567 assert_eq!(batches.len(), 6);
3568 assert!(batches.iter().all(|x| x.num_columns() == 1));
3569
3570 let row_counts = batches
3571 .iter()
3572 .map(|x| (x.num_rows(), x.column(0).null_count()))
3573 .collect::<Vec<_>>();
3574
3575 assert_eq!(
3576 row_counts,
3577 vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
3578 );
3579
3580 let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();
3581
        // batches 0 and 1 come entirely from the first row group and share its dictionary
        assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
        // batch 2 spans the row group boundary, forcing a new dictionary
        assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
        assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
        // batches 3, 4 and 5 are served entirely from the second row group's dictionary
        assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
        assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
3590 }
3591
3592 #[test]
3593 fn test_read_null_list() {
3594 let testdata = arrow::util::test_util::parquet_test_data();
3595 let path = format!("{testdata}/null_list.parquet");
3596 let file = File::open(path).unwrap();
3597 let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3598
3599 let batch = record_batch_reader.next().unwrap().unwrap();
3600 assert_eq!(batch.num_rows(), 1);
3601 assert_eq!(batch.num_columns(), 1);
3602 assert_eq!(batch.column(0).len(), 1);
3603
3604 let list = batch
3605 .column(0)
3606 .as_any()
3607 .downcast_ref::<ListArray>()
3608 .unwrap();
3609 assert_eq!(list.len(), 1);
3610 assert!(list.is_valid(0));
3611
3612 let val = list.value(0);
3613 assert_eq!(val.len(), 0);
3614 }
3615
3616 #[test]
3617 fn test_null_schema_inference() {
3618 let testdata = arrow::util::test_util::parquet_test_data();
3619 let path = format!("{testdata}/null_list.parquet");
3620 let file = File::open(path).unwrap();
3621
3622 let arrow_field = Field::new(
3623 "emptylist",
3624 ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
3625 true,
3626 );
3627
3628 let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3629 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3630 let schema = builder.schema();
3631 assert_eq!(schema.fields().len(), 1);
3632 assert_eq!(schema.field(0), &arrow_field);
3633 }
3634
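    // with_skip_arrow_metadata(true) ignores the Arrow schema embedded in the
    // Parquet key-value metadata: the schema is re-inferred from the Parquet
    // types, so field-level metadata does not survive the round trip.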
3635 #[test]
3636 fn test_skip_metadata() {
3637 let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
3638 let field = Field::new("col", col.data_type().clone(), true);
3639
3640 let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
3641
3642 let metadata = [("key".to_string(), "value".to_string())]
3643 .into_iter()
3644 .collect();
3645
3646 let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
3647
3648 assert_ne!(schema_with_metadata, schema_without_metadata);
3649
3650 let batch =
3651 RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
3652
3653 let file = |version: WriterVersion| {
3654 let props = WriterProperties::builder()
3655 .set_writer_version(version)
3656 .build();
3657
3658 let file = tempfile().unwrap();
3659 let mut writer =
3660 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
3661 .unwrap();
3662 writer.write(&batch).unwrap();
3663 writer.close().unwrap();
3664 file
3665 };
3666
3667 let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3668
3669 let v1_reader = file(WriterVersion::PARQUET_1_0);
3670 let v2_reader = file(WriterVersion::PARQUET_2_0);
3671
3672 let arrow_reader =
3673 ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
3674 assert_eq!(arrow_reader.schema(), schema_with_metadata);
3675
3676 let reader =
3677 ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
3678 .unwrap()
3679 .build()
3680 .unwrap();
3681 assert_eq!(reader.schema(), schema_without_metadata);
3682
3683 let arrow_reader =
3684 ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
3685 assert_eq!(arrow_reader.schema(), schema_with_metadata);
3686
3687 let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
3688 .unwrap()
3689 .build()
3690 .unwrap();
3691 assert_eq!(reader.schema(), schema_without_metadata);
3692 }
3693
    /// Writes the given (name, array) pairs to a temporary Parquet file
    fn write_parquet_from_iter<I, F>(value: I) -> File
3695 where
3696 I: IntoIterator<Item = (F, ArrayRef)>,
3697 F: AsRef<str>,
3698 {
3699 let batch = RecordBatch::try_from_iter(value).unwrap();
3700 let file = tempfile().unwrap();
3701 let mut writer =
3702 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
3703 writer.write(&batch).unwrap();
3704 writer.close().unwrap();
3705 file
3706 }
3707
    /// Writes `value` to a Parquet file, then asserts that re-opening it with the
    /// supplied `schema` fails with `expected_error`
    fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
3709 where
3710 I: IntoIterator<Item = (F, ArrayRef)>,
3711 F: AsRef<str>,
3712 {
3713 let file = write_parquet_from_iter(value);
3714 let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
3715 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3716 file.try_clone().unwrap(),
3717 options_with_schema,
3718 );
3719 assert_eq!(builder.err().unwrap().to_string(), expected_error);
3720 }
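
    // The schema-override tests below share one pattern: write a file with one
    // Arrow type, then re-open it with ArrowReaderOptions::with_schema supplying
    // an incompatible schema and assert on the exact error message.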
3721
3722 #[test]
3723 fn test_schema_too_few_columns() {
3724 run_schema_test_with_error(
3725 vec![
3726 ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3727 ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3728 ],
3729 Arc::new(Schema::new(vec![Field::new(
3730 "int64",
3731 ArrowDataType::Int64,
3732 false,
3733 )])),
3734 "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
3735 );
3736 }
3737
3738 #[test]
3739 fn test_schema_too_many_columns() {
3740 run_schema_test_with_error(
3741 vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3742 Arc::new(Schema::new(vec![
3743 Field::new("int64", ArrowDataType::Int64, false),
3744 Field::new("int32", ArrowDataType::Int32, false),
3745 ])),
3746 "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
3747 );
3748 }
3749
3750 #[test]
3751 fn test_schema_mismatched_column_names() {
3752 run_schema_test_with_error(
3753 vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3754 Arc::new(Schema::new(vec![Field::new(
3755 "other",
3756 ArrowDataType::Int64,
3757 false,
3758 )])),
3759 "Arrow: incompatible arrow schema, expected field named int64 got other",
3760 );
3761 }
3762
3763 #[test]
3764 fn test_schema_incompatible_columns() {
3765 run_schema_test_with_error(
3766 vec![
3767 (
3768 "col1_invalid",
3769 Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3770 ),
3771 (
3772 "col2_valid",
3773 Arc::new(Int32Array::from(vec![0])) as ArrayRef,
3774 ),
3775 (
3776 "col3_invalid",
3777 Arc::new(Date64Array::from(vec![0])) as ArrayRef,
3778 ),
3779 ],
3780 Arc::new(Schema::new(vec![
3781 Field::new("col1_invalid", ArrowDataType::Int32, false),
3782 Field::new("col2_valid", ArrowDataType::Int32, false),
3783 Field::new("col3_invalid", ArrowDataType::Int32, false),
3784 ])),
3785 "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
3786 );
3787 }
3788
3789 #[test]
3790 fn test_one_incompatible_nested_column() {
3791 let nested_fields = Fields::from(vec![
3792 Field::new("nested1_valid", ArrowDataType::Utf8, false),
3793 Field::new("nested1_invalid", ArrowDataType::Int64, false),
3794 ]);
3795 let nested = StructArray::try_new(
3796 nested_fields,
3797 vec![
3798 Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
3799 Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3800 ],
3801 None,
3802 )
3803 .expect("struct array");
3804 let supplied_nested_fields = Fields::from(vec![
3805 Field::new("nested1_valid", ArrowDataType::Utf8, false),
3806 Field::new("nested1_invalid", ArrowDataType::Int32, false),
3807 ]);
3808 run_schema_test_with_error(
3809 vec![
3810 ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3811 ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3812 ("nested", Arc::new(nested) as ArrayRef),
3813 ],
3814 Arc::new(Schema::new(vec![
3815 Field::new("col1", ArrowDataType::Int64, false),
3816 Field::new("col2", ArrowDataType::Int32, false),
3817 Field::new(
3818 "nested",
3819 ArrowDataType::Struct(supplied_nested_fields),
3820 false,
3821 ),
3822 ])),
3823 "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
3824 requested Struct(\"nested1_valid\": Utf8, \"nested1_invalid\": Int32) \
3825 but found Struct(\"nested1_valid\": Utf8, \"nested1_invalid\": Int64)",
3826 );
3827 }
3828
    /// Returns an in-memory Parquet file containing a single non-nullable Utf8 column
    fn utf8_parquet() -> Bytes {
3831 let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
3832 let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
3833 let props = None;
3834 let mut parquet_data = vec![];
3836 let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
3837 writer.write(&batch).unwrap();
3838 writer.close().unwrap();
3839 Bytes::from(parquet_data)
3840 }
3841
3842 #[test]
3843 fn test_schema_error_bad_types() {
3844 let parquet_data = utf8_parquet();
3846
3847 let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
3849 "column1",
3850 arrow::datatypes::DataType::Int32,
3851 false,
3852 )]));
3853
3854 let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
3856 let err =
3857 ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
3858 .unwrap_err();
3859 assert_eq!(
3860 err.to_string(),
3861 "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8"
3862 )
3863 }
3864
3865 #[test]
3866 fn test_schema_error_bad_nullability() {
3867 let parquet_data = utf8_parquet();
3869
3870 let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
3872 "column1",
3873 arrow::datatypes::DataType::Utf8,
3874 true,
3875 )]));
3876
3877 let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
3879 let err =
3880 ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
3881 .unwrap_err();
3882 assert_eq!(
3883 err.to_string(),
3884 "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false"
3885 )
3886 }
3887
3888 #[test]
3889 fn test_read_binary_as_utf8() {
3890 let file = write_parquet_from_iter(vec![
3891 (
3892 "binary_to_utf8",
3893 Arc::new(BinaryArray::from(vec![
3894 b"one".as_ref(),
3895 b"two".as_ref(),
3896 b"three".as_ref(),
3897 ])) as ArrayRef,
3898 ),
3899 (
3900 "large_binary_to_large_utf8",
3901 Arc::new(LargeBinaryArray::from(vec![
3902 b"one".as_ref(),
3903 b"two".as_ref(),
3904 b"three".as_ref(),
3905 ])) as ArrayRef,
3906 ),
3907 (
3908 "binary_view_to_utf8_view",
3909 Arc::new(BinaryViewArray::from(vec![
3910 b"one".as_ref(),
3911 b"two".as_ref(),
3912 b"three".as_ref(),
3913 ])) as ArrayRef,
3914 ),
3915 ]);
3916 let supplied_fields = Fields::from(vec![
3917 Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
3918 Field::new(
3919 "large_binary_to_large_utf8",
3920 ArrowDataType::LargeUtf8,
3921 false,
3922 ),
3923 Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
3924 ]);
3925
3926 let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3927 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3928 file.try_clone().unwrap(),
3929 options,
3930 )
3931 .expect("reader builder with schema")
3932 .build()
3933 .expect("reader with schema");
3934
3935 let batch = arrow_reader.next().unwrap().unwrap();
3936 assert_eq!(batch.num_columns(), 3);
3937 assert_eq!(batch.num_rows(), 3);
3938 assert_eq!(
3939 batch
3940 .column(0)
3941 .as_string::<i32>()
3942 .iter()
3943 .collect::<Vec<_>>(),
3944 vec![Some("one"), Some("two"), Some("three")]
3945 );
3946
3947 assert_eq!(
3948 batch
3949 .column(1)
3950 .as_string::<i64>()
3951 .iter()
3952 .collect::<Vec<_>>(),
3953 vec![Some("one"), Some("two"), Some("three")]
3954 );
3955
3956 assert_eq!(
3957 batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
3958 vec![Some("one"), Some("two"), Some("three")]
3959 );
3960 }
3961
3962 #[test]
3963 #[should_panic(expected = "Invalid UTF8 sequence at")]
3964 fn test_read_non_utf8_binary_as_utf8() {
3965 let file = write_parquet_from_iter(vec![(
3966 "non_utf8_binary",
3967 Arc::new(BinaryArray::from(vec![
3968 b"\xDE\x00\xFF".as_ref(),
3969 b"\xDE\x01\xAA".as_ref(),
3970 b"\xDE\x02\xFF".as_ref(),
3971 ])) as ArrayRef,
3972 )]);
3973 let supplied_fields = Fields::from(vec![Field::new(
3974 "non_utf8_binary",
3975 ArrowDataType::Utf8,
3976 false,
3977 )]);
3978
3979 let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3980 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3981 file.try_clone().unwrap(),
3982 options,
3983 )
3984 .expect("reader builder with schema")
3985 .build()
3986 .expect("reader with schema");
3987 arrow_reader.next().unwrap().unwrap_err();
3988 }
3989
3990 #[test]
3991 fn test_with_schema() {
3992 let nested_fields = Fields::from(vec![
3993 Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
3994 Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
3995 ]);
3996
3997 let nested_arrays: Vec<ArrayRef> = vec![
3998 Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
3999 Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
4000 ];
4001
4002 let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();
4003
4004 let file = write_parquet_from_iter(vec![
4005 (
4006 "int32_to_ts_second",
4007 Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
4008 ),
4009 (
4010 "date32_to_date64",
4011 Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
4012 ),
4013 ("nested", Arc::new(nested) as ArrayRef),
4014 ]);
4015
4016 let supplied_nested_fields = Fields::from(vec![
4017 Field::new(
4018 "utf8_to_dict",
4019 ArrowDataType::Dictionary(
4020 Box::new(ArrowDataType::Int32),
4021 Box::new(ArrowDataType::Utf8),
4022 ),
4023 false,
4024 ),
4025 Field::new(
4026 "int64_to_ts_nano",
4027 ArrowDataType::Timestamp(
4028 arrow::datatypes::TimeUnit::Nanosecond,
4029 Some("+10:00".into()),
4030 ),
4031 false,
4032 ),
4033 ]);
4034
4035 let supplied_schema = Arc::new(Schema::new(vec![
4036 Field::new(
4037 "int32_to_ts_second",
4038 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
4039 false,
4040 ),
4041 Field::new("date32_to_date64", ArrowDataType::Date64, false),
4042 Field::new(
4043 "nested",
4044 ArrowDataType::Struct(supplied_nested_fields),
4045 false,
4046 ),
4047 ]));
4048
4049 let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
4050 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4051 file.try_clone().unwrap(),
4052 options,
4053 )
4054 .expect("reader builder with schema")
4055 .build()
4056 .expect("reader with schema");
4057
4058 assert_eq!(arrow_reader.schema(), supplied_schema);
4059 let batch = arrow_reader.next().unwrap().unwrap();
4060 assert_eq!(batch.num_columns(), 3);
4061 assert_eq!(batch.num_rows(), 4);
4062 assert_eq!(
4063 batch
4064 .column(0)
4065 .as_any()
4066 .downcast_ref::<TimestampSecondArray>()
4067 .expect("downcast to timestamp second")
4068 .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
4069 .map(|v| v.to_string())
4070 .expect("value as datetime"),
4071 "1970-01-01 01:00:00 +01:00"
4072 );
4073 assert_eq!(
4074 batch
4075 .column(1)
4076 .as_any()
4077 .downcast_ref::<Date64Array>()
4078 .expect("downcast to date64")
4079 .value_as_date(0)
4080 .map(|v| v.to_string())
4081 .expect("value as date"),
4082 "1970-01-01"
4083 );
4084
4085 let nested = batch
4086 .column(2)
4087 .as_any()
4088 .downcast_ref::<StructArray>()
4089 .expect("downcast to struct");
4090
4091 let nested_dict = nested
4092 .column(0)
4093 .as_any()
4094 .downcast_ref::<Int32DictionaryArray>()
4095 .expect("downcast to dictionary");
4096
4097 assert_eq!(
4098 nested_dict
4099 .values()
4100 .as_any()
4101 .downcast_ref::<StringArray>()
4102 .expect("downcast to string")
4103 .iter()
4104 .collect::<Vec<_>>(),
4105 vec![Some("a"), Some("b")]
4106 );
4107
4108 assert_eq!(
4109 nested_dict.keys().iter().collect::<Vec<_>>(),
4110 vec![Some(0), Some(0), Some(0), Some(1)]
4111 );
4112
4113 assert_eq!(
4114 nested
4115 .column(1)
4116 .as_any()
4117 .downcast_ref::<TimestampNanosecondArray>()
4118 .expect("downcast to timestamp nanosecond")
4119 .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
4120 .map(|v| v.to_string())
4121 .expect("value as datetime"),
4122 "1970-01-01 10:00:00.000000001 +10:00"
4123 );
4124 }
4125
4126 #[test]
4127 fn test_empty_projection() {
4128 let testdata = arrow::util::test_util::parquet_test_data();
4129 let path = format!("{testdata}/alltypes_plain.parquet");
4130 let file = File::open(path).unwrap();
4131
4132 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4133 let file_metadata = builder.metadata().file_metadata();
4134 let expected_rows = file_metadata.num_rows() as usize;
4135
4136 let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
4137 let batch_reader = builder
4138 .with_projection(mask)
4139 .with_batch_size(2)
4140 .build()
4141 .unwrap();
4142
4143 let mut total_rows = 0;
4144 for maybe_batch in batch_reader {
4145 let batch = maybe_batch.unwrap();
4146 total_rows += batch.num_rows();
4147 assert_eq!(batch.num_columns(), 0);
4148 assert!(batch.num_rows() <= 2);
4149 }
4150
4151 assert_eq!(total_rows, expected_rows);
4152 }
4153
4154 fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
4155 let schema = Arc::new(Schema::new(vec![Field::new(
4156 "list",
4157 ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
4158 true,
4159 )]));
4160
4161 let mut buf = Vec::with_capacity(1024);
4162
4163 let mut writer = ArrowWriter::try_new(
4164 &mut buf,
4165 schema.clone(),
4166 Some(
4167 WriterProperties::builder()
4168 .set_max_row_group_size(row_group_size)
4169 .build(),
4170 ),
4171 )
4172 .unwrap();
4173 for _ in 0..2 {
4174 let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
4175 for _ in 0..(batch_size) {
4176 list_builder.append(true);
4177 }
4178 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
4179 .unwrap();
4180 writer.write(&batch).unwrap();
4181 }
4182 writer.close().unwrap();
4183
4184 let mut record_reader =
4185 ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
4186 assert_eq!(
4187 batch_size,
4188 record_reader.next().unwrap().unwrap().num_rows()
4189 );
4190 assert_eq!(
4191 batch_size,
4192 record_reader.next().unwrap().unwrap().num_rows()
4193 );
4194 }
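
    // Writing two batches of `batch_size` rows with a maximum row group size of
    // `row_group_size` makes record batches straddle row group boundaries
    // whenever the two sizes are not exact multiples of each other.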
4195
4196 #[test]
4197 fn test_row_group_exact_multiple() {
4198 const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
4199 test_row_group_batch(8, 8);
4200 test_row_group_batch(10, 8);
4201 test_row_group_batch(8, 10);
4202 test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
4203 test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
4204 test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
4205 test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
4206 test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
4207 }
4208
    /// Computes the expected record batches produced by slicing `column`
    /// according to `selection` with the given `batch_size`
    fn get_expected_batches(
4212 column: &RecordBatch,
4213 selection: &RowSelection,
4214 batch_size: usize,
4215 ) -> Vec<RecordBatch> {
4216 let mut expected_batches = vec![];
4217
4218 let mut selection: VecDeque<_> = selection.clone().into();
4219 let mut row_offset = 0;
4220 let mut last_start = None;
4221 while row_offset < column.num_rows() && !selection.is_empty() {
4222 let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
4223 while batch_remaining > 0 && !selection.is_empty() {
4224 let (to_read, skip) = match selection.front_mut() {
4225 Some(selection) if selection.row_count > batch_remaining => {
4226 selection.row_count -= batch_remaining;
4227 (batch_remaining, selection.skip)
4228 }
4229 Some(_) => {
4230 let select = selection.pop_front().unwrap();
4231 (select.row_count, select.skip)
4232 }
4233 None => break,
4234 };
4235
4236 batch_remaining -= to_read;
4237
4238 match skip {
4239 true => {
4240 if let Some(last_start) = last_start.take() {
4241 expected_batches.push(column.slice(last_start, row_offset - last_start))
4242 }
4243 row_offset += to_read
4244 }
4245 false => {
4246 last_start.get_or_insert(row_offset);
4247 row_offset += to_read
4248 }
4249 }
4250 }
4251 }
4252
4253 if let Some(last_start) = last_start.take() {
4254 expected_batches.push(column.slice(last_start, row_offset - last_start))
4255 }
4256
        // Sanity check: every batch except the last should be full
        for batch in &expected_batches[..expected_batches.len() - 1] {
4259 assert_eq!(batch.num_rows(), batch_size);
4260 }
4261
4262 expected_batches
4263 }
4264
4265 fn create_test_selection(
4266 step_len: usize,
4267 total_len: usize,
4268 skip_first: bool,
4269 ) -> (RowSelection, usize) {
4270 let mut remaining = total_len;
4271 let mut skip = skip_first;
4272 let mut vec = vec![];
4273 let mut selected_count = 0;
4274 while remaining != 0 {
4275 let step = if remaining > step_len {
4276 step_len
4277 } else {
4278 remaining
4279 };
4280 vec.push(RowSelector {
4281 row_count: step,
4282 skip,
4283 });
4284 remaining -= step;
4285 if !skip {
4286 selected_count += step;
4287 }
4288 skip = !skip;
4289 }
4290 (vec.into(), selected_count)
4291 }
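
    // For example, create_test_selection(2, 5, false) produces
    // [select 2, skip 2, select 1] and a selected_count of 3.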
4292
4293 #[test]
4294 fn test_scan_row_with_selection() {
4295 let testdata = arrow::util::test_util::parquet_test_data();
4296 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
4297 let test_file = File::open(&path).unwrap();
4298
4299 let mut serial_reader =
4300 ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
4301 let data = serial_reader.next().unwrap().unwrap();
4302
4303 let do_test = |batch_size: usize, selection_len: usize| {
4304 for skip_first in [false, true] {
                let selections = create_test_selection(selection_len, data.num_rows(), skip_first).0;
4306
4307 let expected = get_expected_batches(&data, &selections, batch_size);
4308 let skip_reader = create_skip_reader(&test_file, batch_size, selections);
4309 assert_eq!(
4310 skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
4311 expected,
4312 "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
4313 );
4314 }
4315 };
4316
        // exercise several combinations of batch size vs selection granularity
        do_test(1000, 1000);
4320
4321 do_test(20, 20);
4323
4324 do_test(20, 5);
4326
4327 do_test(20, 5);
4330
4331 fn create_skip_reader(
4332 test_file: &File,
4333 batch_size: usize,
4334 selections: RowSelection,
4335 ) -> ParquetRecordBatchReader {
4336 let options = ArrowReaderOptions::new().with_page_index(true);
4337 let file = test_file.try_clone().unwrap();
4338 ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
4339 .unwrap()
4340 .with_batch_size(batch_size)
4341 .with_row_selection(selections)
4342 .build()
4343 .unwrap()
4344 }
4345 }
4346
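    // The reader clamps the configured batch size to the file's total row count,
    // so an oversized `with_batch_size` cannot cause buffer over-allocation.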
4347 #[test]
4348 fn test_batch_size_overallocate() {
4349 let testdata = arrow::util::test_util::parquet_test_data();
4350 let path = format!("{testdata}/alltypes_plain.parquet");
4352 let test_file = File::open(path).unwrap();
4353
4354 let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4355 let num_rows = builder.metadata.file_metadata().num_rows();
4356 let reader = builder
4357 .with_batch_size(1024)
4358 .with_projection(ProjectionMask::all())
4359 .build()
4360 .unwrap();
4361 assert_ne!(1024, num_rows);
4362 assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
4363 }
4364
4365 #[test]
4366 fn test_read_with_page_index_enabled() {
4367 let testdata = arrow::util::test_util::parquet_test_data();
4368
4369 {
4370 let path = format!("{testdata}/alltypes_tiny_pages.parquet");
4372 let test_file = File::open(path).unwrap();
4373 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4374 test_file,
4375 ArrowReaderOptions::new().with_page_index(true),
4376 )
4377 .unwrap();
4378 assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
4379 let reader = builder.build().unwrap();
4380 let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4381 assert_eq!(batches.len(), 8);
4382 }
4383
4384 {
4385 let path = format!("{testdata}/alltypes_plain.parquet");
4387 let test_file = File::open(path).unwrap();
4388 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4389 test_file,
4390 ArrowReaderOptions::new().with_page_index(true),
4391 )
4392 .unwrap();
            // alltypes_plain.parquet does not contain a page index, so none is loaded
            assert!(builder.metadata().offset_index().is_none());
4396 let reader = builder.build().unwrap();
4397 let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4398 assert_eq!(batches.len(), 1);
4399 }
4400 }
4401
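    // The column writers below construct one logical row by hand: a repetition
    // level of 0 starts a new record, 1 continues the current REPEATED field,
    // and the definition level marks whether the OPTIONAL value is present.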
4402 #[test]
4403 fn test_raw_repetition() {
4404 const MESSAGE_TYPE: &str = "
4405 message Log {
4406 OPTIONAL INT32 eventType;
4407 REPEATED INT32 category;
4408 REPEATED group filter {
4409 OPTIONAL INT32 error;
4410 }
4411 }
4412 ";
4413 let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
4414 let props = Default::default();
4415
4416 let mut buf = Vec::with_capacity(1024);
4417 let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
4418 let mut row_group_writer = writer.next_row_group().unwrap();
4419
4420 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4422 col_writer
4423 .typed::<Int32Type>()
4424 .write_batch(&[1], Some(&[1]), None)
4425 .unwrap();
4426 col_writer.close().unwrap();
4427 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4429 col_writer
4430 .typed::<Int32Type>()
4431 .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
4432 .unwrap();
4433 col_writer.close().unwrap();
4434 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4436 col_writer
4437 .typed::<Int32Type>()
4438 .write_batch(&[1], Some(&[1]), Some(&[0]))
4439 .unwrap();
4440 col_writer.close().unwrap();
4441
4442 let rg_md = row_group_writer.close().unwrap();
4443 assert_eq!(rg_md.num_rows(), 1);
4444 writer.close().unwrap();
4445
4446 let bytes = Bytes::from(buf);
4447
4448 let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
4449 let full = no_mask.next().unwrap().unwrap();
4450
4451 assert_eq!(full.num_columns(), 3);
4452
4453 for idx in 0..3 {
4454 let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
4455 let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
4456 let mut reader = b.with_projection(mask).build().unwrap();
4457 let projected = reader.next().unwrap().unwrap();
4458
4459 assert_eq!(projected.num_columns(), 1);
4460 assert_eq!(full.column(idx), projected.column(0));
4461 }
4462 }
4463
4464 #[test]
4465 fn test_read_lz4_raw() {
4466 let testdata = arrow::util::test_util::parquet_test_data();
4467 let path = format!("{testdata}/lz4_raw_compressed.parquet");
4468 let file = File::open(path).unwrap();
4469
4470 let batches = ParquetRecordBatchReader::try_new(file, 1024)
4471 .unwrap()
4472 .collect::<Result<Vec<_>, _>>()
4473 .unwrap();
4474 assert_eq!(batches.len(), 1);
4475 let batch = &batches[0];
4476
4477 assert_eq!(batch.num_columns(), 3);
4478 assert_eq!(batch.num_rows(), 4);
4479
4480 let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4482 assert_eq!(
4483 a.values(),
4484 &[1593604800, 1593604800, 1593604801, 1593604801]
4485 );
4486
4487 let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4488 let a: Vec<_> = a.iter().flatten().collect();
4489 assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
4490
4491 let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4492 assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
4493 }
4494
4495 #[test]
    // Reading these two files exercises the fallback between the Hadoop LZ4
    // framing and the raw LZ4 block format
    fn test_read_lz4_hadoop_fallback() {
4506 for file in [
4507 "hadoop_lz4_compressed.parquet",
4508 "non_hadoop_lz4_compressed.parquet",
4509 ] {
4510 let testdata = arrow::util::test_util::parquet_test_data();
4511 let path = format!("{testdata}/{file}");
4512 let file = File::open(path).unwrap();
4513 let expected_rows = 4;
4514
4515 let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4516 .unwrap()
4517 .collect::<Result<Vec<_>, _>>()
4518 .unwrap();
4519 assert_eq!(batches.len(), 1);
4520 let batch = &batches[0];
4521
4522 assert_eq!(batch.num_columns(), 3);
4523 assert_eq!(batch.num_rows(), expected_rows);
4524
4525 let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4526 assert_eq!(
4527 a.values(),
4528 &[1593604800, 1593604800, 1593604801, 1593604801]
4529 );
4530
4531 let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4532 let b: Vec<_> = b.iter().flatten().collect();
4533 assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
4534
4535 let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4536 assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
4537 }
4538 }
4539
4540 #[test]
4541 fn test_read_lz4_hadoop_large() {
4542 let testdata = arrow::util::test_util::parquet_test_data();
4543 let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
4544 let file = File::open(path).unwrap();
4545 let expected_rows = 10000;
4546
4547 let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4548 .unwrap()
4549 .collect::<Result<Vec<_>, _>>()
4550 .unwrap();
4551 assert_eq!(batches.len(), 1);
4552 let batch = &batches[0];
4553
4554 assert_eq!(batch.num_columns(), 1);
4555 assert_eq!(batch.num_rows(), expected_rows);
4556
4557 let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
4558 let a: Vec<_> = a.iter().flatten().collect();
4559 assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
4560 assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
4561 assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
4562 assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
4563 }
4564
4565 #[test]
4566 #[cfg(feature = "snap")]
4567 fn test_read_nested_lists() {
4568 let testdata = arrow::util::test_util::parquet_test_data();
4569 let path = format!("{testdata}/nested_lists.snappy.parquet");
4570 let file = File::open(path).unwrap();
4571
4572 let f = file.try_clone().unwrap();
4573 let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
4574 let expected = reader.next().unwrap().unwrap();
4575 assert_eq!(expected.num_rows(), 3);
4576
4577 let selection = RowSelection::from(vec![
4578 RowSelector::skip(1),
4579 RowSelector::select(1),
4580 RowSelector::skip(1),
4581 ]);
4582 let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4583 .unwrap()
4584 .with_row_selection(selection)
4585 .build()
4586 .unwrap();
4587
4588 let actual = reader.next().unwrap().unwrap();
4589 assert_eq!(actual.num_rows(), 1);
4590 assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
4591 }
4592
4593 #[test]
4594 fn test_arbitrary_decimal() {
4595 let values = [1, 2, 3, 4, 5, 6, 7, 8];
4596 let decimals_19_0 = Decimal128Array::from_iter_values(values)
4597 .with_precision_and_scale(19, 0)
4598 .unwrap();
4599 let decimals_12_0 = Decimal128Array::from_iter_values(values)
4600 .with_precision_and_scale(12, 0)
4601 .unwrap();
4602 let decimals_17_10 = Decimal128Array::from_iter_values(values)
4603 .with_precision_and_scale(17, 10)
4604 .unwrap();
4605
4606 let written = RecordBatch::try_from_iter([
4607 ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
4608 ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
4609 ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
4610 ])
4611 .unwrap();
4612
4613 let mut buffer = Vec::with_capacity(1024);
4614 let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
4615 writer.write(&written).unwrap();
4616 writer.close().unwrap();
4617
4618 let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
4619 .unwrap()
4620 .collect::<Result<Vec<_>, _>>()
4621 .unwrap();
4622
4623 assert_eq!(&written.slice(0, 8), &read[0]);
4624 }
4625
4626 #[test]
4627 fn test_list_skip() {
4628 let mut list = ListBuilder::new(Int32Builder::new());
4629 list.append_value([Some(1), Some(2)]);
4630 list.append_value([Some(3)]);
4631 list.append_value([Some(4)]);
4632 let list = list.finish();
4633 let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();
4634
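        // Limit each data page to a single row so the skip below must cross
        // page boundaries.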
        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(1)
            .set_write_batch_size(2)
            .build();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
            .unwrap()
            .with_row_selection(selection.into())
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out, batch.slice(2, 1));
    }

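    // The round-trip helpers below have no `#[test]` attribute of their own;
    // they are all driven by `test_decimal` further down.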
    fn test_decimal32_roundtrip() {
        let d = |values: Vec<i32>, p: u8| {
            let iter = values.into_iter();
            PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

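        // precision <= 9 is expected to round-trip through the INT32 physical type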
        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    fn test_decimal64_roundtrip() {
        let d = |values: Vec<i64>, p: u8| {
            let iter = values.into_iter();
            PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

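        // expected physical types: precision <= 9 -> INT32, precision 10..=18 -> INT64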
        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10i64.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10i64.pow(18) - 1], 18);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    fn test_decimal_roundtrip<T: DecimalType>() {
        let d = |values: Vec<usize>, p: u8| {
            let iter = values.into_iter().map(T::Native::usize_as);
            PrimitiveArray::<T>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

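        // Expected physical types, asserted below:
        //   precision <= 9  -> INT32
        //   precision <= 18 -> INT64
        //   precision > 18  -> FIXED_LEN_BYTE_ARRAY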
        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10usize.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10usize.pow(18) - 1], 18);
        let d4 = d(vec![1, 2, 3, 4, 10usize.pow(19) - 1], 19);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
            ("d4", Arc::new(d4) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);
        let t4 = builder.parquet_schema().columns()[3].physical_type();
        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    #[test]
    fn test_decimal() {
        test_decimal32_roundtrip();
        test_decimal64_roundtrip();
        test_decimal_roundtrip::<Decimal128Type>();
        test_decimal_roundtrip::<Decimal256Type>();
    }

    #[test]
    fn test_list_selection() {
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list_field(ArrowDataType::Utf8, true),
            false,
        )]));
        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        for i in 0..2 {
            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
            for j in 0..1024 {
                list_a_builder.values().append_value(format!("{i} {j}"));
                list_a_builder.append(true);
            }
            let batch =
                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
                    .unwrap();
            writer.write(&batch).unwrap();
        }
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);
        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
            .unwrap()
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ]))
            .build()
            .unwrap();

        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        let batch = concat_batches(&schema, &batches).unwrap();

        assert_eq!(batch.num_rows(), 924 * 2);
        let list = batch.column(0).as_list::<i32>();

        for w in list.value_offsets().windows(2) {
            assert_eq!(w[0] + 1, w[1]);
        }
        let mut values = list.values().as_string::<i32>().iter();

        for i in 0..2 {
            for j in 100..1024 {
                let expected = format!("{i} {j}");
                assert_eq!(values.next().unwrap().unwrap(), &expected);
            }
        }
    }

    #[test]
    fn test_list_selection_fuzz() {
        let mut rng = rng();
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list(
                Field::LIST_FIELD_DEFAULT_NAME,
                Field::new_list_field(ArrowDataType::Int32, true),
                true,
            ),
            true,
        )]));
        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));

        for _ in 0..2048 {
            if rng.random_bool(0.2) {
                list_a_builder.append(false);
                continue;
            }

            let list_a_len = rng.random_range(0..10);
            let list_b_builder = list_a_builder.values();

            for _ in 0..list_a_len {
                if rng.random_bool(0.2) {
                    list_b_builder.append(false);
                    continue;
                }

                let list_b_len = rng.random_range(0..10);
                let int_builder = list_b_builder.values();
                for _ in 0..list_b_len {
                    match rng.random_bool(0.2) {
                        true => int_builder.append_null(),
                        false => int_builder.append_value(rng.random()),
                    }
                }
                list_b_builder.append(true);
            }
            list_a_builder.append(true);
        }

        let array = Arc::new(list_a_builder.finish());
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        writer.write(&batch).unwrap();
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);

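        // Alternating skip/select patterns; each case spans all 2048 written rows.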
        let cases = [
            vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ],
            vec![
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
            ],
            vec![
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
            ],
            vec![
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
            ],
        ];

        for batch_size in [100, 1024, 2048] {
            for selection in &cases {
                let selection = RowSelection::from(selection.clone());
                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
                    .unwrap()
                    .with_row_selection(selection.clone())
                    .with_batch_size(batch_size)
                    .build()
                    .unwrap();

                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
                assert_eq!(actual.num_rows(), selection.row_count());

                let mut batch_offset = 0;
                let mut actual_offset = 0;
                for selector in selection.iter() {
                    if selector.skip {
                        batch_offset += selector.row_count;
                        continue;
                    }

                    assert_eq!(
                        batch.slice(batch_offset, selector.row_count),
                        actual.slice(actual_offset, selector.row_count)
                    );

                    batch_offset += selector.row_count;
                    actual_offset += selector.row_count;
                }
            }
        }
    }

    #[test]
    fn test_read_old_nested_list() {
        use arrow::datatypes::DataType;
        use arrow::datatypes::ToByteSlice;

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/old_list_structure.parquet");
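        // old_list_structure.parquet uses the legacy two-level list encoding
        // (with a repeated group named "array"); it should still decode to the
        // nested list<list<int32>> constructed below.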
        let test_file = File::open(path).unwrap();

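        // expected inner value: the nested list [[1, 2], [3, 4]]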
        let a_values = Int32Array::from(vec![1, 2, 3, 4]);

        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());

        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
            "array",
            DataType::Int32,
            false,
        ))))
        .len(2)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
        let mut reader = builder.build().unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out.num_columns(), 1);
        let c0 = out.column(0);
        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
        let r0 = c0arr.value(0);
        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
        assert_eq!(r0arr, &a);
    }

    #[test]
    fn test_map_no_value() {
        let testdata = arrow::util::test_util::parquet_test_data();
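        // map_no_value.parquet stores a map column whose repeated key_value
        // group has no value field, next to an equivalent plain list column;
        // the test checks that both decode to the same values.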
        let path = format!("{testdata}/map_no_value.parquet");
        let file = File::open(path).unwrap();

        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 3);
        assert_eq!(out.num_columns(), 3);
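        // the valueless map column and the plain list column must decode identically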
        let c0 = out.column(1).as_list::<i32>();
        let c1 = out.column(2).as_list::<i32>();
        assert_eq!(c0.len(), c1.len());
        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
    }

    #[test]
    fn test_get_row_group_column_bloom_filter_with_length() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
        let file = File::open(path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let schema = builder.schema().clone();
        let reader = builder.build().unwrap();

        let mut parquet_data = Vec::new();
        let props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build();
        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
        for batch in reader {
            let batch = batch.unwrap();
            writer.write(&batch).unwrap();
        }
        writer.close().unwrap();

        test_get_row_group_column_bloom_filter(parquet_data.into(), true);
    }

    #[test]
    fn test_get_row_group_column_bloom_filter_without_length() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());
        test_get_row_group_column_bloom_filter(data, false);
    }

    fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
        let builder = ParquetRecordBatchReaderBuilder::try_new(data.clone()).unwrap();

        let metadata = builder.metadata();
        assert_eq!(metadata.num_row_groups(), 1);
        let row_group = metadata.row_group(0);
        let column = row_group.column(0);
        assert_eq!(column.bloom_filter_length().is_some(), with_length);

        let sbbf = builder
            .get_row_group_column_bloom_filter(0, 0)
            .unwrap()
            .unwrap();
        assert!(sbbf.check(&"Hello"));
        assert!(!sbbf.check(&"Hello_Not_Exists"));
    }

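    // A logical type this crate does not recognize should be surfaced as
    // `LogicalType::_Unknown` while the column remains readable via its
    // physical type.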
    #[test]
    fn test_read_unknown_logical_type() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/unknown-logical-type.parquet");
        let test_file = File::open(path).unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file)
            .expect("Error creating reader builder");

        let schema = builder.metadata().file_metadata().schema_descr();
        assert_eq!(
            schema.column(0).logical_type_ref(),
            Some(&LogicalType::String)
        );
        assert_eq!(
            schema.column(1).logical_type_ref(),
            Some(&LogicalType::_Unknown { field_id: 2555 })
        );
        assert_eq!(schema.column(1).physical_type(), PhysicalType::BYTE_ARRAY);

        let mut reader = builder.build().unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 3);
        assert_eq!(out.num_columns(), 2);
    }
}