use arrow_array::Array;
use arrow_array::cast::AsArray;
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

pub use crate::arrow::array_reader::RowGroups;
use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
use crate::arrow::schema::{ParquetField, parquet_to_arrow_schema_and_fields};
use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
use crate::bloom_filter::{
    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
};
use crate::column::page::{PageIterator, PageReader};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;

use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
pub use read_plan::{ReadPlan, ReadPlanBuilder};

mod filter;
pub mod metrics;
mod read_plan;
mod selection;
pub mod statistics;

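/// Common builder state shared by the Arrow readers. `T` is the underlying
/// data source; see [`ParquetRecordBatchReaderBuilder`] below for the
/// synchronous flavor.
///
/// A minimal configuration sketch (assuming `file` is an open Parquet
/// `std::fs::File`):
///
/// ```no_run
/// # use std::fs::File;
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
/// # fn example(file: File) -> parquet::errors::Result<()> {
/// let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
///     .with_batch_size(8192)
///     .with_row_groups(vec![0])
///     .build()?;
/// # Ok(())
/// # }
/// ```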
pub struct ArrowReaderBuilder<T> {
    pub(crate) input: T,
    pub(crate) metadata: Arc<ParquetMetaData>,
    pub(crate) schema: SchemaRef,
    pub(crate) fields: Option<Arc<ParquetField>>,
    pub(crate) batch_size: usize,
    pub(crate) row_groups: Option<Vec<usize>>,
    pub(crate) projection: ProjectionMask,
    pub(crate) filter: Option<RowFilter>,
    pub(crate) selection: Option<RowSelection>,
    pub(crate) limit: Option<usize>,
    pub(crate) offset: Option<usize>,
    pub(crate) metrics: ArrowReaderMetrics,
    pub(crate) max_predicate_cache_size: usize,
}

impl<T: Debug> Debug for ArrowReaderBuilder<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowReaderBuilder<T>")
            .field("input", &self.input)
            .field("metadata", &self.metadata)
            .field("schema", &self.schema)
            .field("fields", &self.fields)
            .field("batch_size", &self.batch_size)
            .field("row_groups", &self.row_groups)
            .field("projection", &self.projection)
            .field("filter", &self.filter)
            .field("selection", &self.selection)
            .field("limit", &self.limit)
            .field("offset", &self.offset)
            .field("metrics", &self.metrics)
            .finish()
    }
}

impl<T> ArrowReaderBuilder<T> {
    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self {
            input,
            metadata: metadata.metadata,
            schema: metadata.schema,
            fields: metadata.fields,
            batch_size: 1024,
            row_groups: None,
            projection: ProjectionMask::all(),
            filter: None,
            selection: None,
            limit: None,
            offset: None,
            metrics: ArrowReaderMetrics::Disabled,
            max_predicate_cache_size: 100 * 1024 * 1024,
        }
    }

    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

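    /// Set the number of rows in each output [`RecordBatch`] (defaults to
    /// 1024). The value is clamped to the number of rows in the file, so a
    /// large batch size does not cause over-allocation for small files.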
    pub fn with_batch_size(self, batch_size: usize) -> Self {
        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
        Self { batch_size, ..self }
    }

    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
        Self {
            row_groups: Some(row_groups),
            ..self
        }
    }

    pub fn with_projection(self, mask: ProjectionMask) -> Self {
        Self {
            projection: mask,
            ..self
        }
    }

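    /// Provide a [`RowSelection`] identifying which rows to decode, letting
    /// the reader skip decoding ranges of rows entirely. The selection is
    /// resolved against the selected row groups before any [`RowFilter`]
    /// predicates are applied.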
    pub fn with_row_selection(self, selection: RowSelection) -> Self {
        Self {
            selection: Some(selection),
            ..self
        }
    }

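    /// Provide a [`RowFilter`] to late-materialize rows: when the reader is
    /// built, each predicate is evaluated in order and only rows passing all
    /// predicates are decoded for the final projection.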
    pub fn with_row_filter(self, filter: RowFilter) -> Self {
        Self {
            filter: Some(filter),
            ..self
        }
    }

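    /// Provide a limit on the number of rows to read, applied after any
    /// [`RowSelection`] and [`RowFilter`] predicates, and after any `offset`
    /// rows have been skipped.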
    pub fn with_limit(self, limit: usize) -> Self {
        Self {
            limit: Some(limit),
            ..self
        }
    }

    pub fn with_offset(self, offset: usize) -> Self {
        Self {
            offset: Some(offset),
            ..self
        }
    }

    pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
        Self { metrics, ..self }
    }

    pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
        Self {
            max_predicate_cache_size,
            ..self
        }
    }
}

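/// Options that control how a Parquet file's metadata is interpreted when
/// constructing a reader, such as supplying an expected Arrow schema or
/// requesting the page index.
///
/// A small sketch of typical use:
///
/// ```
/// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
/// let options = ArrowReaderOptions::new().with_page_index(true);
/// assert!(options.page_index());
/// ```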
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    skip_arrow_metadata: bool,
    supplied_schema: Option<SchemaRef>,
    pub(crate) page_index_policy: PageIndexPolicy,
    #[cfg(feature = "encryption")]
    pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
}

impl ArrowReaderOptions {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

    pub fn with_schema(self, schema: SchemaRef) -> Self {
        Self {
            supplied_schema: Some(schema),
            skip_arrow_metadata: true,
            ..self
        }
    }

    pub fn with_page_index(self, page_index: bool) -> Self {
        let page_index_policy = PageIndexPolicy::from(page_index);

        Self {
            page_index_policy,
            ..self
        }
    }

    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
        Self {
            page_index_policy: policy,
            ..self
        }
    }

    #[cfg(feature = "encryption")]
    pub fn with_file_decryption_properties(
        self,
        file_decryption_properties: Arc<FileDecryptionProperties>,
    ) -> Self {
        Self {
            file_decryption_properties: Some(file_decryption_properties),
            ..self
        }
    }

    pub fn page_index(&self) -> bool {
        self.page_index_policy != PageIndexPolicy::Skip
    }

    #[cfg(feature = "encryption")]
    pub fn file_decryption_properties(&self) -> Option<&Arc<FileDecryptionProperties>> {
        self.file_decryption_properties.as_ref()
    }
}

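/// The decoded [`ParquetMetaData`] plus the Arrow schema and field levels
/// derived from it. Loading this once and cloning it (cheaply, via [`Arc`]s)
/// lets the same metadata back multiple readers without re-parsing the footer.
///
/// A sketch of that reuse pattern (assuming `file` is a Parquet `File`):
///
/// ```no_run
/// # use std::fs::File;
/// # use parquet::arrow::arrow_reader::{
/// #     ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
/// # };
/// # fn example(file: File) -> parquet::errors::Result<()> {
/// let metadata = ArrowReaderMetadata::load(&file, ArrowReaderOptions::new())?;
/// // the clone only bumps reference counts
/// let reader_a = ParquetRecordBatchReaderBuilder::new_with_metadata(
///     file.try_clone().unwrap(),
///     metadata.clone(),
/// )
/// .build()?;
/// let reader_b = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata).build()?;
/// # Ok(())
/// # }
/// ```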
#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    pub(crate) metadata: Arc<ParquetMetaData>,
    pub(crate) schema: SchemaRef,
    pub(crate) fields: Option<Arc<ParquetField>>,
}

impl ArrowReaderMetadata {
    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata =
            ParquetMetaDataReader::new().with_page_index_policy(options.page_index_policy);
        #[cfg(feature = "encryption")]
        let metadata = metadata.with_decryption_properties(
            options.file_decryption_properties.as_ref().map(Arc::clone),
        );
        let metadata = metadata.parse_and_finish(reader)?;
        Self::try_new(Arc::new(metadata), options)
    }

    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
        match options.supplied_schema {
            Some(supplied_schema) => Self::with_supplied_schema(metadata, supplied_schema),
            None => {
                let kv_metadata = match options.skip_arrow_metadata {
                    true => None,
                    false => metadata.file_metadata().key_value_metadata(),
                };

                let (schema, fields) = parquet_to_arrow_schema_and_fields(
                    metadata.file_metadata().schema_descr(),
                    ProjectionMask::all(),
                    kv_metadata,
                )?;

                Ok(Self {
                    metadata,
                    schema: Arc::new(schema),
                    fields: fields.map(Arc::new),
                })
            }
        }
    }

    fn with_supplied_schema(
        metadata: Arc<ParquetMetaData>,
        supplied_schema: SchemaRef,
    ) -> Result<Self> {
        let parquet_schema = metadata.file_metadata().schema_descr();
        let field_levels = parquet_to_arrow_field_levels(
            parquet_schema,
            ProjectionMask::all(),
            Some(supplied_schema.fields()),
        )?;
        let fields = field_levels.fields;
        let inferred_len = fields.len();
        let supplied_len = supplied_schema.fields().len();
        if inferred_len != supplied_len {
            return Err(arrow_err!(format!(
                "Incompatible supplied Arrow schema: expected {} columns, received {}",
                inferred_len, supplied_len
            )));
        }

        let mut errors = Vec::new();

        let field_iter = supplied_schema.fields().iter().zip(fields.iter());

        for (field1, field2) in field_iter {
            if field1.data_type() != field2.data_type() {
                errors.push(format!(
                    "data type mismatch for field {}: requested {} but found {}",
                    field1.name(),
                    field1.data_type(),
                    field2.data_type()
                ));
            }
            if field1.is_nullable() != field2.is_nullable() {
                errors.push(format!(
                    "nullability mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.is_nullable(),
                    field2.is_nullable()
                ));
            }
            if field1.metadata() != field2.metadata() {
                errors.push(format!(
                    "metadata mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.metadata(),
                    field2.metadata()
                ));
            }
        }

        if !errors.is_empty() {
            let message = errors.join(", ");
            return Err(ParquetError::ArrowError(format!(
                "Incompatible supplied Arrow schema: {message}",
            )));
        }

        Ok(Self {
            metadata,
            schema: supplied_schema,
            fields: field_levels.levels.map(Arc::new),
        })
    }

    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}

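// Newtype wrapper marking a synchronous [`ChunkReader`] input; it exists so
// the generic [`ArrowReaderBuilder`] can distinguish the blocking reader
// from other input types.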
#[doc(hidden)]
pub struct SyncReader<T: ChunkReader>(T);

impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("SyncReader").field(&self.0).finish()
    }
}

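/// A synchronous builder for [`ParquetRecordBatchReader`], reading from any
/// [`ChunkReader`] such as `std::fs::File` or `bytes::Bytes`.
///
/// A minimal end-to-end sketch:
///
/// ```no_run
/// # use std::fs::File;
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
/// # fn example(file: File) -> Result<(), Box<dyn std::error::Error>> {
/// let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
/// for batch in reader {
///     println!("read {} rows", batch?.num_rows());
/// }
/// # Ok(())
/// # }
/// ```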
pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;

impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
    pub fn try_new(reader: T) -> Result<Self> {
        Self::try_new_with_options(reader, Default::default())
    }

    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load(&reader, options)?;
        Ok(Self::new_with_metadata(reader, metadata))
    }

    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(SyncReader(input), metadata)
    }

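    /// Read the bloom filter for the given row group and column, returning
    /// `Ok(None)` if the column chunk does not store one. Note this performs
    /// synchronous I/O against the underlying [`ChunkReader`].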
    pub fn get_row_group_column_bloom_filter(
        &self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> Result<Option<Sbbf>> {
        let metadata = self.metadata.row_group(row_group_idx);
        let column_metadata = metadata.column(column_idx);

        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => self.input.0.get_bytes(offset, length as usize),
            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
        }?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        // These matches are exhaustive today; they exist so that adding a new
        // algorithm, compression, or hash variant forces this code to be revisited
        match header.algorithm {
            BloomFilterAlgorithm::BLOCK => {}
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED => {}
        }
        match header.hash {
            BloomFilterHash::XXHASH => {}
        }

        let bitset = match column_metadata.bloom_filter_length() {
            Some(_) => buffer.slice(
                (TryInto::<usize>::try_into(bitset_offset).unwrap()
                    - TryInto::<usize>::try_into(offset).unwrap())..,
            ),
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                self.input.0.get_bytes(bitset_offset, bitset_length)?
            }
        };
        Ok(Some(Sbbf::new(&bitset)))
    }

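    /// Consume the builder and construct the reader. This first evaluates any
    /// [`RowFilter`] predicates to narrow the [`ReadPlan`], then builds the
    /// array reader used to decode the final projection.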
    pub fn build(self) -> Result<ParquetRecordBatchReader> {
        let Self {
            input,
            metadata,
            schema: _,
            fields,
            batch_size,
            row_groups,
            projection,
            mut filter,
            selection,
            limit,
            offset,
            metrics,
            max_predicate_cache_size: _,
        } = self;

        // Clamp the batch size to the file's row count to avoid over-allocation
        let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize);

        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());

        let reader = ReaderRowGroups {
            reader: Arc::new(input.0),
            metadata,
            row_groups,
        };

        let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);

        // Evaluate any predicates in order, narrowing the plan as we go
        if let Some(filter) = filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
                // If nothing is selected there is nothing left to filter
                if !plan_builder.selects_any() {
                    break;
                }

                let mut cache_projection = predicate.projection().clone();
                cache_projection.intersect(&projection);

                let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
                    .build_array_reader(fields.as_deref(), predicate.projection())?;

                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
            }
        }

        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
            .build_array_reader(fields.as_deref(), &projection)?;

        let read_plan = plan_builder
            .limited(reader.num_rows())
            .with_offset(offset)
            .with_limit(limit)
            .build_limited()
            .build();

        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
    }
}

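/// A [`RowGroups`] implementation backed by a [`ChunkReader`], restricted to
/// the row group indices selected on the builder.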
struct ReaderRowGroups<T: ChunkReader> {
    reader: Arc<T>,
    metadata: Arc<ParquetMetaData>,
    row_groups: Vec<usize>,
}

impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
    fn num_rows(&self) -> usize {
        let meta = self.metadata.row_groups();
        self.row_groups
            .iter()
            .map(|x| meta[*x].num_rows() as usize)
            .sum()
    }

    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        Ok(Box::new(ReaderPageIterator {
            column_idx: i,
            reader: self.reader.clone(),
            metadata: self.metadata.clone(),
            row_groups: self.row_groups.clone().into_iter(),
        }))
    }
}

struct ReaderPageIterator<T: ChunkReader> {
    reader: Arc<T>,
    column_idx: usize,
    row_groups: std::vec::IntoIter<usize>,
    metadata: Arc<ParquetMetaData>,
}

impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
        let rg = self.metadata.row_group(rg_idx);
        let column_chunk_metadata = rg.column(self.column_idx);
        let offset_index = self.metadata.offset_index();
        // Only provide page locations if the offset index is present for this row group
        let page_locations = offset_index
            .filter(|i| !i[rg_idx].is_empty())
            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
        let total_rows = rg.num_rows() as usize;
        let reader = self.reader.clone();

        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
            .add_crypto_context(
                rg_idx,
                self.column_idx,
                self.metadata.as_ref(),
                column_chunk_metadata,
            )
    }
}

impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        let rg_idx = self.row_groups.next()?;
        let page_reader = self
            .next_page_reader(rg_idx)
            .map(|page_reader| Box::new(page_reader) as _);
        Some(page_reader)
    }
}

impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}

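/// An `Iterator<Item = Result<RecordBatch, ArrowError>>` that decodes Parquet
/// data into Arrow [`RecordBatch`]es according to a [`ReadPlan`]. It also
/// implements [`RecordBatchReader`], so it can be passed to APIs that accept
/// one.
///
/// A small sketch reading from an in-memory buffer (assuming `data` holds a
/// complete Parquet file):
///
/// ```no_run
/// # use bytes::Bytes;
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
/// # fn example(data: Bytes) -> Result<(), Box<dyn std::error::Error>> {
/// let reader = ParquetRecordBatchReader::try_new(data, 1024)?;
/// for batch in reader {
///     assert!(batch?.num_rows() <= 1024);
/// }
/// # Ok(())
/// # }
/// ```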
pub struct ParquetRecordBatchReader {
    array_reader: Box<dyn ArrayReader>,
    schema: SchemaRef,
    read_plan: ReadPlan,
}

impl Debug for ParquetRecordBatchReader {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ParquetRecordBatchReader")
            .field("array_reader", &"...")
            .field("schema", &self.schema)
            .field("read_plan", &self.read_plan)
            .finish()
    }
}

impl Iterator for ParquetRecordBatchReader {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_inner()
            .map_err(|arrow_err| arrow_err.into())
            .transpose()
    }
}

impl ParquetRecordBatchReader {
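    /// Read up to `batch_size` rows, honoring the current [`RowSelection`]
    /// (if any): skip selectors advance the underlying array reader without
    /// decoding, and an over-long select selector is split so the remainder
    /// is pushed back for the next call. Returns `Ok(None)` once no rows
    /// remain.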
    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
        let mut read_records = 0;
        let batch_size = self.batch_size();
        match self.read_plan.selection_mut() {
            Some(selection) => {
                while read_records < batch_size && !selection.is_empty() {
                    let front = selection.pop_front().unwrap();
                    if front.skip {
                        let skipped = self.array_reader.skip_records(front.row_count)?;

                        if skipped != front.row_count {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                front.row_count,
                                skipped
                            ));
                        }
                        continue;
                    }

                    if front.row_count == 0 {
                        continue;
                    }

                    // Only read the rows that fit in the current batch
                    let need_read = batch_size - read_records;
                    let to_read = match front.row_count.checked_sub(need_read) {
                        Some(remaining) if remaining != 0 => {
                            // The selector is larger than the remaining batch
                            // capacity: push the remainder back for the next call
                            selection.push_front(RowSelector::select(remaining));
                            need_read
                        }
                        _ => front.row_count,
                    };
                    match self.array_reader.read_records(to_read)? {
                        0 => break,
                        rec => read_records += rec,
                    };
                }
            }
            None => {
                self.array_reader.read_records(batch_size)?;
            }
        };

        let array = self.array_reader.consume_batch()?;
        let struct_array = array.as_struct_opt().ok_or_else(|| {
            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
        })?;

        Ok(if struct_array.len() > 0 {
            Some(RecordBatch::from(struct_array))
        } else {
            None
        })
    }
}

impl RecordBatchReader for ParquetRecordBatchReader {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl ParquetRecordBatchReader {
    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
        ParquetRecordBatchReaderBuilder::try_new(reader)?
            .with_batch_size(batch_size)
            .build()
    }

    pub fn try_new_with_row_groups(
        levels: &FieldLevels,
        row_groups: &dyn RowGroups,
        batch_size: usize,
        selection: Option<RowSelection>,
    ) -> Result<Self> {
        let metrics = ArrowReaderMetrics::disabled();
        let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;

        let read_plan = ReadPlanBuilder::new(batch_size)
            .with_selection(selection)
            .build();

        Ok(Self {
            array_reader,
            schema: Arc::new(Schema::new(levels.fields.clone())),
            read_plan,
        })
    }

    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
        let schema = match array_reader.get_data_type() {
            ArrowType::Struct(fields) => Schema::new(fields.clone()),
            _ => unreachable!("Struct array reader's data type is not struct!"),
        };

        Self {
            array_reader,
            schema: Arc::new(schema),
            read_plan,
        }
    }

    #[inline(always)]
    pub(crate) fn batch_size(&self) -> usize {
        self.read_plan.batch_size()
    }
}


#[cfg(test)]
mod tests {
    use std::cmp::min;
    use std::collections::{HashMap, VecDeque};
    use std::fmt::Formatter;
    use std::fs::File;
    use std::io::Seek;
    use std::path::PathBuf;
    use std::sync::Arc;

    use arrow_array::builder::*;
    use arrow_array::cast::AsArray;
    use arrow_array::types::{
        Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
        DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
        Time64MicrosecondType,
    };
    use arrow_array::*;
    use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256};
    use arrow_data::{ArrayData, ArrayDataBuilder};
    use arrow_schema::{
        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
    };
    use arrow_select::concat::concat_batches;
    use bytes::Bytes;
    use half::f16;
    use num_traits::PrimInt;
    use rand::{Rng, RngCore, rng};
    use tempfile::tempfile;

    use crate::arrow::arrow_reader::{
        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
    };
    use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
    use crate::arrow::{ArrowWriter, ProjectionMask};
    use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
    use crate::data_type::{
        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
        FloatType, Int32Type, Int64Type, Int96, Int96Type,
    };
    use crate::errors::Result;
    use crate::file::metadata::ParquetMetaData;
    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
    use crate::file::writer::SerializedFileWriter;
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::{Type, TypePtr};
    use crate::util::test_common::rand_gen::RandGen;

    #[test]
    fn test_arrow_reader_all_columns() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());
        let reader = builder.build().unwrap();

        assert_eq!(original_schema.fields(), reader.schema().fields());
    }

    #[test]
    fn test_arrow_reader_single_column() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
        let reader = builder.with_projection(mask).build().unwrap();

        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_arrow_reader_single_column_by_name() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
        let reader = builder.with_projection(mask).build().unwrap();

        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_null_column_reader_test() {
        let mut file = tempfile::tempfile().unwrap();

        let schema = "
        message message {
            OPTIONAL INT32 int32;
        }
        ";
        let schema = Arc::new(parse_message_type(schema).unwrap());

        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
        generate_single_column_file_with_data::<Int32Type>(
            &[vec![], vec![]],
            Some(&def_levels),
            file.try_clone().unwrap(),
            schema,
            Some(Field::new("int32", ArrowDataType::Null, true)),
            &Default::default(),
        )
        .unwrap();

        file.rewind().unwrap();

        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();

        assert_eq!(batches.len(), 4);
        for batch in &batches[0..3] {
            assert_eq!(batch.num_rows(), 2);
            assert_eq!(batch.num_columns(), 1);
            assert_eq!(batch.column(0).null_count(), 2);
        }

        assert_eq!(batches[3].num_rows(), 1);
        assert_eq!(batches[3].num_columns(), 1);
        assert_eq!(batches[3].column(0).null_count(), 1);
    }

    #[test]
    fn test_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<BoolType, _, BoolType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
        );
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<FloatType, _, FloatType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
        );
    }

    #[test]
    fn test_unsigned_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::UINT_32,
            Some(ArrowDataType::UInt32),
            |vals| {
                Arc::new(UInt32Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u32)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::UINT_64,
            Some(ArrowDataType::UInt64),
            |vals| {
                Arc::new(UInt64Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u64)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
    }

    #[test]
    fn test_unsigned_roundtrip() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("uint32", ArrowDataType::UInt32, true),
            Field::new("uint64", ArrowDataType::UInt64, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt32Array::from_iter_values([
                    0,
                    i32::MAX as u32,
                    u32::MAX,
                ])),
                Arc::new(UInt64Array::from_iter_values([
                    0,
                    i64::MAX as u64,
                    u64::MAX,
                ])),
            ],
        )
        .unwrap();

        writer.write(&original).unwrap();
        writer.close().unwrap();

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
        let ret = reader.next().unwrap().unwrap();
        assert_eq!(ret, original);

        ret.column(0)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();

        ret.column(1)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
    }

    #[test]
    fn test_float16_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("float16", ArrowDataType::Float16, false),
            Field::new("float16-nullable", ArrowDataType::Float16, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Float16Array::from_iter_values([
                    f16::EPSILON,
                    f16::MIN,
                    f16::MAX,
                    f16::NAN,
                    f16::INFINITY,
                    f16::NEG_INFINITY,
                    f16::ONE,
                    f16::NEG_ONE,
                    f16::ZERO,
                    f16::NEG_ZERO,
                    f16::E,
                    f16::PI,
                    f16::FRAC_1_PI,
                ])),
                Arc::new(Float16Array::from(vec![
                    None,
                    None,
                    None,
                    Some(f16::NAN),
                    Some(f16::INFINITY),
                    Some(f16::NEG_INFINITY),
                    None,
                    None,
                    None,
                    None,
                    None,
                    None,
                    Some(f16::FRAC_1_PI),
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Float16Type>();
        ret.column(1).as_primitive::<Float16Type>();

        Ok(())
    }

    #[test]
    fn test_time_utc_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "time_millis",
                ArrowDataType::Time32(TimeUnit::Millisecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
            Field::new(
                "time_micros",
                ArrowDataType::Time64(TimeUnit::Microsecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Time32MillisecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399_000),
                    Some(86_400_000),
                    Some(86_401_000),
                    None,
                ])),
                Arc::new(Time64MicrosecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399 * 1_000_000),
                    Some(86_400 * 1_000_000),
                    Some(86_401 * 1_000_000),
                    None,
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Time32MillisecondType>();
        ret.column(1).as_primitive::<Time64MicrosecondType>();

        Ok(())
    }

    #[test]
    fn test_date32_roundtrip() -> Result<()> {
        use arrow_array::Date32Array;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "date32",
            ArrowDataType::Date32,
            false,
        )]));

        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![Arc::new(Date32Array::from(vec![
                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
            ]))],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Date32Type>();

        Ok(())
    }

    #[test]
    fn test_date64_roundtrip() -> Result<()> {
        use arrow_array::Date64Array;

        let schema = Arc::new(Schema::new(vec![
            Field::new("small-date64", ArrowDataType::Date64, false),
            Field::new("big-date64", ArrowDataType::Date64, false),
            Field::new("invalid-date64", ArrowDataType::Date64, false),
        ]));

        let mut default_buf = Vec::with_capacity(1024);
        let mut coerce_buf = Vec::with_capacity(1024);

        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();

        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
        let mut coerce_writer =
            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;

        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;

        let original = RecordBatch::try_new(
            schema,
            vec![
                // small-date64: whole days within the Date32 range
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000 * NUM_MILLISECONDS_IN_DAY,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                // big-date64: whole days outside the Date32 range
                Arc::new(Date64Array::from(vec![
                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                // invalid-date64: values not aligned to whole days
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1,
                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                ])),
            ],
        )?;

        default_writer.write(&original)?;
        coerce_writer.write(&original)?;

        default_writer.close()?;
        coerce_writer.close()?;

        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;

        let default_ret = default_reader.next().unwrap()?;
        let coerce_ret = coerce_reader.next().unwrap()?;

        // without coercion all values round-trip unchanged
        assert_eq!(default_ret, original);

        // with coercion, only values representable as whole days survive intact
        assert_eq!(coerce_ret.column(0), original.column(0));
        assert_ne!(coerce_ret.column(1), original.column(1));
        assert_ne!(coerce_ret.column(2), original.column(2));

        default_ret.column(0).as_primitive::<Date64Type>();
        coerce_ret.column(0).as_primitive::<Date64Type>();

        Ok(())
    }

    struct RandFixedLenGen {}

    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
        fn r#gen(len: i32) -> FixedLenByteArray {
            let mut v = vec![0u8; len as usize];
            rng().fill_bytes(&mut v);
            ByteArray::from(v).into()
        }
    }

    #[test]
    fn test_fixed_length_binary_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            20,
            ConvertedType::NONE,
            None,
            |vals| {
                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
                for val in vals {
                    match val {
                        Some(b) => builder.append_value(b).unwrap(),
                        None => builder.append_null(),
                    }
                }
                Arc::new(builder.finish())
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_interval_day_time_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            12,
            ConvertedType::INTERVAL,
            None,
            |vals| {
                Arc::new(
                    vals.iter()
                        .map(|x| {
                            x.as_ref().map(|b| IntervalDayTime {
                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
                                milliseconds: i32::from_le_bytes(
                                    b.as_ref()[8..12].try_into().unwrap(),
                                ),
                            })
                        })
                        .collect::<IntervalDayTimeArray>(),
                )
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_int96_single_column_reader_test() {
        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];

        type TypeHintAndConversionFunction =
            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);

        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
            // without a type hint, INT96 is read as nanosecond timestamps
            (None, |vals: &[Option<Int96>]| {
                Arc::new(TimestampNanosecondArray::from_iter(
                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
                )) as ArrayRef
            }),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampSecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMillisecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_millis())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMicrosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_micros())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampNanosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(
                    TimeUnit::Second,
                    Some(Arc::from("-05:00")),
                )),
                |vals: &[Option<Int96>]| {
                    Arc::new(
                        TimestampSecondArray::from_iter(
                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
                        )
                        .with_timezone("-05:00"),
                    ) as ArrayRef
                },
            ),
        ];

        resolutions.iter().for_each(|(arrow_type, converter)| {
            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
                2,
                ConvertedType::NONE,
                arrow_type.clone(),
                converter,
                encodings,
            );
        })
    }

    #[test]
    fn test_int96_from_spark_file_with_provided_schema() {
        use arrow_schema::DataType::Timestamp;
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            Timestamp(TimeUnit::Microsecond, None),
            true,
        )]));
        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());

        let mut record_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                .unwrap()
                .build()
                .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));

        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456),
            Some(1704070800000000),
            Some(253402225200000000),
            Some(1735599600000000),
            None,
            Some(9089380393200000000),
        ]));

        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }

    #[test]
    fn test_int96_from_spark_file_without_provided_schema() {
        use arrow_schema::DataType::Timestamp;
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));

        // Without a supplied schema the values are read as nanoseconds; the
        // two far-future values overflow i64 nanoseconds and come back negative
        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456000),
            Some(1704070800000000000),
            Some(-4852191831933722624),
            Some(1735599600000000000),
            None,
            Some(-4864435138808946688),
        ]));

        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }

    struct RandUtf8Gen {}

    impl RandGen<ByteArrayType> for RandUtf8Gen {
        fn r#gen(len: i32) -> ByteArray {
            Int32Type::r#gen(len).to_string().as_str().into()
        }
    }

    #[test]
    fn test_utf8_single_column_reader_test() {
        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
            })))
        }

        let encodings = &[
            Encoding::PLAIN,
            Encoding::RLE_DICTIONARY,
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            Encoding::DELTA_BYTE_ARRAY,
        ];

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::NONE,
            None,
            |vals| {
                Arc::new(BinaryArray::from_iter(
                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
                ))
            },
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            None,
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::Utf8),
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::LargeUtf8),
            string_converter::<i64>,
            encodings,
        );

        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
        for key in &small_key_types {
            for encoding in encodings {
                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
                opts.encoding = *encoding;

                let data_type =
                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
                    opts,
                    2,
                    ConvertedType::UTF8,
                    Some(data_type.clone()),
                    move |vals| {
                        let vals = string_converter::<i32>(vals);
                        arrow::compute::cast(&vals, &data_type).unwrap()
                    },
                );
            }
        }

        let key_types = [
            ArrowDataType::Int16,
            ArrowDataType::UInt16,
            ArrowDataType::Int32,
            ArrowDataType::UInt32,
            ArrowDataType::Int64,
            ArrowDataType::UInt64,
        ];

        for key in &key_types {
            let data_type =
                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let vals = string_converter::<i32>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );
        }
    }

    #[test]
    fn test_decimal_nullable_struct() {
        let decimals = Decimal256Array::from_iter_values(
            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
        );

        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "decimals",
            decimals.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_int32_nullable_struct() {
        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "int32",
            int32.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![int32.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_decimal_list() {
        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);

        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
            decimals.data_type().clone(),
            false,
        ))))
        .len(7)
        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 1), &read[2]);
    }

    #[test]
    fn test_read_decimal_file() {
        use arrow_array::Decimal128Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let file_variants = vec![
            ("byte_array", 4),
            ("fixed_length", 25),
            ("int32", 4),
            ("int64", 10),
        ];
        for (prefix, target_precision) in file_variants {
            let path = format!("{testdata}/{prefix}_decimal.parquet");
            let file = File::open(path).unwrap();
            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

            let batch = record_reader.next().unwrap().unwrap();
            assert_eq!(batch.num_rows(), 24);
            let col = batch
                .column(0)
                .as_any()
                .downcast_ref::<Decimal128Array>()
                .unwrap();

            let expected = 1..25;

            assert_eq!(col.precision(), target_precision);
            assert_eq!(col.scale(), 2);

            for (i, v) in expected.enumerate() {
                assert_eq!(col.value(i), v * 100_i128);
            }
        }
    }

    #[test]
    fn test_read_float16_nonzeros_file() {
        use arrow_array::Float16Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 8);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float16Array>()
            .unwrap();

        let f16_two = f16::ONE + f16::ONE;

        assert_eq!(col.null_count(), 1);
        assert!(col.is_null(0));
        assert_eq!(col.value(1), f16::ONE);
        assert_eq!(col.value(2), -f16_two);
        assert!(col.value(3).is_nan());
        assert_eq!(col.value(4), f16::ZERO);
        assert!(col.value(4).is_sign_positive());
        assert_eq!(col.value(5), f16::NEG_ONE);
        assert_eq!(col.value(6), f16::NEG_ZERO);
        assert!(col.value(6).is_sign_negative());
        assert_eq!(col.value(7), f16_two);
    }

    #[test]
    fn test_read_float16_zeros_file() {
        use arrow_array::Float16Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 3);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float16Array>()
            .unwrap();

        assert_eq!(col.null_count(), 1);
        assert!(col.is_null(0));
        assert_eq!(col.value(1), f16::ZERO);
        assert!(col.value(1).is_sign_positive());
        assert!(col.value(2).is_nan());
    }

    #[test]
    fn test_read_float32_float64_byte_stream_split() {
        let path = format!(
            "{}/byte_stream_split.zstd.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();
        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

        let mut row_count = 0;
        for batch in record_reader {
            let batch = batch.unwrap();
            row_count += batch.num_rows();
            let f32_col = batch.column(0).as_primitive::<Float32Type>();
            let f64_col = batch.column(1).as_primitive::<Float64Type>();

            // all values in the test file fall well within (-10, 10)
            for &x in f32_col.values() {
                assert!(x > -10.0);
                assert!(x < 10.0);
            }
            for &x in f64_col.values() {
                assert!(x > -10.0);
                assert!(x < 10.0);
            }
        }
        assert_eq!(row_count, 300);
    }

    #[test]
    fn test_read_extended_byte_stream_split() {
        let path = format!(
            "{}/byte_stream_split_extended.gzip.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();
        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

        let mut row_count = 0;
        for batch in record_reader {
            let batch = batch.unwrap();
            row_count += batch.num_rows();

            // each pair of columns holds the same data, encoded once with
            // PLAIN and once with BYTE_STREAM_SPLIT
            let f16_col = batch.column(0).as_primitive::<Float16Type>();
            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
            assert_eq!(f16_col.len(), f16_bss.len());
            f16_col
                .iter()
                .zip(f16_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let f32_col = batch.column(2).as_primitive::<Float32Type>();
            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
            assert_eq!(f32_col.len(), f32_bss.len());
            f32_col
                .iter()
                .zip(f32_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let f64_col = batch.column(4).as_primitive::<Float64Type>();
            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
            assert_eq!(f64_col.len(), f64_bss.len());
            f64_col
                .iter()
                .zip(f64_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
            assert_eq!(i32_col.len(), i32_bss.len());
            i32_col
                .iter()
                .zip(i32_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
            assert_eq!(i64_col.len(), i64_bss.len());
            i64_col
                .iter()
                .zip(i64_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let flba_col = batch.column(10).as_fixed_size_binary();
            let flba_bss = batch.column(11).as_fixed_size_binary();
            assert_eq!(flba_col.len(), flba_bss.len());
            flba_col
                .iter()
                .zip(flba_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
            assert_eq!(dec_col.len(), dec_bss.len());
            dec_col
                .iter()
                .zip(dec_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
        }
        assert_eq!(row_count, 200);
    }

    #[test]
    fn test_read_incorrect_map_schema_file() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/incorrect_map_schema.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 1);

        let expected_schema = Schema::new(vec![Field::new(
            "my_map",
            ArrowDataType::Map(
                Arc::new(Field::new(
                    "key_value",
                    ArrowDataType::Struct(Fields::from(vec![
                        Field::new("key", ArrowDataType::Utf8, false),
                        Field::new("value", ArrowDataType::Utf8, true),
                    ])),
                    false,
                )),
                false,
            ),
            true,
        )]);
        assert_eq!(batch.schema().as_ref(), &expected_schema);

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.column(0).null_count(), 0);
        assert_eq!(
            batch.column(0).as_map().keys().as_ref(),
            &StringArray::from(vec!["parent", "name"])
        );
        assert_eq!(
            batch.column(0).as_map().values().as_ref(),
            &StringArray::from(vec!["another", "report"])
        );
    }

    #[test]
    fn test_read_dict_fixed_size_binary() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            ArrowDataType::Dictionary(
                Box::new(ArrowDataType::UInt8),
                Box::new(ArrowDataType::FixedSizeBinary(8)),
            ),
            true,
        )]));
        let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
        let values = FixedSizeBinaryArray::try_from_iter(
            vec![
                (0u8..8u8).collect::<Vec<u8>>(),
                (24u8..32u8).collect::<Vec<u8>>(),
            ]
            .into_iter(),
        )
        .unwrap();
        let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
        let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(read.len(), 1);
        assert_eq!(&batch, &read[0])
    }

    #[test]
    fn test_read_nullable_structs_with_binary_dict_as_first_child_column() {
        let struct_fields = Fields::from(vec![
            Field::new(
                "city",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt8),
                    Box::new(ArrowDataType::Utf8),
                ),
                true,
            ),
            Field::new("name", ArrowDataType::Utf8, true),
        ]);
        let schema = Arc::new(Schema::new(vec![Field::new(
            "items",
            ArrowDataType::Struct(struct_fields.clone()),
            true,
        )]));

        let items_arr = StructArray::new(
            struct_fields,
            vec![
                Arc::new(DictionaryArray::new(
                    UInt8Array::from_iter_values(vec![0, 1, 1, 0, 2]),
                    Arc::new(StringArray::from_iter_values(vec![
                        "quebec",
                        "fredericton",
                        "halifax",
                    ])),
                )),
                Arc::new(StringArray::from_iter_values(vec![
                    "albert", "terry", "lance", "", "tim",
                ])),
            ],
            Some(NullBuffer::from_iter(vec![true, true, true, false, true])),
        );

        let batch = RecordBatch::try_new(schema, vec![Arc::new(items_arr)]).unwrap();
        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(read.len(), 1);
        assert_eq!(&batch, &read[0])
    }

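    /// Options for a single randomized round-trip test run: data is written
    /// with these settings and read back (optionally through a row selection,
    /// row filter, limit, and offset) and compared against the expected
    /// arrays.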
    #[derive(Clone)]
    struct TestOptions {
        /// Number of row groups to write
        num_row_groups: usize,
        /// Number of rows per row group
        num_rows: usize,
        /// Size of batches to read back
        record_batch_size: usize,
        /// Percentage of nulls in the column, if any
        null_percent: Option<usize>,
        /// Write batch size
        write_batch_size: usize,
        /// Maximum size of a data page in bytes
        max_data_page_size: usize,
        /// Maximum size of a dictionary page in bytes
        max_dict_page_size: usize,
        /// Parquet writer version
        writer_version: WriterVersion,
        /// Which statistics to write
        enabled_statistics: EnabledStatistics,
        /// Column encoding to use
        encoding: Encoding,
        /// Row selection to apply, with the expected selected row count
        row_selections: Option<(RowSelection, usize)>,
        /// Row filter mask (true keeps the row)
        row_filter: Option<Vec<bool>>,
        /// Limit to apply when reading
        limit: Option<usize>,
        /// Offset to apply when reading
        offset: Option<usize>,
    }

    impl std::fmt::Debug for TestOptions {
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("TestOptions")
                .field("num_row_groups", &self.num_row_groups)
                .field("num_rows", &self.num_rows)
                .field("record_batch_size", &self.record_batch_size)
                .field("null_percent", &self.null_percent)
                .field("write_batch_size", &self.write_batch_size)
                .field("max_data_page_size", &self.max_data_page_size)
                .field("max_dict_page_size", &self.max_dict_page_size)
                .field("writer_version", &self.writer_version)
                .field("enabled_statistics", &self.enabled_statistics)
                .field("encoding", &self.encoding)
                .field("row_selections", &self.row_selections.is_some())
                .field("row_filter", &self.row_filter.is_some())
                .field("limit", &self.limit)
                .field("offset", &self.offset)
                .finish()
        }
    }

    impl Default for TestOptions {
        fn default() -> Self {
            Self {
                num_row_groups: 2,
                num_rows: 100,
                record_batch_size: 15,
                null_percent: None,
                write_batch_size: 64,
                max_data_page_size: 1024 * 1024,
                max_dict_page_size: 1024 * 1024,
                writer_version: WriterVersion::PARQUET_1_0,
                enabled_statistics: EnabledStatistics::Page,
                encoding: Encoding::PLAIN,
                row_selections: None,
                row_filter: None,
                limit: None,
                offset: None,
            }
        }
    }

impl TestOptions {
    fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
        Self {
            num_row_groups,
            num_rows,
            record_batch_size,
            ..Default::default()
        }
    }

    fn with_null_percent(self, null_percent: usize) -> Self {
        Self {
            null_percent: Some(null_percent),
            ..self
        }
    }

    fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
        Self {
            max_data_page_size,
            ..self
        }
    }

    fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
        Self {
            max_dict_page_size,
            ..self
        }
    }

    fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
        Self {
            enabled_statistics,
            ..self
        }
    }

    fn with_row_selections(self) -> Self {
        assert!(self.row_filter.is_none(), "Must set row selection first");

        let mut rng = rng();
        let step = rng.random_range(self.record_batch_size..self.num_rows);
        let row_selections = create_test_selection(
            step,
            self.num_row_groups * self.num_rows,
            rng.random::<bool>(),
        );
        Self {
            row_selections: Some(row_selections),
            ..self
        }
    }

    fn with_row_filter(self) -> Self {
        let row_count = match &self.row_selections {
            Some((_, count)) => *count,
            None => self.num_row_groups * self.num_rows,
        };

        let mut rng = rng();
        Self {
            row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
            ..self
        }
    }

    fn with_limit(self, limit: usize) -> Self {
        Self {
            limit: Some(limit),
            ..self
        }
    }

    fn with_offset(self, offset: usize) -> Self {
        Self {
            offset: Some(offset),
            ..self
        }
    }

    fn writer_props(&self) -> WriterProperties {
        let builder = WriterProperties::builder()
            .set_data_page_size_limit(self.max_data_page_size)
            .set_write_batch_size(self.write_batch_size)
            .set_writer_version(self.writer_version)
            .set_statistics_enabled(self.enabled_statistics);

        let builder = match self.encoding {
            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
                .set_dictionary_enabled(true)
                .set_dictionary_page_size_limit(self.max_dict_page_size),
            _ => builder
                .set_dictionary_enabled(false)
                .set_encoding(self.encoding),
        };

        builder.build()
    }
}

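/// Runs `single_column_reader_test` over a matrix of `TestOptions`, for every
/// combination of the supplied encodings and both writer versions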
fn run_single_column_reader_tests<T, F, G>(
    rand_max: i32,
    converted_type: ConvertedType,
    arrow_type: Option<ArrowDataType>,
    converter: F,
    encodings: &[Encoding],
) where
    T: DataType,
    G: RandGen<T>,
    F: Fn(&[Option<T::T>]) -> ArrayRef,
{
    let all_options = vec![
        // basic round trips with a variety of row group and batch sizes
        TestOptions::new(2, 100, 15),
        TestOptions::new(3, 25, 5),
        TestOptions::new(4, 100, 25),
        // small page sizes to force multiple pages per row group
        TestOptions::new(3, 256, 73).with_max_data_page_size(128),
        TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
        // nullable columns
        TestOptions::new(2, 256, 127).with_null_percent(0),
        TestOptions::new(2, 256, 93).with_null_percent(25),
        // limit and offset
        TestOptions::new(4, 100, 25).with_limit(0),
        TestOptions::new(4, 100, 25).with_limit(50),
        TestOptions::new(4, 100, 25).with_limit(10),
        TestOptions::new(4, 100, 25).with_limit(101),
        TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
        TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
        TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
        // statistics variations
        TestOptions::new(2, 256, 91)
            .with_null_percent(25)
            .with_enabled_statistics(EnabledStatistics::Chunk),
        TestOptions::new(2, 256, 91)
            .with_null_percent(25)
            .with_enabled_statistics(EnabledStatistics::None),
        TestOptions::new(2, 128, 91)
            .with_null_percent(100)
            .with_enabled_statistics(EnabledStatistics::None),
        // row selections
        TestOptions::new(2, 100, 15).with_row_selections(),
        TestOptions::new(3, 25, 5).with_row_selections(),
        TestOptions::new(4, 100, 25).with_row_selections(),
        TestOptions::new(3, 256, 73)
            .with_max_data_page_size(128)
            .with_row_selections(),
        TestOptions::new(3, 256, 57)
            .with_max_dict_page_size(128)
            .with_row_selections(),
        TestOptions::new(2, 256, 127)
            .with_null_percent(0)
            .with_row_selections(),
        TestOptions::new(2, 256, 93)
            .with_null_percent(25)
            .with_row_selections(),
        TestOptions::new(2, 256, 93)
            .with_null_percent(25)
            .with_row_selections()
            .with_limit(10),
        TestOptions::new(2, 256, 93)
            .with_null_percent(25)
            .with_row_selections()
            .with_offset(20)
            .with_limit(10),
        // row filters, alone and combined with selections
        TestOptions::new(4, 100, 25).with_row_filter(),
        TestOptions::new(4, 100, 25)
            .with_row_selections()
            .with_row_filter(),
        TestOptions::new(2, 256, 93)
            .with_null_percent(25)
            .with_max_data_page_size(10)
            .with_row_filter(),
        TestOptions::new(2, 256, 93)
            .with_null_percent(25)
            .with_max_data_page_size(10)
            .with_row_selections()
            .with_row_filter(),
        TestOptions::new(2, 256, 93)
            .with_enabled_statistics(EnabledStatistics::None)
            .with_max_data_page_size(10)
            .with_row_selections(),
    ];

    all_options.into_iter().for_each(|opts| {
        for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
            for encoding in encodings {
                let opts = TestOptions {
                    writer_version,
                    encoding: *encoding,
                    ..opts.clone()
                };

                single_column_reader_test::<T, _, G>(
                    opts,
                    rand_max,
                    converted_type,
                    arrow_type.clone(),
                    &converter,
                )
            }
        }
    });
}

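/// Writes a single column of randomly generated `T` values according to
/// `opts`, reads it back through the Arrow reader, and verifies the
/// round-tripped data (after any selection, filter, offset and limit)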
fn single_column_reader_test<T, F, G>(
    opts: TestOptions,
    rand_max: i32,
    converted_type: ConvertedType,
    arrow_type: Option<ArrowDataType>,
    converter: F,
) where
    T: DataType,
    G: RandGen<T>,
    F: Fn(&[Option<T::T>]) -> ArrayRef,
{
    println!(
        "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
        T::get_physical_type(),
        converted_type,
        arrow_type,
        opts
    );

    let (repetition, def_levels) = match opts.null_percent.as_ref() {
        Some(null_percent) => {
            let mut rng = rng();

            let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
                .map(|_| {
                    std::iter::from_fn(|| {
                        Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
                    })
                    .take(opts.num_rows)
                    .collect()
                })
                .collect();
            (Repetition::OPTIONAL, Some(def_levels))
        }
        None => (Repetition::REQUIRED, None),
    };

    let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
        .map(|idx| {
            let null_count = match def_levels.as_ref() {
                Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
                None => 0,
            };
            G::gen_vec(rand_max, opts.num_rows - null_count)
        })
        .collect();

    let len = match T::get_physical_type() {
        crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
        crate::basic::Type::INT96 => 12,
        _ => -1,
    };

    let fields = vec![Arc::new(
        Type::primitive_type_builder("leaf", T::get_physical_type())
            .with_repetition(repetition)
            .with_converted_type(converted_type)
            .with_length(len)
            .build()
            .unwrap(),
    )];

    let schema = Arc::new(
        Type::group_type_builder("test_schema")
            .with_fields(fields)
            .build()
            .unwrap(),
    );

    let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));

    let mut file = tempfile::tempfile().unwrap();

    generate_single_column_file_with_data::<T>(
        &values,
        def_levels.as_ref(),
        file.try_clone().unwrap(),
        schema,
        arrow_field,
        &opts,
    )
    .unwrap();

    file.rewind().unwrap();

    let options = ArrowReaderOptions::new()
        .with_page_index(opts.enabled_statistics == EnabledStatistics::Page);

    let mut builder =
        ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();

    let expected_data = match opts.row_selections {
        Some((selections, row_count)) => {
            let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);

            let mut skip_data: Vec<Option<T::T>> = vec![];
            let dequeue: VecDeque<RowSelector> = selections.clone().into();
            for select in dequeue {
                if select.skip {
                    without_skip_data.drain(0..select.row_count);
                } else {
                    skip_data.extend(without_skip_data.drain(0..select.row_count));
                }
            }
            builder = builder.with_row_selection(selections);

            assert_eq!(skip_data.len(), row_count);
            skip_data
        }
        None => {
            let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
            assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
            expected_data
        }
    };

    let mut expected_data = match opts.row_filter {
        Some(filter) => {
            let expected_data = expected_data
                .into_iter()
                .zip(filter.iter())
                .filter_map(|(d, f)| f.then(|| d))
                .collect();

            let mut filter_offset = 0;
            let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
                ProjectionMask::all(),
                move |b| {
                    let array = BooleanArray::from_iter(
                        filter
                            .iter()
                            .skip(filter_offset)
                            .take(b.num_rows())
                            .map(|x| Some(*x)),
                    );
                    filter_offset += b.num_rows();
                    Ok(array)
                },
            ))]);

            builder = builder.with_row_filter(filter);
            expected_data
        }
        None => expected_data,
    };

    if let Some(offset) = opts.offset {
        builder = builder.with_offset(offset);
        expected_data = expected_data.into_iter().skip(offset).collect();
    }

    if let Some(limit) = opts.limit {
        builder = builder.with_limit(limit);
        expected_data = expected_data.into_iter().take(limit).collect();
    }

    let mut record_reader = builder
        .with_batch_size(opts.record_batch_size)
        .build()
        .unwrap();

    let mut total_read = 0;
    loop {
        let maybe_batch = record_reader.next();
        if total_read < expected_data.len() {
            let end = min(total_read + opts.record_batch_size, expected_data.len());
            let batch = maybe_batch.unwrap().unwrap();
            assert_eq!(end - total_read, batch.num_rows());

            let a = converter(&expected_data[total_read..end]);
            let b = Arc::clone(batch.column(0));

            assert_eq!(a.data_type(), b.data_type());
            assert_eq!(a.to_data(), b.to_data());
            assert_eq!(
                a.as_any().type_id(),
                b.as_any().type_id(),
                "incorrect type ids"
            );

            total_read = end;
        } else {
            assert!(maybe_batch.is_none());
            break;
        }
    }
}

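/// Builds the expected row-by-row contents of the file by interleaving the
/// generated values with nulls according to the definition levels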
fn gen_expected_data<T: DataType>(
    def_levels: Option<&Vec<Vec<i16>>>,
    values: &[Vec<T::T>],
) -> Vec<Option<T::T>> {
    let data: Vec<Option<T::T>> = match def_levels {
        Some(levels) => {
            let mut values_iter = values.iter().flatten();
            levels
                .iter()
                .flatten()
                .map(|d| match d {
                    1 => Some(values_iter.next().cloned().unwrap()),
                    0 => None,
                    _ => unreachable!(),
                })
                .collect()
        }
        None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
    };
    data
}

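/// Writes `values` (one `Vec` per row group) to `file` as a single-column
/// Parquet file, using the writer settings from `opts`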
fn generate_single_column_file_with_data<T: DataType>(
    values: &[Vec<T::T>],
    def_levels: Option<&Vec<Vec<i16>>>,
    file: File,
    schema: TypePtr,
    field: Option<Field>,
    opts: &TestOptions,
) -> Result<ParquetMetaData> {
    let mut writer_props = opts.writer_props();
    if let Some(field) = field {
        let arrow_schema = Schema::new(vec![field]);
        add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
    }

    let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;

    for (idx, v) in values.iter().enumerate() {
        let def_levels = def_levels.map(|d| d[idx].as_slice());
        let mut row_group_writer = writer.next_row_group()?;
        {
            let mut column_writer = row_group_writer
                .next_column()?
                .expect("Column writer is none!");

            column_writer
                .typed::<T>()
                .write_batch(v, def_levels, None)?;

            column_writer.close()?;
        }
        row_group_writer.close()?;
    }

    writer.close()
}

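/// Opens a file from the arrow-testing data directory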
fn get_test_file(file_name: &str) -> File {
    let mut path = PathBuf::new();
    path.push(arrow::util::test_util::arrow_test_data());
    path.push(file_name);

    File::open(path.as_path()).expect("File not found!")
}

#[test]
fn test_read_structs() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/nested_structs.rust.parquet");
    let file = File::open(&path).unwrap();
    let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

    for batch in record_batch_reader {
        batch.unwrap();
    }

    let file = File::open(&path).unwrap();
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

    let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
    let projected_reader = builder
        .with_projection(mask)
        .with_batch_size(60)
        .build()
        .unwrap();

    let expected_schema = Schema::new(vec![
        Field::new(
            "roll_num",
            ArrowDataType::Struct(Fields::from(vec![Field::new(
                "count",
                ArrowDataType::UInt64,
                false,
            )])),
            false,
        ),
        Field::new(
            "PC_CUR",
            ArrowDataType::Struct(Fields::from(vec![
                Field::new("mean", ArrowDataType::Int64, false),
                Field::new("sum", ArrowDataType::Int64, false),
            ])),
            false,
        ),
    ]);

    assert_eq!(&expected_schema, projected_reader.schema().as_ref());

    for batch in projected_reader {
        let batch = batch.unwrap();
        assert_eq!(batch.schema().as_ref(), &expected_schema);
    }
}

#[test]
fn test_read_structs_by_name() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/nested_structs.rust.parquet");
    let file = File::open(&path).unwrap();
    let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

    for batch in record_batch_reader {
        batch.unwrap();
    }

    let file = File::open(&path).unwrap();
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

    let mask = ProjectionMask::columns(
        builder.parquet_schema(),
        ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
    );
    let projected_reader = builder
        .with_projection(mask)
        .with_batch_size(60)
        .build()
        .unwrap();

    let expected_schema = Schema::new(vec![
        Field::new(
            "roll_num",
            ArrowDataType::Struct(Fields::from(vec![Field::new(
                "count",
                ArrowDataType::UInt64,
                false,
            )])),
            false,
        ),
        Field::new(
            "PC_CUR",
            ArrowDataType::Struct(Fields::from(vec![
                Field::new("mean", ArrowDataType::Int64, false),
                Field::new("sum", ArrowDataType::Int64, false),
            ])),
            false,
        ),
    ]);

    assert_eq!(&expected_schema, projected_reader.schema().as_ref());

    for batch in projected_reader {
        let batch = batch.unwrap();
        assert_eq!(batch.schema().as_ref(), &expected_schema);
    }
}

#[test]
fn test_read_maps() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/nested_maps.snappy.parquet");
    let file = File::open(path).unwrap();
    let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

    for batch in record_batch_reader {
        batch.unwrap();
    }
}

#[test]
fn test_nested_nullability() {
    let message_type = "message nested {
        OPTIONAL Group group {
            REQUIRED INT32 leaf;
        }
    }";

    let file = tempfile::tempfile().unwrap();
    let schema = Arc::new(parse_message_type(message_type).unwrap());

    {
        let mut writer =
            SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
                .unwrap();

        {
            let mut row_group_writer = writer.next_row_group().unwrap();
            let mut column_writer = row_group_writer.next_column().unwrap().unwrap();

            column_writer
                .typed::<Int32Type>()
                .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
                .unwrap();

            column_writer.close().unwrap();
            row_group_writer.close().unwrap();
        }

        writer.close().unwrap();
    }

    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);

    let reader = builder.with_projection(mask).build().unwrap();

    let expected_schema = Schema::new(vec![Field::new(
        "group",
        ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
        true,
    )]);

    let batch = reader.into_iter().next().unwrap().unwrap();
    assert_eq!(batch.schema().as_ref(), &expected_schema);
    assert_eq!(batch.num_rows(), 4);
    assert_eq!(batch.column(0).null_count(), 2);
}

#[test]
fn test_invalid_utf8() {
    let data = vec![
        80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
        18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
        3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
        2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
        111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
        21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
        38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
        38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
        0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
        118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
        116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
        82, 49,
    ];

    let file = Bytes::from(data);
    let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();

    let error = record_batch_reader.next().unwrap().unwrap_err();

    assert!(
        error.to_string().contains("invalid utf-8 sequence"),
        "{}",
        error
    );
}

#[test]
fn test_invalid_utf8_string_array() {
    test_invalid_utf8_string_array_inner::<i32>();
}

#[test]
fn test_invalid_utf8_large_string_array() {
    test_invalid_utf8_string_array_inner::<i64>();
}

fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
    let cases = [
        invalid_utf8_first_char::<O>(),
        invalid_utf8_first_char_long_strings::<O>(),
        invalid_utf8_later_char::<O>(),
        invalid_utf8_later_char_long_strings::<O>(),
        invalid_utf8_later_char_really_long_strings::<O>(),
        invalid_utf8_later_char_really_long_strings2::<O>(),
    ];
    for array in &cases {
        for encoding in STRING_ENCODINGS {
            // the UTF-8 validity requirement is deliberately violated here so
            // the invalid bytes can be written out as a string column
            let array = unsafe {
                GenericStringArray::<O>::new_unchecked(
                    array.offsets().clone(),
                    array.values().clone(),
                    array.nulls().cloned(),
                )
            };
            let data_type = array.data_type().clone();
            let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
            let err = read_from_parquet(data).unwrap_err();
            let expected_err =
                "Parquet argument error: Parquet error: encountered non UTF-8 data";
            assert!(
                err.to_string().contains(expected_err),
                "data type: {data_type}, expected: {expected_err}, got: {err}"
            );
        }
    }
}

#[test]
fn test_invalid_utf8_string_view_array() {
    let cases = [
        invalid_utf8_first_char::<i32>(),
        invalid_utf8_first_char_long_strings::<i32>(),
        invalid_utf8_later_char::<i32>(),
        invalid_utf8_later_char_long_strings::<i32>(),
        invalid_utf8_later_char_really_long_strings::<i32>(),
        invalid_utf8_later_char_really_long_strings2::<i32>(),
    ];

    for encoding in STRING_ENCODINGS {
        for array in &cases {
            let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
            let array = array.as_binary_view();

            // the UTF-8 validity requirement is deliberately violated here so
            // the invalid bytes can be written out as a string view column
            let array = unsafe {
                StringViewArray::new_unchecked(
                    array.views().clone(),
                    array.data_buffers().to_vec(),
                    array.nulls().cloned(),
                )
            };

            let data_type = array.data_type().clone();
            let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
            let err = read_from_parquet(data).unwrap_err();
            let expected_err =
                "Parquet argument error: Parquet error: encountered non UTF-8 data";
            assert!(
                err.to_string().contains(expected_err),
                "data type: {data_type}, expected: {expected_err}, got: {err}"
            );
        }
    }
}

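/// Encodings suitable for string data; `None` leaves the writer's default
/// (dictionary encoding) in place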
const STRING_ENCODINGS: &[Option<Encoding>] = &[
    None,
    Some(Encoding::PLAIN),
    Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
    Some(Encoding::DELTA_BYTE_ARRAY),
];

/// Invalid UTF-8 bytes in the first character
const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];

/// Invalid UTF-8 bytes after the first few (valid) characters
const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];

/// Returns a binary array with invalid UTF-8 in the first character
fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
    let valid: &[u8] = b"   ";
    let invalid = INVALID_UTF8_FIRST_CHAR;
    GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
}

/// Returns a binary array with invalid UTF-8 in the first character of a
/// string longer than 12 bytes (too long to be inlined in a view)
fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
    let valid: &[u8] = b"   ";
    let mut invalid = vec![];
    invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
    invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
    GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
}

/// Returns a binary array with invalid UTF-8 after the first character
fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
    let valid: &[u8] = b"   ";
    let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
    GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
}

/// Returns a binary array with invalid UTF-8 after the first character of a
/// string longer than 12 bytes
fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
    let valid: &[u8] = b"   ";
    let mut invalid = vec![];
    invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
    invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
    GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
}

/// Returns a binary array with invalid UTF-8 at the end of a very long string
fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
    let valid: &[u8] = b"   ";
    let mut invalid = vec![];
    for _ in 0..10 {
        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
    }
    invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
    GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
}

/// Returns a binary array mixing a very long valid string with a shorter
/// string containing invalid UTF-8
fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
    let valid: &[u8] = b"   ";
    let mut valid_long = vec![];
    for _ in 0..10 {
        valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
    }
    let invalid = INVALID_UTF8_LATER_CHAR;
    GenericBinaryArray::<O>::from_iter(vec![
        None,
        Some(valid),
        Some(invalid),
        None,
        Some(&valid_long),
        Some(valid),
    ])
}

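/// Writes `array` as a single-column Parquet file; if `encoding` is `Some`,
/// dictionary encoding is disabled and the given encoding used instead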
fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
    let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
    let mut data = vec![];
    let schema = batch.schema();
    let props = encoding.map(|encoding| {
        WriterProperties::builder()
            .set_dictionary_enabled(false)
            .set_encoding(encoding)
            .build()
    });

    {
        let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
        writer.write(&batch).unwrap();
        writer.flush().unwrap();
        writer.close().unwrap();
    };
    data
}

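/// Reads all record batches back from an in-memory Parquet file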
fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
    let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
        .unwrap()
        .build()
        .unwrap();

    reader.collect()
}

#[test]
fn test_dictionary_preservation() {
    let fields = vec![Arc::new(
        Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
            .with_repetition(Repetition::OPTIONAL)
            .with_converted_type(ConvertedType::UTF8)
            .build()
            .unwrap(),
    )];

    let schema = Arc::new(
        Type::group_type_builder("test_schema")
            .with_fields(fields)
            .build()
            .unwrap(),
    );

    let dict_type = ArrowDataType::Dictionary(
        Box::new(ArrowDataType::Int32),
        Box::new(ArrowDataType::Utf8),
    );

    let arrow_field = Field::new("leaf", dict_type, true);

    let mut file = tempfile::tempfile().unwrap();

    let values = vec![
        vec![
            ByteArray::from("hello"),
            ByteArray::from("a"),
            ByteArray::from("b"),
            ByteArray::from("d"),
        ],
        vec![
            ByteArray::from("c"),
            ByteArray::from("a"),
            ByteArray::from("b"),
        ],
    ];

    let def_levels = vec![
        vec![1, 0, 0, 1, 0, 0, 1, 1],
        vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
    ];

    let opts = TestOptions {
        encoding: Encoding::RLE_DICTIONARY,
        ..Default::default()
    };

    generate_single_column_file_with_data::<ByteArrayType>(
        &values,
        Some(&def_levels),
        file.try_clone().unwrap(),
        schema,
        Some(arrow_field),
        &opts,
    )
    .unwrap();

    file.rewind().unwrap();

    let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();

    let batches = record_reader
        .collect::<Result<Vec<RecordBatch>, _>>()
        .unwrap();

    assert_eq!(batches.len(), 6);
    assert!(batches.iter().all(|x| x.num_columns() == 1));

    let row_counts = batches
        .iter()
        .map(|x| (x.num_rows(), x.column(0).null_count()))
        .collect::<Vec<_>>();

    assert_eq!(
        row_counts,
        vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
    );

    let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();

    // batches 0 and 1 come entirely from the first row group and share its dictionary
    assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
    // batch 2 spans both row groups, so a new dictionary is computed for it
    assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
    assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
    // batches 3, 4 and 5 come entirely from the second row group and share its dictionary
    assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
    assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
}

#[test]
fn test_read_null_list() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/null_list.parquet");
    let file = File::open(path).unwrap();
    let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

    let batch = record_batch_reader.next().unwrap().unwrap();
    assert_eq!(batch.num_rows(), 1);
    assert_eq!(batch.num_columns(), 1);
    assert_eq!(batch.column(0).len(), 1);

    let list = batch
        .column(0)
        .as_any()
        .downcast_ref::<ListArray>()
        .unwrap();
    assert_eq!(list.len(), 1);
    assert!(list.is_valid(0));

    let val = list.value(0);
    assert_eq!(val.len(), 0);
}

#[test]
fn test_null_schema_inference() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/null_list.parquet");
    let file = File::open(path).unwrap();

    let arrow_field = Field::new(
        "emptylist",
        ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
        true,
    );

    let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
    let schema = builder.schema();
    assert_eq!(schema.fields().len(), 1);
    assert_eq!(schema.field(0), &arrow_field);
}

#[test]
fn test_skip_metadata() {
    let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
    let field = Field::new("col", col.data_type().clone(), true);

    let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));

    let metadata = [("key".to_string(), "value".to_string())]
        .into_iter()
        .collect();

    let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));

    assert_ne!(schema_with_metadata, schema_without_metadata);

    let batch =
        RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();

    let file = |version: WriterVersion| {
        let props = WriterProperties::builder()
            .set_writer_version(version)
            .build();

        let file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
                .unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        file
    };

    let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);

    let v1_reader = file(WriterVersion::PARQUET_1_0);
    let v2_reader = file(WriterVersion::PARQUET_2_0);

    let arrow_reader =
        ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
    assert_eq!(arrow_reader.schema(), schema_with_metadata);

    let reader =
        ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
            .unwrap()
            .build()
            .unwrap();
    assert_eq!(reader.schema(), schema_without_metadata);

    let arrow_reader =
        ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
    assert_eq!(arrow_reader.schema(), schema_with_metadata);

    let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
        .unwrap()
        .build()
        .unwrap();
    assert_eq!(reader.schema(), schema_without_metadata);
}

/// Writes the given named columns to an in-memory Parquet file and returns it
fn write_parquet_from_iter<I, F>(value: I) -> File
where
    I: IntoIterator<Item = (F, ArrayRef)>,
    F: AsRef<str>,
{
    let batch = RecordBatch::try_from_iter(value).unwrap();
    let file = tempfile().unwrap();
    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();
    file
}

/// Asserts that reading the written data back with `schema` supplied via
/// `ArrowReaderOptions::with_schema` fails with `expected_error`
fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
where
    I: IntoIterator<Item = (F, ArrayRef)>,
    F: AsRef<str>,
{
    let file = write_parquet_from_iter(value);
    let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
        file.try_clone().unwrap(),
        options_with_schema,
    );
    assert_eq!(builder.err().unwrap().to_string(), expected_error);
}

#[test]
fn test_schema_too_few_columns() {
    run_schema_test_with_error(
        vec![
            ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
            ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
        ],
        Arc::new(Schema::new(vec![Field::new(
            "int64",
            ArrowDataType::Int64,
            false,
        )])),
        "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
    );
}

#[test]
fn test_schema_too_many_columns() {
    run_schema_test_with_error(
        vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
        Arc::new(Schema::new(vec![
            Field::new("int64", ArrowDataType::Int64, false),
            Field::new("int32", ArrowDataType::Int32, false),
        ])),
        "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
    );
}

#[test]
fn test_schema_mismatched_column_names() {
    run_schema_test_with_error(
        vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
        Arc::new(Schema::new(vec![Field::new(
            "other",
            ArrowDataType::Int64,
            false,
        )])),
        "Arrow: incompatible arrow schema, expected field named int64 got other",
    );
}

#[test]
fn test_schema_incompatible_columns() {
    run_schema_test_with_error(
        vec![
            (
                "col1_invalid",
                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
            ),
            (
                "col2_valid",
                Arc::new(Int32Array::from(vec![0])) as ArrayRef,
            ),
            (
                "col3_invalid",
                Arc::new(Date64Array::from(vec![0])) as ArrayRef,
            ),
        ],
        Arc::new(Schema::new(vec![
            Field::new("col1_invalid", ArrowDataType::Int32, false),
            Field::new("col2_valid", ArrowDataType::Int32, false),
            Field::new("col3_invalid", ArrowDataType::Int32, false),
        ])),
        "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
    );
}

#[test]
fn test_one_incompatible_nested_column() {
    let nested_fields = Fields::from(vec![
        Field::new("nested1_valid", ArrowDataType::Utf8, false),
        Field::new("nested1_invalid", ArrowDataType::Int64, false),
    ]);
    let nested = StructArray::try_new(
        nested_fields,
        vec![
            Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
            Arc::new(Int64Array::from(vec![0])) as ArrayRef,
        ],
        None,
    )
    .expect("struct array");
    let supplied_nested_fields = Fields::from(vec![
        Field::new("nested1_valid", ArrowDataType::Utf8, false),
        Field::new("nested1_invalid", ArrowDataType::Int32, false),
    ]);
    run_schema_test_with_error(
        vec![
            ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
            ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
            ("nested", Arc::new(nested) as ArrayRef),
        ],
        Arc::new(Schema::new(vec![
            Field::new("col1", ArrowDataType::Int64, false),
            Field::new("col2", ArrowDataType::Int32, false),
            Field::new(
                "nested",
                ArrowDataType::Struct(supplied_nested_fields),
                false,
            ),
        ])),
        "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
        requested Struct(\"nested1_valid\": Utf8, \"nested1_invalid\": Int32) \
        but found Struct(\"nested1_valid\": Utf8, \"nested1_invalid\": Int64)",
    );
}

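/// Writes an in-memory Parquet file containing a single non-nullable Utf8
/// column named "column1"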
fn utf8_parquet() -> Bytes {
    let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
    let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
    let props = None;
    let mut parquet_data = vec![];
    let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();
    Bytes::from(parquet_data)
}

#[test]
fn test_schema_error_bad_types() {
    let parquet_data = utf8_parquet();

    let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
        "column1",
        arrow::datatypes::DataType::Int32,
        false,
    )]));

    let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
    let err =
        ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
            .unwrap_err();
    assert_eq!(
        err.to_string(),
        "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8"
    )
}

#[test]
fn test_schema_error_bad_nullability() {
    let parquet_data = utf8_parquet();

    let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
        "column1",
        arrow::datatypes::DataType::Utf8,
        true,
    )]));

    let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
    let err =
        ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
            .unwrap_err();
    assert_eq!(
        err.to_string(),
        "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false"
    )
}

#[test]
fn test_read_binary_as_utf8() {
    let file = write_parquet_from_iter(vec![
        (
            "binary_to_utf8",
            Arc::new(BinaryArray::from(vec![
                b"one".as_ref(),
                b"two".as_ref(),
                b"three".as_ref(),
            ])) as ArrayRef,
        ),
        (
            "large_binary_to_large_utf8",
            Arc::new(LargeBinaryArray::from(vec![
                b"one".as_ref(),
                b"two".as_ref(),
                b"three".as_ref(),
            ])) as ArrayRef,
        ),
        (
            "binary_view_to_utf8_view",
            Arc::new(BinaryViewArray::from(vec![
                b"one".as_ref(),
                b"two".as_ref(),
                b"three".as_ref(),
            ])) as ArrayRef,
        ),
    ]);
    let supplied_fields = Fields::from(vec![
        Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
        Field::new(
            "large_binary_to_large_utf8",
            ArrowDataType::LargeUtf8,
            false,
        ),
        Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
    ]);

    let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
    let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
        file.try_clone().unwrap(),
        options,
    )
    .expect("reader builder with schema")
    .build()
    .expect("reader with schema");

    let batch = arrow_reader.next().unwrap().unwrap();
    assert_eq!(batch.num_columns(), 3);
    assert_eq!(batch.num_rows(), 3);
    assert_eq!(
        batch
            .column(0)
            .as_string::<i32>()
            .iter()
            .collect::<Vec<_>>(),
        vec![Some("one"), Some("two"), Some("three")]
    );

    assert_eq!(
        batch
            .column(1)
            .as_string::<i64>()
            .iter()
            .collect::<Vec<_>>(),
        vec![Some("one"), Some("two"), Some("three")]
    );

    assert_eq!(
        batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
        vec![Some("one"), Some("two"), Some("three")]
    );
}

#[test]
#[should_panic(expected = "Invalid UTF8 sequence at")]
fn test_read_non_utf8_binary_as_utf8() {
    let file = write_parquet_from_iter(vec![(
        "non_utf8_binary",
        Arc::new(BinaryArray::from(vec![
            b"\xDE\x00\xFF".as_ref(),
            b"\xDE\x01\xAA".as_ref(),
            b"\xDE\x02\xFF".as_ref(),
        ])) as ArrayRef,
    )]);
    let supplied_fields = Fields::from(vec![Field::new(
        "non_utf8_binary",
        ArrowDataType::Utf8,
        false,
    )]);

    let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
    let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
        file.try_clone().unwrap(),
        options,
    )
    .expect("reader builder with schema")
    .build()
    .expect("reader with schema");
    arrow_reader.next().unwrap().unwrap_err();
}

#[test]
fn test_with_schema() {
    let nested_fields = Fields::from(vec![
        Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
        Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
    ]);

    let nested_arrays: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
        Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
    ];

    let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();

    let file = write_parquet_from_iter(vec![
        (
            "int32_to_ts_second",
            Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
        ),
        (
            "date32_to_date64",
            Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
        ),
        ("nested", Arc::new(nested) as ArrayRef),
    ]);

    let supplied_nested_fields = Fields::from(vec![
        Field::new(
            "utf8_to_dict",
            ArrowDataType::Dictionary(
                Box::new(ArrowDataType::Int32),
                Box::new(ArrowDataType::Utf8),
            ),
            false,
        ),
        Field::new(
            "int64_to_ts_nano",
            ArrowDataType::Timestamp(
                arrow::datatypes::TimeUnit::Nanosecond,
                Some("+10:00".into()),
            ),
            false,
        ),
    ]);

    let supplied_schema = Arc::new(Schema::new(vec![
        Field::new(
            "int32_to_ts_second",
            ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
            false,
        ),
        Field::new("date32_to_date64", ArrowDataType::Date64, false),
        Field::new(
            "nested",
            ArrowDataType::Struct(supplied_nested_fields),
            false,
        ),
    ]));

    let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
    let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
        file.try_clone().unwrap(),
        options,
    )
    .expect("reader builder with schema")
    .build()
    .expect("reader with schema");

    assert_eq!(arrow_reader.schema(), supplied_schema);
    let batch = arrow_reader.next().unwrap().unwrap();
    assert_eq!(batch.num_columns(), 3);
    assert_eq!(batch.num_rows(), 4);
    assert_eq!(
        batch
            .column(0)
            .as_any()
            .downcast_ref::<TimestampSecondArray>()
            .expect("downcast to timestamp second")
            .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
            .map(|v| v.to_string())
            .expect("value as datetime"),
        "1970-01-01 01:00:00 +01:00"
    );
    assert_eq!(
        batch
            .column(1)
            .as_any()
            .downcast_ref::<Date64Array>()
            .expect("downcast to date64")
            .value_as_date(0)
            .map(|v| v.to_string())
            .expect("value as date"),
        "1970-01-01"
    );

    let nested = batch
        .column(2)
        .as_any()
        .downcast_ref::<StructArray>()
        .expect("downcast to struct");

    let nested_dict = nested
        .column(0)
        .as_any()
        .downcast_ref::<Int32DictionaryArray>()
        .expect("downcast to dictionary");

    assert_eq!(
        nested_dict
            .values()
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("downcast to string")
            .iter()
            .collect::<Vec<_>>(),
        vec![Some("a"), Some("b")]
    );

    assert_eq!(
        nested_dict.keys().iter().collect::<Vec<_>>(),
        vec![Some(0), Some(0), Some(0), Some(1)]
    );

    assert_eq!(
        nested
            .column(1)
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .expect("downcast to timestamp nanosecond")
            .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
            .map(|v| v.to_string())
            .expect("value as datetime"),
        "1970-01-01 10:00:00.000000001 +10:00"
    );
}

#[test]
fn test_empty_projection() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/alltypes_plain.parquet");
    let file = File::open(path).unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let file_metadata = builder.metadata().file_metadata();
    let expected_rows = file_metadata.num_rows() as usize;

    let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
    let batch_reader = builder
        .with_projection(mask)
        .with_batch_size(2)
        .build()
        .unwrap();

    let mut total_rows = 0;
    for maybe_batch in batch_reader {
        let batch = maybe_batch.unwrap();
        total_rows += batch.num_rows();
        assert_eq!(batch.num_columns(), 0);
        assert!(batch.num_rows() <= 2);
    }

    assert_eq!(total_rows, expected_rows);
}

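/// Writes two record batches of `batch_size` rows each, using the given
/// maximum row group size, then asserts both batches read back at full size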
fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "list",
        ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
        true,
    )]));

    let mut buf = Vec::with_capacity(1024);

    let mut writer = ArrowWriter::try_new(
        &mut buf,
        schema.clone(),
        Some(
            WriterProperties::builder()
                .set_max_row_group_size(row_group_size)
                .build(),
        ),
    )
    .unwrap();
    for _ in 0..2 {
        let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
        for _ in 0..(batch_size) {
            list_builder.append(true);
        }
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
            .unwrap();
        writer.write(&batch).unwrap();
    }
    writer.close().unwrap();

    let mut record_reader =
        ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
    assert_eq!(
        batch_size,
        record_reader.next().unwrap().unwrap().num_rows()
    );
    assert_eq!(
        batch_size,
        record_reader.next().unwrap().unwrap().num_rows()
    );
}

#[test]
fn test_row_group_exact_multiple() {
    const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
    test_row_group_batch(8, 8);
    test_row_group_batch(10, 8);
    test_row_group_batch(8, 10);
    test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
    test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
    test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
    test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
    test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
}

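/// Computes the batches expected when reading `column` back with the given
/// `RowSelection` and batch size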
fn get_expected_batches(
    column: &RecordBatch,
    selection: &RowSelection,
    batch_size: usize,
) -> Vec<RecordBatch> {
    let mut expected_batches = vec![];

    let mut selection: VecDeque<_> = selection.clone().into();
    let mut row_offset = 0;
    let mut last_start = None;
    while row_offset < column.num_rows() && !selection.is_empty() {
        let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
        while batch_remaining > 0 && !selection.is_empty() {
            let (to_read, skip) = match selection.front_mut() {
                Some(selection) if selection.row_count > batch_remaining => {
                    selection.row_count -= batch_remaining;
                    (batch_remaining, selection.skip)
                }
                Some(_) => {
                    let select = selection.pop_front().unwrap();
                    (select.row_count, select.skip)
                }
                None => break,
            };

            batch_remaining -= to_read;

            match skip {
                true => {
                    if let Some(last_start) = last_start.take() {
                        expected_batches.push(column.slice(last_start, row_offset - last_start))
                    }
                    row_offset += to_read
                }
                false => {
                    last_start.get_or_insert(row_offset);
                    row_offset += to_read
                }
            }
        }
    }

    if let Some(last_start) = last_start.take() {
        expected_batches.push(column.slice(last_start, row_offset - last_start))
    }

    // sanity check: all batches except the final one should be full size
    for batch in &expected_batches[..expected_batches.len() - 1] {
        assert_eq!(batch.num_rows(), batch_size);
    }

    expected_batches
}

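/// Builds a selection alternating between selecting and skipping runs of
/// `step_len` rows (starting with a skip if `skip_first`), returning the
/// selection and the number of selected rows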
fn create_test_selection(
    step_len: usize,
    total_len: usize,
    skip_first: bool,
) -> (RowSelection, usize) {
    let mut remaining = total_len;
    let mut skip = skip_first;
    let mut vec = vec![];
    let mut selected_count = 0;
    while remaining != 0 {
        let step = if remaining > step_len {
            step_len
        } else {
            remaining
        };
        vec.push(RowSelector {
            row_count: step,
            skip,
        });
        remaining -= step;
        if !skip {
            selected_count += step;
        }
        skip = !skip;
    }
    (vec.into(), selected_count)
}

#[test]
fn test_scan_row_with_selection() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
    let test_file = File::open(&path).unwrap();

    let mut serial_reader =
        ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
    let data = serial_reader.next().unwrap().unwrap();

    let do_test = |batch_size: usize, selection_len: usize| {
        for skip_first in [false, true] {
            let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;

            let expected = get_expected_batches(&data, &selections, batch_size);
            let skip_reader = create_skip_reader(&test_file, batch_size, selections);
            assert_eq!(
                skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
                expected,
                "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
            );
        }
    };

    do_test(1000, 1000);

    do_test(20, 20);

    do_test(20, 5);

    do_test(20, 5);

    fn create_skip_reader(
        test_file: &File,
        batch_size: usize,
        selections: RowSelection,
    ) -> ParquetRecordBatchReader {
        let options = ArrowReaderOptions::new().with_page_index(true);
        let file = test_file.try_clone().unwrap();
        ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
            .unwrap()
            .with_batch_size(batch_size)
            .with_row_selection(selections)
            .build()
            .unwrap()
    }
}

#[test]
fn test_batch_size_overallocate() {
    let testdata = arrow::util::test_util::parquet_test_data();
    // `alltypes_plain.parquet` only has 8 rows
    let path = format!("{testdata}/alltypes_plain.parquet");
    let test_file = File::open(path).unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
    let num_rows = builder.metadata.file_metadata().num_rows();
    let reader = builder
        .with_batch_size(1024)
        .with_projection(ProjectionMask::all())
        .build()
        .unwrap();
    assert_ne!(1024, num_rows);
    // the requested batch size is clamped down to the total row count
    assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
}

#[test]
fn test_read_with_page_index_enabled() {
    let testdata = arrow::util::test_util::parquet_test_data();

    {
        // `alltypes_tiny_pages.parquet` has a page index
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let test_file = File::open(path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            test_file,
            ArrowReaderOptions::new().with_page_index(true),
        )
        .unwrap();
        assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
        let reader = builder.build().unwrap();
        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(batches.len(), 8);
    }

    {
        // `alltypes_plain.parquet` does not have a page index; the file
        // should still read successfully, with no index loaded
        let path = format!("{testdata}/alltypes_plain.parquet");
        let test_file = File::open(path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            test_file,
            ArrowReaderOptions::new().with_page_index(true),
        )
        .unwrap();
        assert!(builder.metadata().offset_index().is_none());
        let reader = builder.build().unwrap();
        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(batches.len(), 1);
    }
}

#[test]
fn test_raw_repetition() {
    const MESSAGE_TYPE: &str = "
        message Log {
          OPTIONAL INT32 eventType;
          REPEATED INT32 category;
          REPEATED group filter {
            OPTIONAL INT32 error;
          }
        }
    ";
    let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
    let props = Default::default();

    let mut buf = Vec::with_capacity(1024);
    let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
    let mut row_group_writer = writer.next_row_group().unwrap();

    // write the `eventType` column
    let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
    col_writer
        .typed::<Int32Type>()
        .write_batch(&[1], Some(&[1]), None)
        .unwrap();
    col_writer.close().unwrap();
    // write the `category` column
    let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
    col_writer
        .typed::<Int32Type>()
        .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
        .unwrap();
    col_writer.close().unwrap();
    // write the `filter.error` column
    let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
    col_writer
        .typed::<Int32Type>()
        .write_batch(&[1], Some(&[1]), Some(&[0]))
        .unwrap();
    col_writer.close().unwrap();

    let rg_md = row_group_writer.close().unwrap();
    assert_eq!(rg_md.num_rows(), 1);
    writer.close().unwrap();

    let bytes = Bytes::from(buf);

    let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
    let full = no_mask.next().unwrap().unwrap();

    assert_eq!(full.num_columns(), 3);

    for idx in 0..3 {
        let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
        let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
        let mut reader = b.with_projection(mask).build().unwrap();
        let projected = reader.next().unwrap().unwrap();

        assert_eq!(projected.num_columns(), 1);
        assert_eq!(full.column(idx), projected.column(0));
    }
}

#[test]
fn test_read_lz4_raw() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/lz4_raw_compressed.parquet");
    let file = File::open(path).unwrap();

    let batches = ParquetRecordBatchReader::try_new(file, 1024)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(batches.len(), 1);
    let batch = &batches[0];

    assert_eq!(batch.num_columns(), 3);
    assert_eq!(batch.num_rows(), 4);

    let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
    assert_eq!(
        a.values(),
        &[1593604800, 1593604800, 1593604801, 1593604801]
    );

    let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
    let a: Vec<_> = a.iter().flatten().collect();
    assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);

    let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
    assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
}

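// Both files declare the LZ4 codec: `hadoop_lz4_compressed.parquet` frames
// its blocks in the Hadoop LZ4 format, while `non_hadoop_lz4_compressed.parquet`
// contains raw LZ4 block data. The reader tries the Hadoop framing first and
// falls back to raw LZ4 for compatibility with older writers.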
#[test]
fn test_read_lz4_hadoop_fallback() {
    for file in [
        "hadoop_lz4_compressed.parquet",
        "non_hadoop_lz4_compressed.parquet",
    ] {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/{file}");
        let file = File::open(path).unwrap();
        let expected_rows = 4;

        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];

        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), expected_rows);

        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
        assert_eq!(
            a.values(),
            &[1593604800, 1593604800, 1593604801, 1593604801]
        );

        let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
        let b: Vec<_> = b.iter().flatten().collect();
        assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);

        let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
        assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
    }
}

    #[test]
    fn test_read_lz4_hadoop_large() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
        let file = File::open(path).unwrap();
        let expected_rows = 10000;

        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];

        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.num_rows(), expected_rows);

        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
        let a: Vec<_> = a.iter().flatten().collect();
        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
    }

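    // Reads a nested list file in full, then re-reads it with a RowSelection
    // keeping only the middle row, and compares against the matching slice.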
    #[test]
    #[cfg(feature = "snap")]
    fn test_read_nested_lists() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_lists.snappy.parquet");
        let file = File::open(path).unwrap();

        let f = file.try_clone().unwrap();
        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
        let expected = reader.next().unwrap().unwrap();
        assert_eq!(expected.num_rows(), 3);

        let selection = RowSelection::from(vec![
            RowSelector::skip(1),
            RowSelector::select(1),
            RowSelector::skip(1),
        ]);
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .with_row_selection(selection)
            .build()
            .unwrap();

        let actual = reader.next().unwrap().unwrap();
        assert_eq!(actual.num_rows(), 1);
        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
    }

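    // Round-trips Decimal128 columns at several precision/scale combinations.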
    #[test]
    fn test_arbitrary_decimal() {
        let values = [1, 2, 3, 4, 5, 6, 7, 8];
        let decimals_19_0 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(19, 0)
            .unwrap();
        let decimals_12_0 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(12, 0)
            .unwrap();
        let decimals_17_10 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(17, 10)
            .unwrap();

        let written = RecordBatch::try_from_iter([
            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 8), &read[0]);
    }

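    // Skipping rows in a list column where each data page holds a single row,
    // so the skip has to cross page boundaries.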
    #[test]
    fn test_list_skip() {
        let mut list = ListBuilder::new(Int32Builder::new());
        list.append_value([Some(1), Some(2)]);
        list.append_value([Some(3)]);
        list.append_value([Some(4)]);
        let list = list.finish();
        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();

        // Limit each data page to a single row so skipping must span pages
        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(1)
            .set_write_batch_size(2)
            .build();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
            .unwrap()
            .with_row_selection(selection.into())
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out, batch.slice(2, 1));
    }

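    // The round-trip helpers below carry no #[test] attribute; they are all
    // driven by test_decimal() further down.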
    fn test_decimal32_roundtrip() {
        let d = |values: Vec<i32>, p: u8| {
            let iter = values.into_iter();
            PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    fn test_decimal64_roundtrip() {
        // Precision <= 9 is written as INT32; precisions 10..=18 as INT64
        // (checked against the physical types below)
        let d = |values: Vec<i64>, p: u8| {
            let iter = values.into_iter();
            PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    fn test_decimal_roundtrip<T: DecimalType>() {
        // Precision <= 9 is written as INT32, 10..=18 as INT64, and anything
        // larger as FIXED_LEN_BYTE_ARRAY (checked against the physical types below)
        let d = |values: Vec<usize>, p: u8| {
            let iter = values.into_iter().map(T::Native::usize_as);
            PrimitiveArray::<T>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
            ("d4", Arc::new(d4) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);
        let t4 = builder.parquet_schema().columns()[3].physical_type();
        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    #[test]
    fn test_decimal() {
        test_decimal32_roundtrip();
        test_decimal64_roundtrip();
        test_decimal_roundtrip::<Decimal128Type>();
        test_decimal_roundtrip::<Decimal256Type>();
    }

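    // Applies a RowSelection to a list-of-strings column and verifies both the
    // offsets and the values of the rows that survive.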
    #[test]
    fn test_list_selection() {
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list_field(ArrowDataType::Utf8, true),
            false,
        )]));
        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        for i in 0..2 {
            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
            for j in 0..1024 {
                list_a_builder.values().append_value(format!("{i} {j}"));
                list_a_builder.append(true);
            }
            let batch =
                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
                    .unwrap();
            writer.write(&batch).unwrap();
        }
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);
        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
            .unwrap()
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ]))
            .build()
            .unwrap();

        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        let batch = concat_batches(&schema, &batches).unwrap();

        assert_eq!(batch.num_rows(), 924 * 2);
        let list = batch.column(0).as_list::<i32>();

        // Every surviving list should contain exactly one element
        for w in list.value_offsets().windows(2) {
            assert_eq!(w[0] + 1, w[1]);
        }
        let mut values = list.values().as_string::<i32>().iter();

        for i in 0..2 {
            for j in 100..1024 {
                let expected = format!("{i} {j}");
                assert_eq!(values.next().unwrap().unwrap(), &expected);
            }
        }
    }

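    // Writes randomized nested lists (with nulls at every level), then reads
    // them back under several selections and batch sizes; every selected slice
    // must match the corresponding slice of the written batch.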
    #[test]
    fn test_list_selection_fuzz() {
        let mut rng = rng();
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list(
                Field::LIST_FIELD_DEFAULT_NAME,
                Field::new_list_field(ArrowDataType::Int32, true),
                true,
            ),
            true,
        )]));
        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));

        for _ in 0..2048 {
            if rng.random_bool(0.2) {
                list_a_builder.append(false);
                continue;
            }

            let list_a_len = rng.random_range(0..10);
            let list_b_builder = list_a_builder.values();

            for _ in 0..list_a_len {
                if rng.random_bool(0.2) {
                    list_b_builder.append(false);
                    continue;
                }

                let list_b_len = rng.random_range(0..10);
                let int_builder = list_b_builder.values();
                for _ in 0..list_b_len {
                    match rng.random_bool(0.2) {
                        true => int_builder.append_null(),
                        false => int_builder.append_value(rng.random()),
                    }
                }
                list_b_builder.append(true);
            }
            list_a_builder.append(true);
        }

        let array = Arc::new(list_a_builder.finish());
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        writer.write(&batch).unwrap();
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);

        let cases = [
            vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ],
            vec![
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
            ],
            vec![
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
            ],
            vec![
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
            ],
        ];

        for batch_size in [100, 1024, 2048] {
            for selection in &cases {
                let selection = RowSelection::from(selection.clone());
                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
                    .unwrap()
                    .with_row_selection(selection.clone())
                    .with_batch_size(batch_size)
                    .build()
                    .unwrap();

                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
                assert_eq!(actual.num_rows(), selection.row_count());

                let mut batch_offset = 0;
                let mut actual_offset = 0;
                for selector in selection.iter() {
                    if selector.skip {
                        batch_offset += selector.row_count;
                        continue;
                    }

                    assert_eq!(
                        batch.slice(batch_offset, selector.row_count),
                        actual.slice(actual_offset, selector.row_count)
                    );

                    batch_offset += selector.row_count;
                    actual_offset += selector.row_count;
                }
            }
        }
    }

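    // old_list_structure.parquet was written with an older, non-standard
    // nested list layout; the reader should still decode it correctly.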
    #[test]
    fn test_read_old_nested_list() {
        use arrow::datatypes::DataType;
        use arrow::datatypes::ToByteSlice;

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/old_list_structure.parquet");
        let test_file = File::open(path).unwrap();

        // Expected inner lists for the file's single row: [[1, 2], [3, 4]]
        let a_values = Int32Array::from(vec![1, 2, 3, 4]);

        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());

        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
            "array",
            DataType::Int32,
            false,
        ))))
        .len(2)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
        let mut reader = builder.build().unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out.num_columns(), 1);
        let c0 = out.column(0);
        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
        let r0 = c0arr.value(0);
        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
        assert_eq!(r0arr, &a);
    }

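    // A map written without a value field should read back as a list of keys;
    // columns 1 and 2 of this file are expected to decode identically.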
    #[test]
    fn test_map_no_value() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/map_no_value.parquet");
        let file = File::open(path).unwrap();

        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 3);
        assert_eq!(out.num_columns(), 3);
        let c0 = out.column(1).as_list::<i32>();
        let c1 = out.column(2).as_list::<i32>();
        assert_eq!(c0.len(), c1.len());
        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
    }

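    // The bloom filter tests cover metadata that records bloom_filter_length
    // (newer writers) as well as metadata that omits it.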
    #[test]
    fn test_get_row_group_column_bloom_filter_with_length() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
        let file = File::open(path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let schema = builder.schema().clone();
        let reader = builder.build().unwrap();

        // Rewrite the file with bloom filters enabled so the resulting
        // metadata records the bloom filter length
        let mut parquet_data = Vec::new();
        let props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build();
        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
        for batch in reader {
            let batch = batch.unwrap();
            writer.write(&batch).unwrap();
        }
        writer.close().unwrap();

        test_get_row_group_column_bloom_filter(parquet_data.into(), true);
    }

    #[test]
    fn test_get_row_group_column_bloom_filter_without_length() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());
        test_get_row_group_column_bloom_filter(data, false);
    }

    fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
        let builder = ParquetRecordBatchReaderBuilder::try_new(data.clone()).unwrap();

        let metadata = builder.metadata();
        assert_eq!(metadata.num_row_groups(), 1);
        let row_group = metadata.row_group(0);
        let column = row_group.column(0);
        assert_eq!(column.bloom_filter_length().is_some(), with_length);

        let sbbf = builder
            .get_row_group_column_bloom_filter(0, 0)
            .unwrap()
            .unwrap();
        assert!(sbbf.check(&"Hello"));
        assert!(!sbbf.check(&"Hello_Not_Exists"));
    }
}