use arrow_array::Array;
use arrow_array::cast::AsArray;
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

pub use crate::arrow::array_reader::RowGroups;
use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
use crate::arrow::schema::{ParquetField, parquet_to_arrow_schema_and_fields};
use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
use crate::bloom_filter::{
    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
};
use crate::column::page::{PageIterator, PageReader};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;

use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
pub use read_plan::{ReadPlan, ReadPlanBuilder};

mod filter;
pub mod metrics;
mod read_plan;
mod selection;
pub mod statistics;

/// Builder for constructing Parquet readers that decode into [Arrow](https://arrow.apache.org/)
/// arrays. Most users should use [`ParquetRecordBatchReaderBuilder`].
pub struct ArrowReaderBuilder<T> {
    pub(crate) input: T,

    pub(crate) metadata: Arc<ParquetMetaData>,

    pub(crate) schema: SchemaRef,

    pub(crate) fields: Option<Arc<ParquetField>>,

    pub(crate) batch_size: usize,

    pub(crate) row_groups: Option<Vec<usize>>,

    pub(crate) projection: ProjectionMask,

    pub(crate) filter: Option<RowFilter>,

    pub(crate) selection: Option<RowSelection>,

    pub(crate) limit: Option<usize>,

    pub(crate) offset: Option<usize>,

    pub(crate) metrics: ArrowReaderMetrics,

    pub(crate) max_predicate_cache_size: usize,
}

impl<T: Debug> Debug for ArrowReaderBuilder<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowReaderBuilder<T>")
            .field("input", &self.input)
            .field("metadata", &self.metadata)
            .field("schema", &self.schema)
            .field("fields", &self.fields)
            .field("batch_size", &self.batch_size)
            .field("row_groups", &self.row_groups)
            .field("projection", &self.projection)
            .field("filter", &self.filter)
            .field("selection", &self.selection)
            .field("limit", &self.limit)
            .field("offset", &self.offset)
            .field("metrics", &self.metrics)
            .finish()
    }
}

impl<T> ArrowReaderBuilder<T> {
    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self {
            input,
            metadata: metadata.metadata,
            schema: metadata.schema,
            fields: metadata.fields,
            batch_size: 1024,
            row_groups: None,
            projection: ProjectionMask::all(),
            filter: None,
            selection: None,
            limit: None,
            offset: None,
            metrics: ArrowReaderMetrics::Disabled,
            max_predicate_cache_size: 100 * 1024 * 1024,
        }
    }

    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    /// Returns the arrow [`SchemaRef`] for this parquet file
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Set the size of [`RecordBatch`] to produce. Defaults to 1024.
    pub fn with_batch_size(self, batch_size: usize) -> Self {
        // Try to avoid allocating a batch larger than the number of rows in the file
        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
        Self { batch_size, ..self }
    }

    /// Only read data from the provided row group indexes
    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
        Self {
            row_groups: Some(row_groups),
            ..self
        }
    }

    /// Only read data from the columns in the provided [`ProjectionMask`]
    pub fn with_projection(self, mask: ProjectionMask) -> Self {
        Self {
            projection: mask,
            ..self
        }
    }

    /// Only read data from the rows in the provided [`RowSelection`]
    pub fn with_row_selection(self, selection: RowSelection) -> Self {
        Self {
            selection: Some(selection),
            ..self
        }
    }

    /// Provide a [`RowFilter`] to skip decoding rows whose columns do not
    /// match the filter's predicates
    pub fn with_row_filter(self, filter: RowFilter) -> Self {
        Self {
            filter: Some(filter),
            ..self
        }
    }

    /// Limit the number of rows returned, applied after any row selection and filter
    pub fn with_limit(self, limit: usize) -> Self {
        Self {
            limit: Some(limit),
            ..self
        }
    }

    /// Skip the first `offset` rows, applied after any row selection and filter
    pub fn with_offset(self, offset: usize) -> Self {
        Self {
            offset: Some(offset),
            ..self
        }
    }

    /// Provide an [`ArrowReaderMetrics`] with which to collect metrics during reading
    pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
        Self { metrics, ..self }
    }

    /// Set the maximum size, in bytes, of the cache used to avoid decoding the
    /// same column data twice when it appears in both a predicate and the projection
    pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
        Self {
            max_predicate_cache_size,
            ..self
        }
    }
}
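
// Example: configuring a reader through the builder API. A minimal sketch,
// assuming a file named `data.parquet` exists; the column index, batch size
// and limit are illustrative only:
//
//     use std::fs::File;
//     use parquet::arrow::ProjectionMask;
//     use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
//
//     fn read_example() -> parquet::errors::Result<()> {
//         let file = File::open("data.parquet")?;
//         let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
//         // project only the first leaf column
//         let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
//         let reader = builder
//             .with_batch_size(8192)
//             .with_projection(mask)
//             .with_limit(100)
//             .build()?;
//         for batch in reader {
//             println!("read {} rows", batch?.num_rows());
//         }
//         Ok(())
//     }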

/// Options that control how metadata is read for a parquet file
///
/// See [`ArrowReaderBuilder`] for how to configure how the column data is then
/// read from the file, including projection and filter pushdown
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    /// Should the embedded arrow schema in the file metadata be ignored
    skip_arrow_metadata: bool,
    /// If provided, the schema to use when reading the file
    supplied_schema: Option<SchemaRef>,
    /// Policy for reading the page index
    pub(crate) page_index_policy: PageIndexPolicy,
    /// If encryption is enabled, the file decryption properties
    #[cfg(feature = "encryption")]
    pub(crate) file_decryption_properties: Option<FileDecryptionProperties>,
}

impl ArrowReaderOptions {
    /// Create a new [`ArrowReaderOptions`] with the default settings
    pub fn new() -> Self {
        Self::default()
    }

    /// Skip decoding the embedded arrow metadata (defaults to `false`)
    ///
    /// Files written by arrow writers embed the arrow schema in the file's
    /// key-value metadata; set this to `true` to ignore it and derive the
    /// schema from the parquet types alone
    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

    /// Provide a schema to use when reading the parquet file, overriding the
    /// schema inferred from the file's metadata
    ///
    /// Setting this also implies `skip_arrow_metadata = true`. An error is
    /// returned when constructing the reader if the supplied schema is
    /// incompatible with the file's schema
    pub fn with_schema(self, schema: SchemaRef) -> Self {
        Self {
            supplied_schema: Some(schema),
            skip_arrow_metadata: true,
            ..self
        }
    }

    /// Enable reading the page index, if present (defaults to `false`)
    ///
    /// The page index allows a [`RowSelection`] to be pushed down more
    /// effectively, skipping entire pages that are not selected
    pub fn with_page_index(self, page_index: bool) -> Self {
        let page_index_policy = PageIndexPolicy::from(page_index);

        Self {
            page_index_policy,
            ..self
        }
    }

    /// Set the [`PageIndexPolicy`] used when reading the page index
    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
        Self {
            page_index_policy: policy,
            ..self
        }
    }

    /// Provide the decryption properties to use when reading encrypted parquet files
    #[cfg(feature = "encryption")]
    pub fn with_file_decryption_properties(
        self,
        file_decryption_properties: FileDecryptionProperties,
    ) -> Self {
        Self {
            file_decryption_properties: Some(file_decryption_properties),
            ..self
        }
    }

    /// Returns `true` if the page index will be read, if present
    pub fn page_index(&self) -> bool {
        self.page_index_policy != PageIndexPolicy::Skip
    }

    /// Returns the file decryption properties, if any
    #[cfg(feature = "encryption")]
    pub fn file_decryption_properties(&self) -> Option<&FileDecryptionProperties> {
        self.file_decryption_properties.as_ref()
    }
}
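
// Example: opening a reader with options. A minimal sketch, assuming the path
// passed in points at a valid parquet file; enabling the page index lets the
// reader skip pages that a `RowSelection` rules out:
//
//     use std::fs::File;
//     use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
//
//     fn open_with_page_index(path: &str) -> parquet::errors::Result<()> {
//         let options = ArrowReaderOptions::new().with_page_index(true);
//         let reader =
//             ParquetRecordBatchReaderBuilder::try_new_with_options(File::open(path)?, options)?
//                 .build()?;
//         for batch in reader {
//             let _ = batch?;
//         }
//         Ok(())
//     }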

/// The combination of [`ParquetMetaData`] and an arrow [`SchemaRef`], which can
/// be loaded once and reused to build multiple readers without re-parsing the
/// file footer
#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    /// The parquet metadata for the file
    pub(crate) metadata: Arc<ParquetMetaData>,
    /// The arrow schema for the file
    pub(crate) schema: SchemaRef,

    pub(crate) fields: Option<Arc<ParquetField>>,
}

impl ArrowReaderMetadata {
    /// Loads [`ArrowReaderMetadata`] from the provided [`ChunkReader`], reading
    /// the page index according to the provided options
    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata =
            ParquetMetaDataReader::new().with_page_index_policy(options.page_index_policy);
        #[cfg(feature = "encryption")]
        let metadata =
            metadata.with_decryption_properties(options.file_decryption_properties.as_ref());
        let metadata = metadata.parse_and_finish(reader)?;
        Self::try_new(Arc::new(metadata), options)
    }

    /// Creates [`ArrowReaderMetadata`] from the provided [`ParquetMetaData`],
    /// deriving the arrow schema from the parquet schema and, unless disabled,
    /// any embedded arrow metadata
    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
        match options.supplied_schema {
            Some(supplied_schema) => Self::with_supplied_schema(metadata, supplied_schema.clone()),
            None => {
                let kv_metadata = match options.skip_arrow_metadata {
                    true => None,
                    false => metadata.file_metadata().key_value_metadata(),
                };

                let (schema, fields) = parquet_to_arrow_schema_and_fields(
                    metadata.file_metadata().schema_descr(),
                    ProjectionMask::all(),
                    kv_metadata,
                )?;

                Ok(Self {
                    metadata,
                    schema: Arc::new(schema),
                    fields: fields.map(Arc::new),
                })
            }
        }
    }

    /// Validates that the supplied schema is compatible with the file's parquet
    /// schema, checking column count, data types, nullability and field metadata
    fn with_supplied_schema(
        metadata: Arc<ParquetMetaData>,
        supplied_schema: SchemaRef,
    ) -> Result<Self> {
        let parquet_schema = metadata.file_metadata().schema_descr();
        let field_levels = parquet_to_arrow_field_levels(
            parquet_schema,
            ProjectionMask::all(),
            Some(supplied_schema.fields()),
        )?;
        let fields = field_levels.fields;
        let inferred_len = fields.len();
        let supplied_len = supplied_schema.fields().len();
        // Ensure the supplied schema has the same number of columns as the
        // schema inferred from the file
        if inferred_len != supplied_len {
            return Err(arrow_err!(format!(
                "Incompatible supplied Arrow schema: expected {} columns received {}",
                inferred_len, supplied_len
            )));
        }

        let mut errors = Vec::new();

        let field_iter = supplied_schema.fields().iter().zip(fields.iter());

        for (field1, field2) in field_iter {
            if field1.data_type() != field2.data_type() {
                errors.push(format!(
                    "data type mismatch for field {}: requested {} but found {}",
                    field1.name(),
                    field1.data_type(),
                    field2.data_type()
                ));
            }
            if field1.is_nullable() != field2.is_nullable() {
                errors.push(format!(
                    "nullability mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.is_nullable(),
                    field2.is_nullable()
                ));
            }
            if field1.metadata() != field2.metadata() {
                errors.push(format!(
                    "metadata mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.metadata(),
                    field2.metadata()
                ));
            }
        }

        if !errors.is_empty() {
            let message = errors.join(", ");
            return Err(ParquetError::ArrowError(format!(
                "Incompatible supplied Arrow schema: {message}",
            )));
        }

        Ok(Self {
            metadata,
            schema: supplied_schema,
            fields: field_levels.levels.map(Arc::new),
        })
    }

    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    /// Returns the arrow [`SchemaRef`] for this parquet file
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}
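
// Example: loading metadata once and sharing it across multiple readers, which
// avoids re-parsing the footer for each reader. A minimal sketch, assuming the
// underlying file handle supports `try_clone`:
//
//     use std::fs::File;
//     use parquet::arrow::arrow_reader::{
//         ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
//     };
//
//     fn shared_metadata(file: File) -> parquet::errors::Result<()> {
//         let metadata = ArrowReaderMetadata::load(&file, ArrowReaderOptions::new())?;
//         let _reader_a = ParquetRecordBatchReaderBuilder::new_with_metadata(
//             file.try_clone()?,
//             metadata.clone(),
//         )
//         .build()?;
//         let _reader_b =
//             ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata).build()?;
//         Ok(())
//     }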

#[doc(hidden)]
pub struct SyncReader<T: ChunkReader>(T);

impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("SyncReader").field(&self.0).finish()
    }
}

/// A synchronous builder used to construct [`ParquetRecordBatchReader`] for a file
pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;

impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
    /// Create a new [`ParquetRecordBatchReaderBuilder`] with default options
    pub fn try_new(reader: T) -> Result<Self> {
        Self::try_new_with_options(reader, Default::default())
    }

    /// Create a new [`ParquetRecordBatchReaderBuilder`] with the provided [`ArrowReaderOptions`]
    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load(&reader, options)?;
        Ok(Self::new_with_metadata(reader, metadata))
    }

    /// Create a [`ParquetRecordBatchReaderBuilder`] from the provided
    /// [`ArrowReaderMetadata`], allowing metadata to be loaded once and shared
    /// across multiple readers
    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(SyncReader(input), metadata)
    }

    /// Read the bloom filter for the given column chunk, returning `Ok(None)`
    /// if the column does not have a bloom filter
    pub fn get_row_group_column_bloom_filter(
        &self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> Result<Option<Sbbf>> {
        let metadata = self.metadata.row_group(row_group_idx);
        let column_metadata = metadata.column(column_idx);

        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => self.input.0.get_bytes(offset, length as usize),
            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
        }?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        match header.algorithm {
            BloomFilterAlgorithm::BLOCK => {
                // this match exists to future-proof the singleton algorithm enum
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED => {
                // this match exists to future-proof the singleton compression enum
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH => {
                // this match exists to future-proof the singleton hash enum
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            Some(_) => buffer.slice(
                (TryInto::<usize>::try_into(bitset_offset).unwrap()
                    - TryInto::<usize>::try_into(offset).unwrap())..,
            ),
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                self.input.0.get_bytes(bitset_offset, bitset_length)?
            }
        };
        Ok(Some(Sbbf::new(&bitset)))
    }

    /// Build a [`ParquetRecordBatchReader`]
    ///
    /// Note: any [`RowFilter`] is evaluated eagerly here to compute the final
    /// [`RowSelection`] before the reader is returned
    pub fn build(self) -> Result<ParquetRecordBatchReader> {
        let Self {
            input,
            metadata,
            schema: _,
            fields,
            batch_size,
            row_groups,
            projection,
            mut filter,
            selection,
            limit,
            offset,
            metrics,
            max_predicate_cache_size: _,
        } = self;

        // Try to avoid allocating a batch larger than the number of rows in the file
        let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize);

        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());

        let reader = ReaderRowGroups {
            reader: Arc::new(input.0),
            metadata,
            row_groups,
        };

        let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);

        // Update the selection based on any filters
        if let Some(filter) = filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
                // break early if the selection already rules out all rows
                if !plan_builder.selects_any() {
                    break;
                }

                let mut cache_projection = predicate.projection().clone();
                cache_projection.intersect(&projection);

                let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
                    .build_array_reader(fields.as_deref(), predicate.projection())?;

                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
            }
        }

        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
            .build_array_reader(fields.as_deref(), &projection)?;

        let read_plan = plan_builder
            .limited(reader.num_rows())
            .with_offset(offset)
            .with_limit(limit)
            .build_limited()
            .build();

        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
    }
}
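
// Example: combining a row filter with the builder. A minimal sketch; the file
// path is illustrative and the first leaf column is assumed to be Int32. The
// filter is evaluated eagerly during `build()` to compute the final
// `RowSelection`:
//
//     use std::fs::File;
//     use arrow_array::Int32Array;
//     use parquet::arrow::ProjectionMask;
//     use parquet::arrow::arrow_reader::{
//         ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter,
//     };
//
//     fn filtered_read(path: &str) -> parquet::errors::Result<()> {
//         let builder = ParquetRecordBatchReaderBuilder::try_new(File::open(path)?)?;
//         // the predicate only decodes the first leaf column
//         let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
//         let predicate = ArrowPredicateFn::new(mask, |batch| {
//             arrow::compute::kernels::cmp::gt(batch.column(0), &Int32Array::new_scalar(0))
//         });
//         let reader = builder
//             .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
//             .build()?;
//         for batch in reader {
//             let _ = batch?;
//         }
//         Ok(())
//     }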

struct ReaderRowGroups<T: ChunkReader> {
    reader: Arc<T>,

    metadata: Arc<ParquetMetaData>,
    row_groups: Vec<usize>,
}

impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
    fn num_rows(&self) -> usize {
        let meta = self.metadata.row_groups();
        self.row_groups
            .iter()
            .map(|x| meta[*x].num_rows() as usize)
            .sum()
    }

    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        Ok(Box::new(ReaderPageIterator {
            column_idx: i,
            reader: self.reader.clone(),
            metadata: self.metadata.clone(),
            row_groups: self.row_groups.clone().into_iter(),
        }))
    }
}

struct ReaderPageIterator<T: ChunkReader> {
    reader: Arc<T>,
    column_idx: usize,
    row_groups: std::vec::IntoIter<usize>,
    metadata: Arc<ParquetMetaData>,
}

impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
    /// Return a [`SerializedPageReader`] for the given row group
    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
        let rg = self.metadata.row_group(rg_idx);
        let column_chunk_metadata = rg.column(self.column_idx);
        let offset_index = self.metadata.offset_index();
        // The offset index may not exist, in which case `i[rg_idx]` is empty;
        // filter those out to avoid `i[rg_idx][self.column_idx]` panicking
        let page_locations = offset_index
            .filter(|i| !i[rg_idx].is_empty())
            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
        let total_rows = rg.num_rows() as usize;
        let reader = self.reader.clone();

        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
            .add_crypto_context(
                rg_idx,
                self.column_idx,
                self.metadata.as_ref(),
                column_chunk_metadata,
            )
    }
}

impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        let rg_idx = self.row_groups.next()?;
        let page_reader = self
            .next_page_reader(rg_idx)
            .map(|page_reader| Box::new(page_reader) as _);
        Some(page_reader)
    }
}

impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}

/// An `Iterator<Item = Result<RecordBatch, ArrowError>>` that yields
/// [`RecordBatch`] decoded from a parquet data source
pub struct ParquetRecordBatchReader {
    array_reader: Box<dyn ArrayReader>,
    schema: SchemaRef,
    read_plan: ReadPlan,
}

impl Iterator for ParquetRecordBatchReader {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_inner()
            .map_err(|arrow_err| arrow_err.into())
            .transpose()
    }
}

impl ParquetRecordBatchReader {
    /// Returns the next batch, or `None` once the reader is exhausted
    ///
    /// Returns `Result<Option<_>>` rather than `Option<Result<_>>` so `?` can
    /// be used internally; `next` transposes it back
    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
        let mut read_records = 0;
        let batch_size = self.batch_size();
        match self.read_plan.selection_mut() {
            Some(selection) => {
                while read_records < batch_size && !selection.is_empty() {
                    let front = selection.pop_front().unwrap();
                    if front.skip {
                        let skipped = self.array_reader.skip_records(front.row_count)?;

                        if skipped != front.row_count {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                front.row_count,
                                skipped
                            ));
                        }
                        continue;
                    }

                    // Skip empty selectors rather than treating them as end of stream
                    if front.row_count == 0 {
                        continue;
                    }

                    // If this selector spans more rows than the batch has room
                    // for, split it and push the remainder back for the next batch
                    let need_read = batch_size - read_records;
                    let to_read = match front.row_count.checked_sub(need_read) {
                        Some(remaining) if remaining != 0 => {
                            selection.push_front(RowSelector::select(remaining));
                            need_read
                        }
                        _ => front.row_count,
                    };
                    match self.array_reader.read_records(to_read)? {
                        0 => break,
                        rec => read_records += rec,
                    };
                }
            }
            None => {
                self.array_reader.read_records(batch_size)?;
            }
        };

        let array = self.array_reader.consume_batch()?;
        let struct_array = array.as_struct_opt().ok_or_else(|| {
            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
        })?;

        Ok(if struct_array.len() > 0 {
            Some(RecordBatch::from(struct_array))
        } else {
            None
        })
    }
}

impl RecordBatchReader for ParquetRecordBatchReader {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl ParquetRecordBatchReader {
    /// Create a new [`ParquetRecordBatchReader`] from the provided chunk reader,
    /// using default options and the provided batch size
    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
        ParquetRecordBatchReaderBuilder::try_new(reader)?
            .with_batch_size(batch_size)
            .build()
    }

    /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`],
    /// a lower-level API for callers that assemble row groups themselves
    pub fn try_new_with_row_groups(
        levels: &FieldLevels,
        row_groups: &dyn RowGroups,
        batch_size: usize,
        selection: Option<RowSelection>,
    ) -> Result<Self> {
        let metrics = ArrowReaderMetrics::disabled();
        let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;

        let read_plan = ReadPlanBuilder::new(batch_size)
            .with_selection(selection)
            .build();

        Ok(Self {
            array_reader,
            schema: Arc::new(Schema::new(levels.fields.clone())),
            read_plan,
        })
    }

    /// Create a new [`ParquetRecordBatchReader`] that reads batches from the
    /// provided [`ArrayReader`] according to the given [`ReadPlan`]
    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
        let schema = match array_reader.get_data_type() {
            ArrowType::Struct(fields) => Schema::new(fields.clone()),
            _ => unreachable!("Struct array reader's data type is not struct!"),
        };

        Self {
            array_reader,
            schema: Arc::new(schema),
            read_plan,
        }
    }

    #[inline(always)]
    pub(crate) fn batch_size(&self) -> usize {
        self.read_plan.batch_size()
    }
}
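
// Example: the reader implements both `Iterator` and `RecordBatchReader`, so
// batches can be collected directly. A minimal sketch; the `data.parquet`
// path is illustrative:
//
//     use std::fs::File;
//     use arrow_array::RecordBatchReader;
//     use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
//
//     fn collect_all() -> parquet::errors::Result<()> {
//         let reader = ParquetRecordBatchReader::try_new(File::open("data.parquet")?, 1024)?;
//         println!("arrow schema: {}", reader.schema());
//         let batches: Vec<_> = reader.collect::<Result<_, _>>()?;
//         println!("read {} batches", batches.len());
//         Ok(())
//     }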

#[cfg(test)]
mod tests {
    use std::cmp::min;
    use std::collections::{HashMap, VecDeque};
    use std::fmt::Formatter;
    use std::fs::File;
    use std::io::Seek;
    use std::path::PathBuf;
    use std::sync::Arc;

    use arrow_array::builder::*;
    use arrow_array::cast::AsArray;
    use arrow_array::types::{
        Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
        DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
        Time64MicrosecondType,
    };
    use arrow_array::*;
    use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, i256};
    use arrow_data::{ArrayData, ArrayDataBuilder};
    use arrow_schema::{
        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
    };
    use arrow_select::concat::concat_batches;
    use bytes::Bytes;
    use half::f16;
    use num_traits::PrimInt;
    use rand::{Rng, RngCore, rng};
    use tempfile::tempfile;

    use crate::arrow::arrow_reader::{
        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
    };
    use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
    use crate::arrow::{ArrowWriter, ProjectionMask};
    use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
    use crate::data_type::{
        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
        FloatType, Int32Type, Int64Type, Int96, Int96Type,
    };
    use crate::errors::Result;
    use crate::file::metadata::ParquetMetaData;
    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
    use crate::file::writer::SerializedFileWriter;
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::{Type, TypePtr};
    use crate::util::test_common::rand_gen::RandGen;

    #[test]
    fn test_arrow_reader_all_columns() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());
        let reader = builder.build().unwrap();

        // Verify that the schema was correctly parsed
        assert_eq!(original_schema.fields(), reader.schema().fields());
    }

    #[test]
    fn test_arrow_reader_single_column() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
        let reader = builder.with_projection(mask).build().unwrap();

        // Verify that the projected schema contains only the requested column
        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_arrow_reader_single_column_by_name() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
        let reader = builder.with_projection(mask).build().unwrap();

        // Verify that the projected schema contains only the requested column
        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_null_column_reader_test() {
        let mut file = tempfile::tempfile().unwrap();

        let schema = "
            message message {
                OPTIONAL INT32 int32;
            }
        ";
        let schema = Arc::new(parse_message_type(schema).unwrap());

        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
        generate_single_column_file_with_data::<Int32Type>(
            &[vec![], vec![]],
            Some(&def_levels),
            file.try_clone().unwrap(),
            schema,
            Some(Field::new("int32", ArrowDataType::Null, true)),
            &Default::default(),
        )
        .unwrap();

        file.rewind().unwrap();

        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();

        assert_eq!(batches.len(), 4);
        for batch in &batches[0..3] {
            assert_eq!(batch.num_rows(), 2);
            assert_eq!(batch.num_columns(), 1);
            assert_eq!(batch.column(0).null_count(), 2);
        }

        assert_eq!(batches[3].num_rows(), 1);
        assert_eq!(batches[3].num_columns(), 1);
        assert_eq!(batches[3].column(0).null_count(), 1);
    }

    #[test]
    fn test_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<BoolType, _, BoolType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
        );
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<FloatType, _, FloatType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
        );
    }

    #[test]
    fn test_unsigned_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::UINT_32,
            Some(ArrowDataType::UInt32),
            |vals| {
                Arc::new(UInt32Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u32)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::UINT_64,
            Some(ArrowDataType::UInt64),
            |vals| {
                Arc::new(UInt64Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u64)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
    }

    #[test]
    fn test_unsigned_roundtrip() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("uint32", ArrowDataType::UInt32, true),
            Field::new("uint64", ArrowDataType::UInt64, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt32Array::from_iter_values([
                    0,
                    i32::MAX as u32,
                    u32::MAX,
                ])),
                Arc::new(UInt64Array::from_iter_values([
                    0,
                    i64::MAX as u64,
                    u64::MAX,
                ])),
            ],
        )
        .unwrap();

        writer.write(&original).unwrap();
        writer.close().unwrap();

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
        let ret = reader.next().unwrap().unwrap();
        assert_eq!(ret, original);

        // Verify the unsigned array types survived the round trip
        ret.column(0)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();

        ret.column(1)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
    }

    #[test]
    fn test_float16_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("float16", ArrowDataType::Float16, false),
            Field::new("float16-nullable", ArrowDataType::Float16, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Float16Array::from_iter_values([
                    f16::EPSILON,
                    f16::MIN,
                    f16::MAX,
                    f16::NAN,
                    f16::INFINITY,
                    f16::NEG_INFINITY,
                    f16::ONE,
                    f16::NEG_ONE,
                    f16::ZERO,
                    f16::NEG_ZERO,
                    f16::E,
                    f16::PI,
                    f16::FRAC_1_PI,
                ])),
                Arc::new(Float16Array::from(vec![
                    None,
                    None,
                    None,
                    Some(f16::NAN),
                    Some(f16::INFINITY),
                    Some(f16::NEG_INFINITY),
                    None,
                    None,
                    None,
                    None,
                    None,
                    None,
                    Some(f16::FRAC_1_PI),
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Float16Type>();
        ret.column(1).as_primitive::<Float16Type>();

        Ok(())
    }

    #[test]
    fn test_time_utc_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "time_millis",
                ArrowDataType::Time32(TimeUnit::Millisecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
            Field::new(
                "time_micros",
                ArrowDataType::Time64(TimeUnit::Microsecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Time32MillisecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399_000),
                    Some(86_400_000),
                    Some(86_401_000),
                    None,
                ])),
                Arc::new(Time64MicrosecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399 * 1_000_000),
                    Some(86_400 * 1_000_000),
                    Some(86_401 * 1_000_000),
                    None,
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Time32MillisecondType>();
        ret.column(1).as_primitive::<Time64MicrosecondType>();

        Ok(())
    }

    #[test]
    fn test_date32_roundtrip() -> Result<()> {
        use arrow_array::Date32Array;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "date32",
            ArrowDataType::Date32,
            false,
        )]));

        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![Arc::new(Date32Array::from(vec![
                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
            ]))],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Date32Type>();

        Ok(())
    }

    #[test]
    fn test_date64_roundtrip() -> Result<()> {
        use arrow_array::Date64Array;

        let schema = Arc::new(Schema::new(vec![
            Field::new("small-date64", ArrowDataType::Date64, false),
            Field::new("big-date64", ArrowDataType::Date64, false),
            Field::new("invalid-date64", ArrowDataType::Date64, false),
        ]));

        let mut default_buf = Vec::with_capacity(1024);
        let mut coerce_buf = Vec::with_capacity(1024);

        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();

        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
        let mut coerce_writer =
            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;

        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;

        let original = RecordBatch::try_new(
            schema,
            vec![
                // whole-day values that fit within a 32-bit date
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000 * NUM_MILLISECONDS_IN_DAY,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                // whole-day values outside the range of a 32-bit date
                Arc::new(Date64Array::from(vec![
                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                // values that do not fall on whole-day boundaries
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1,
                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                ])),
            ],
        )?;

        default_writer.write(&original)?;
        coerce_writer.write(&original)?;

        default_writer.close()?;
        coerce_writer.close()?;

        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;

        let default_ret = default_reader.next().unwrap()?;
        let coerce_ret = coerce_reader.next().unwrap()?;

        // The default writer round-trips all three columns unchanged
        assert_eq!(default_ret, original);

        // The coercing writer only preserves the well-formed column
        assert_eq!(coerce_ret.column(0), original.column(0));
        assert_ne!(coerce_ret.column(1), original.column(1));
        assert_ne!(coerce_ret.column(2), original.column(2));

        default_ret.column(0).as_primitive::<Date64Type>();
        coerce_ret.column(0).as_primitive::<Date64Type>();

        Ok(())
    }

    struct RandFixedLenGen {}

    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
        fn r#gen(len: i32) -> FixedLenByteArray {
            let mut v = vec![0u8; len as usize];
            rng().fill_bytes(&mut v);
            ByteArray::from(v).into()
        }
    }

    #[test]
    fn test_fixed_length_binary_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            20,
            ConvertedType::NONE,
            None,
            |vals| {
                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
                for val in vals {
                    match val {
                        Some(b) => builder.append_value(b).unwrap(),
                        None => builder.append_null(),
                    }
                }
                Arc::new(builder.finish())
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_interval_day_time_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            12,
            ConvertedType::INTERVAL,
            None,
            |vals| {
                Arc::new(
                    vals.iter()
                        .map(|x| {
                            x.as_ref().map(|b| IntervalDayTime {
                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
                                milliseconds: i32::from_le_bytes(
                                    b.as_ref()[8..12].try_into().unwrap(),
                                ),
                            })
                        })
                        .collect::<IntervalDayTimeArray>(),
                )
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_int96_single_column_reader_test() {
        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];

        type TypeHintAndConversionFunction =
            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);

        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
            // With no type hint, INT96 is read as nanosecond timestamps
            (None, |vals: &[Option<Int96>]| {
                Arc::new(TimestampNanosecondArray::from_iter(
                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
                )) as ArrayRef
            }),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampSecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMillisecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_millis())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMicrosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_micros())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampNanosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
                    )) as ArrayRef
                },
            ),
            // A timezone in the hint is preserved on the decoded array
            (
                Some(ArrowDataType::Timestamp(
                    TimeUnit::Second,
                    Some(Arc::from("-05:00")),
                )),
                |vals: &[Option<Int96>]| {
                    Arc::new(
                        TimestampSecondArray::from_iter(
                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
                        )
                        .with_timezone("-05:00"),
                    ) as ArrayRef
                },
            ),
        ];

        resolutions.iter().for_each(|(arrow_type, converter)| {
            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
                2,
                ConvertedType::NONE,
                arrow_type.clone(),
                converter,
                encodings,
            );
        })
    }

    #[test]
    fn test_int96_from_spark_file_with_provided_schema() {
        use arrow_schema::DataType::Timestamp;
        // Supplying a schema overrides the default nanosecond coercion of INT96
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            Timestamp(TimeUnit::Microsecond, None),
            true,
        )]));
        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());

        let mut record_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                .unwrap()
                .build()
                .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));

        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456),
            Some(1704070800000000),
            Some(253402225200000000),
            Some(1735599600000000),
            None,
            Some(9089380393200000000),
        ]));

        // Cast to Int64 to compare the raw timestamp values
        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }

    #[test]
    fn test_int96_from_spark_file_without_provided_schema() {
        use arrow_schema::DataType::Timestamp;
        // Without a supplied schema, INT96 defaults to nanosecond resolution
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));

        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456000),
            Some(1704070800000000000),
            // negative values arise because these timestamps overflow an i64 of nanoseconds
            Some(-4852191831933722624),
            Some(1735599600000000000),
            None,
            Some(-4864435138808946688),
        ]));

        // Cast to Int64 to compare the raw timestamp values
        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }

    struct RandUtf8Gen {}

    impl RandGen<ByteArrayType> for RandUtf8Gen {
        fn r#gen(len: i32) -> ByteArray {
            Int32Type::r#gen(len).to_string().as_str().into()
        }
    }

    #[test]
    fn test_utf8_single_column_reader_test() {
        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
            })))
        }

        let encodings = &[
            Encoding::PLAIN,
            Encoding::RLE_DICTIONARY,
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            Encoding::DELTA_BYTE_ARRAY,
        ];

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::NONE,
            None,
            |vals| {
                Arc::new(BinaryArray::from_iter(
                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
                ))
            },
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            None,
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::Utf8),
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::LargeUtf8),
            string_converter::<i64>,
            encodings,
        );

        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
        for key in &small_key_types {
            for encoding in encodings {
                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
                opts.encoding = *encoding;

                let data_type =
                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
                    opts,
                    2,
                    ConvertedType::UTF8,
                    Some(data_type.clone()),
                    move |vals| {
                        let vals = string_converter::<i32>(vals);
                        arrow::compute::cast(&vals, &data_type).unwrap()
                    },
                );
            }
        }

        let key_types = [
            ArrowDataType::Int16,
            ArrowDataType::UInt16,
            ArrowDataType::Int32,
            ArrowDataType::UInt32,
            ArrowDataType::Int64,
            ArrowDataType::UInt64,
        ];

        for key in &key_types {
            let data_type =
                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let vals = string_converter::<i32>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );
        }
    }

    #[test]
    fn test_decimal_nullable_struct() {
        let decimals = Decimal256Array::from_iter_values(
            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
        );

        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "decimals",
            decimals.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_int32_nullable_struct() {
        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "int32",
            int32.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![int32.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_decimal_list() {
        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);

        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
            decimals.data_type().clone(),
            false,
        ))))
        .len(7)
        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 1), &read[2]);
    }

    #[test]
    fn test_read_decimal_file() {
        use arrow_array::Decimal128Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let file_variants = vec![
            ("byte_array", 4),
            ("fixed_length", 25),
            ("int32", 4),
            ("int64", 10),
        ];
        for (prefix, target_precision) in file_variants {
            let path = format!("{testdata}/{prefix}_decimal.parquet");
            let file = File::open(path).unwrap();
            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

            let batch = record_reader.next().unwrap().unwrap();
            assert_eq!(batch.num_rows(), 24);
            let col = batch
                .column(0)
                .as_any()
                .downcast_ref::<Decimal128Array>()
                .unwrap();

            let expected = 1..25;

            assert_eq!(col.precision(), target_precision);
            assert_eq!(col.scale(), 2);

            for (i, v) in expected.enumerate() {
                assert_eq!(col.value(i), v * 100_i128);
            }
        }
    }

    #[test]
    fn test_read_float16_nonzeros_file() {
        use arrow_array::Float16Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 8);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float16Array>()
            .unwrap();

        let f16_two = f16::ONE + f16::ONE;

        assert_eq!(col.null_count(), 1);
        assert!(col.is_null(0));
        assert_eq!(col.value(1), f16::ONE);
        assert_eq!(col.value(2), -f16_two);
        assert!(col.value(3).is_nan());
        assert_eq!(col.value(4), f16::ZERO);
        assert!(col.value(4).is_sign_positive());
        assert_eq!(col.value(5), f16::NEG_ONE);
        assert_eq!(col.value(6), f16::NEG_ZERO);
        assert!(col.value(6).is_sign_negative());
        assert_eq!(col.value(7), f16_two);
    }

    #[test]
    fn test_read_float16_zeros_file() {
        use arrow_array::Float16Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 3);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float16Array>()
            .unwrap();

        assert_eq!(col.null_count(), 1);
        assert!(col.is_null(0));
        assert_eq!(col.value(1), f16::ZERO);
        assert!(col.value(1).is_sign_positive());
        assert!(col.value(2).is_nan());
    }

    #[test]
    fn test_read_float32_float64_byte_stream_split() {
        let path = format!(
            "{}/byte_stream_split.zstd.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();
        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

        let mut row_count = 0;
        for batch in record_reader {
            let batch = batch.unwrap();
            row_count += batch.num_rows();
            let f32_col = batch.column(0).as_primitive::<Float32Type>();
            let f64_col = batch.column(1).as_primitive::<Float64Type>();

            // the values in this file are known to lie within (-10, 10)
            for &x in f32_col.values() {
                assert!(x > -10.0);
                assert!(x < 10.0);
            }
            for &x in f64_col.values() {
                assert!(x > -10.0);
                assert!(x < 10.0);
            }
        }
        assert_eq!(row_count, 300);
    }

    #[test]
    fn test_read_extended_byte_stream_split() {
        // Each value column in this file appears twice: once with the default
        // encoding and once with BYTE_STREAM_SPLIT; each pair must decode identically
        let path = format!(
            "{}/byte_stream_split_extended.gzip.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();
        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

        let mut row_count = 0;
        for batch in record_reader {
            let batch = batch.unwrap();
            row_count += batch.num_rows();

            let f16_col = batch.column(0).as_primitive::<Float16Type>();
            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
            assert_eq!(f16_col.len(), f16_bss.len());
            f16_col
                .iter()
                .zip(f16_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let f32_col = batch.column(2).as_primitive::<Float32Type>();
            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
            assert_eq!(f32_col.len(), f32_bss.len());
            f32_col
                .iter()
                .zip(f32_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let f64_col = batch.column(4).as_primitive::<Float64Type>();
            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
            assert_eq!(f64_col.len(), f64_bss.len());
            f64_col
                .iter()
                .zip(f64_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
            assert_eq!(i32_col.len(), i32_bss.len());
            i32_col
                .iter()
                .zip(i32_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
            assert_eq!(i64_col.len(), i64_bss.len());
            i64_col
                .iter()
                .zip(i64_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let flba_col = batch.column(10).as_fixed_size_binary();
            let flba_bss = batch.column(11).as_fixed_size_binary();
            assert_eq!(flba_col.len(), flba_bss.len());
            flba_col
                .iter()
                .zip(flba_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
            assert_eq!(dec_col.len(), dec_bss.len());
            dec_col
                .iter()
                .zip(dec_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
        }
        assert_eq!(row_count, 200);
    }

    #[test]
    fn test_read_incorrect_map_schema_file() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/incorrect_map_schema.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 1);

        let expected_schema = Schema::new(vec![Field::new(
            "my_map",
            ArrowDataType::Map(
                Arc::new(Field::new(
                    "key_value",
                    ArrowDataType::Struct(Fields::from(vec![
                        Field::new("key", ArrowDataType::Utf8, false),
                        Field::new("value", ArrowDataType::Utf8, true),
                    ])),
                    false,
                )),
                false,
            ),
            true,
        )]);
        assert_eq!(batch.schema().as_ref(), &expected_schema);

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.column(0).null_count(), 0);
        assert_eq!(
            batch.column(0).as_map().keys().as_ref(),
            &StringArray::from(vec!["parent", "name"])
        );
        assert_eq!(
            batch.column(0).as_map().values().as_ref(),
            &StringArray::from(vec!["another", "report"])
        );
    }

    #[test]
    fn test_read_dict_fixed_size_binary() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            ArrowDataType::Dictionary(
                Box::new(ArrowDataType::UInt8),
                Box::new(ArrowDataType::FixedSizeBinary(8)),
            ),
            true,
        )]));
        let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
        let values = FixedSizeBinaryArray::try_from_iter(
            vec![
                (0u8..8u8).collect::<Vec<u8>>(),
                (24u8..32u8).collect::<Vec<u8>>(),
            ]
            .into_iter(),
        )
        .unwrap();
        let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
        let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(read.len(), 1);
        assert_eq!(&batch, &read[0])
    }

    /// Options for a single-column round-trip test
    #[derive(Clone)]
    struct TestOptions {
        /// Number of row groups to write
        num_row_groups: usize,
        /// Number of rows per row group
        num_rows: usize,
        /// Size of batches to read back
        record_batch_size: usize,
        /// Percentage of nulls in the column, or `None` if required
        null_percent: Option<usize>,
        /// Number of rows written to a page at once
        write_batch_size: usize,
        /// Maximum size of a data page in bytes
        max_data_page_size: usize,
        /// Maximum size of a dictionary page in bytes
        max_dict_page_size: usize,
        /// Writer version
        writer_version: WriterVersion,
        /// Enabled statistics
        enabled_statistics: EnabledStatistics,
        /// Encoding
        encoding: Encoding,
        /// Row selections and the total selected row count
        row_selections: Option<(RowSelection, usize)>,
        /// Row filter
        row_filter: Option<Vec<bool>>,
        /// Limit
        limit: Option<usize>,
        /// Offset
        offset: Option<usize>,
    }

    // Debug is implemented manually so the row selection/filter print as presence flags
    impl std::fmt::Debug for TestOptions {
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("TestOptions")
                .field("num_row_groups", &self.num_row_groups)
                .field("num_rows", &self.num_rows)
                .field("record_batch_size", &self.record_batch_size)
                .field("null_percent", &self.null_percent)
                .field("write_batch_size", &self.write_batch_size)
                .field("max_data_page_size", &self.max_data_page_size)
                .field("max_dict_page_size", &self.max_dict_page_size)
                .field("writer_version", &self.writer_version)
                .field("enabled_statistics", &self.enabled_statistics)
                .field("encoding", &self.encoding)
                .field("row_selections", &self.row_selections.is_some())
                .field("row_filter", &self.row_filter.is_some())
                .field("limit", &self.limit)
                .field("offset", &self.offset)
                .finish()
        }
    }

    impl Default for TestOptions {
        fn default() -> Self {
            Self {
                num_row_groups: 2,
                num_rows: 100,
                record_batch_size: 15,
                null_percent: None,
                write_batch_size: 64,
                max_data_page_size: 1024 * 1024,
                max_dict_page_size: 1024 * 1024,
                writer_version: WriterVersion::PARQUET_1_0,
                enabled_statistics: EnabledStatistics::Page,
                encoding: Encoding::PLAIN,
                row_selections: None,
                row_filter: None,
                limit: None,
                offset: None,
            }
        }
    }

    impl TestOptions {
        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
            Self {
                num_row_groups,
                num_rows,
                record_batch_size,
                ..Default::default()
            }
        }

        fn with_null_percent(self, null_percent: usize) -> Self {
            Self {
                null_percent: Some(null_percent),
                ..self
            }
        }

        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
            Self {
                max_data_page_size,
                ..self
            }
        }

        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
            Self {
                max_dict_page_size,
                ..self
            }
        }

        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
            Self {
                enabled_statistics,
                ..self
            }
        }

        fn with_row_selections(self) -> Self {
            assert!(self.row_filter.is_none(), "Must set row selection first");

            let mut rng = rng();
            let step = rng.random_range(self.record_batch_size..self.num_rows);
            let row_selections = create_test_selection(
                step,
                self.num_row_groups * self.num_rows,
                rng.random::<bool>(),
            );
            Self {
                row_selections: Some(row_selections),
                ..self
            }
        }

        fn with_row_filter(self) -> Self {
            let row_count = match &self.row_selections {
                Some((_, count)) => *count,
                None => self.num_row_groups * self.num_rows,
            };

            let mut rng = rng();
            Self {
                row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
                ..self
            }
        }

        fn with_limit(self, limit: usize) -> Self {
            Self {
                limit: Some(limit),
                ..self
            }
        }

        fn with_offset(self, offset: usize) -> Self {
            Self {
                offset: Some(offset),
                ..self
            }
        }

        fn writer_props(&self) -> WriterProperties {
            let builder = WriterProperties::builder()
                .set_data_page_size_limit(self.max_data_page_size)
                .set_write_batch_size(self.write_batch_size)
                .set_writer_version(self.writer_version)
                .set_statistics_enabled(self.enabled_statistics);

            let builder = match self.encoding {
                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
                    .set_dictionary_enabled(true)
                    .set_dictionary_page_size_limit(self.max_dict_page_size),
                _ => builder
                    .set_dictionary_enabled(false)
                    .set_encoding(self.encoding),
            };

            builder.build()
        }
    }
2543
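/// Runs `single_column_reader_test` for every `TestOptions` case below,
/// crossed with both writer versions and each of the supplied `encodings`.
/// `rand_max` bounds the randomly generated values and `converter` builds
/// the expected Arrow array from the raw written values.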
fn run_single_column_reader_tests<T, F, G>(
    rand_max: i32,
    converted_type: ConvertedType,
    arrow_type: Option<ArrowDataType>,
    converter: F,
    encodings: &[Encoding],
) where
    T: DataType,
    G: RandGen<T>,
    F: Fn(&[Option<T::T>]) -> ArrayRef,
{
    let all_options = vec![
        TestOptions::new(2, 100, 15),
        TestOptions::new(3, 25, 5),
        TestOptions::new(4, 100, 25),
        TestOptions::new(3, 256, 73).with_max_data_page_size(128),
        TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
        TestOptions::new(2, 256, 127).with_null_percent(0),
        TestOptions::new(2, 256, 93).with_null_percent(25),
        TestOptions::new(4, 100, 25).with_limit(0),
        TestOptions::new(4, 100, 25).with_limit(50),
        TestOptions::new(4, 100, 25).with_limit(10),
        TestOptions::new(4, 100, 25).with_limit(101),
        TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
        TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
        TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
        TestOptions::new(2, 256, 91)
            .with_null_percent(25)
            .with_enabled_statistics(EnabledStatistics::Chunk),
        TestOptions::new(2, 256, 91)
            .with_null_percent(25)
            .with_enabled_statistics(EnabledStatistics::None),
        TestOptions::new(2, 128, 91)
            .with_null_percent(100)
            .with_enabled_statistics(EnabledStatistics::None),
        TestOptions::new(2, 100, 15).with_row_selections(),
        TestOptions::new(3, 25, 5).with_row_selections(),
        TestOptions::new(4, 100, 25).with_row_selections(),
        TestOptions::new(3, 256, 73)
            .with_max_data_page_size(128)
            .with_row_selections(),
        TestOptions::new(3, 256, 57)
            .with_max_dict_page_size(128)
            .with_row_selections(),
        TestOptions::new(2, 256, 127)
            .with_null_percent(0)
            .with_row_selections(),
        TestOptions::new(2, 256, 93)
            .with_null_percent(25)
            .with_row_selections(),
        TestOptions::new(2, 256, 93)
            .with_null_percent(25)
            .with_row_selections()
            .with_limit(10),
        TestOptions::new(2, 256, 93)
            .with_null_percent(25)
            .with_row_selections()
            .with_offset(20)
            .with_limit(10),
        TestOptions::new(4, 100, 25).with_row_filter(),
        TestOptions::new(4, 100, 25)
            .with_row_selections()
            .with_row_filter(),
        TestOptions::new(2, 256, 93)
            .with_null_percent(25)
            .with_max_data_page_size(10)
            .with_row_filter(),
        TestOptions::new(2, 256, 93)
            .with_null_percent(25)
            .with_max_data_page_size(10)
            .with_row_selections()
            .with_row_filter(),
        TestOptions::new(2, 256, 93)
            .with_enabled_statistics(EnabledStatistics::None)
            .with_max_data_page_size(10)
            .with_row_selections(),
    ];

    all_options.into_iter().for_each(|opts| {
        for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
            for encoding in encodings {
                let opts = TestOptions {
                    writer_version,
                    encoding: *encoding,
                    ..opts.clone()
                };

                single_column_reader_test::<T, _, G>(
                    opts,
                    rand_max,
                    converted_type,
                    arrow_type.clone(),
                    &converter,
                )
            }
        }
    });
}

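/// Writes a Parquet file containing a single column of data generated per
/// `opts`, reads it back with the configured selection, filter, limit and
/// offset, and asserts every decoded batch matches the expected rows.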
fn single_column_reader_test<T, F, G>(
    opts: TestOptions,
    rand_max: i32,
    converted_type: ConvertedType,
    arrow_type: Option<ArrowDataType>,
    converter: F,
) where
    T: DataType,
    G: RandGen<T>,
    F: Fn(&[Option<T::T>]) -> ArrayRef,
{
    println!(
        "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
        T::get_physical_type(),
        converted_type,
        arrow_type,
        opts
    );

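    // When a null percentage is requested, generate per-row-group definition
    // levels where level 0 marks a null row and level 1 a populated one.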
    let (repetition, def_levels) = match opts.null_percent.as_ref() {
        Some(null_percent) => {
            let mut rng = rng();

            let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
                .map(|_| {
                    std::iter::from_fn(|| {
                        Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
                    })
                    .take(opts.num_rows)
                    .collect()
                })
                .collect();
            (Repetition::OPTIONAL, Some(def_levels))
        }
        None => (Repetition::REQUIRED, None),
    };

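    // Generate values only for the rows the definition levels leave non-null.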
    let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
        .map(|idx| {
            let null_count = match def_levels.as_ref() {
                Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
                None => 0,
            };
            G::gen_vec(rand_max, opts.num_rows - null_count)
        })
        .collect();

    let len = match T::get_physical_type() {
        crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
        crate::basic::Type::INT96 => 12,
        _ => -1,
    };

    let fields = vec![Arc::new(
        Type::primitive_type_builder("leaf", T::get_physical_type())
            .with_repetition(repetition)
            .with_converted_type(converted_type)
            .with_length(len)
            .build()
            .unwrap(),
    )];

    let schema = Arc::new(
        Type::group_type_builder("test_schema")
            .with_fields(fields)
            .build()
            .unwrap(),
    );

    let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));

    let mut file = tempfile::tempfile().unwrap();

    generate_single_column_file_with_data::<T>(
        &values,
        def_levels.as_ref(),
        file.try_clone().unwrap(),
        schema,
        arrow_field,
        &opts,
    )
    .unwrap();

    file.rewind().unwrap();

    let options = ArrowReaderOptions::new()
        .with_page_index(opts.enabled_statistics == EnabledStatistics::Page);

    let mut builder =
        ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();

    let expected_data = match opts.row_selections {
        Some((selections, row_count)) => {
            let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);

            let mut skip_data: Vec<Option<T::T>> = vec![];
            let dequeue: VecDeque<RowSelector> = selections.clone().into();
            for select in dequeue {
                if select.skip {
                    without_skip_data.drain(0..select.row_count);
                } else {
                    skip_data.extend(without_skip_data.drain(0..select.row_count));
                }
            }
            builder = builder.with_row_selection(selections);

            assert_eq!(skip_data.len(), row_count);
            skip_data
        }
        None => {
            let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
            assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
            expected_data
        }
    };

    let mut expected_data = match opts.row_filter {
        Some(filter) => {
            let expected_data = expected_data
                .into_iter()
                .zip(filter.iter())
                .filter_map(|(d, f)| f.then(|| d))
                .collect();

            let mut filter_offset = 0;
            let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
                ProjectionMask::all(),
                move |b| {
                    let array = BooleanArray::from_iter(
                        filter
                            .iter()
                            .skip(filter_offset)
                            .take(b.num_rows())
                            .map(|x| Some(*x)),
                    );
                    filter_offset += b.num_rows();
                    Ok(array)
                },
            ))]);

            builder = builder.with_row_filter(filter);
            expected_data
        }
        None => expected_data,
    };

    if let Some(offset) = opts.offset {
        builder = builder.with_offset(offset);
        expected_data = expected_data.into_iter().skip(offset).collect();
    }

    if let Some(limit) = opts.limit {
        builder = builder.with_limit(limit);
        expected_data = expected_data.into_iter().take(limit).collect();
    }

    let mut record_reader = builder
        .with_batch_size(opts.record_batch_size)
        .build()
        .unwrap();

    let mut total_read = 0;
    loop {
        let maybe_batch = record_reader.next();
        if total_read < expected_data.len() {
            let end = min(total_read + opts.record_batch_size, expected_data.len());
            let batch = maybe_batch.unwrap().unwrap();
            assert_eq!(end - total_read, batch.num_rows());

            let a = converter(&expected_data[total_read..end]);
            let b = Arc::clone(batch.column(0));

            assert_eq!(a.data_type(), b.data_type());
            assert_eq!(a.to_data(), b.to_data());
            assert_eq!(
                a.as_any().type_id(),
                b.as_any().type_id(),
                "incorrect type ids"
            );

            total_read = end;
        } else {
            assert!(maybe_batch.is_none());
            break;
        }
    }
}

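/// Flattens the per-row-group values into a single `Vec`, inserting `None`
/// wherever the definition levels mark a null row.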
fn gen_expected_data<T: DataType>(
    def_levels: Option<&Vec<Vec<i16>>>,
    values: &[Vec<T::T>],
) -> Vec<Option<T::T>> {
    let data: Vec<Option<T::T>> = match def_levels {
        Some(levels) => {
            let mut values_iter = values.iter().flatten();
            levels
                .iter()
                .flatten()
                .map(|d| match d {
                    1 => Some(values_iter.next().cloned().unwrap()),
                    0 => None,
                    _ => unreachable!(),
                })
                .collect()
        }
        None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
    };
    data
}

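/// Writes `values` (one `Vec` per row group) to `file` using the writer
/// properties from `opts`, embedding the Arrow schema for `field` in the
/// file metadata when one is supplied.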
fn generate_single_column_file_with_data<T: DataType>(
    values: &[Vec<T::T>],
    def_levels: Option<&Vec<Vec<i16>>>,
    file: File,
    schema: TypePtr,
    field: Option<Field>,
    opts: &TestOptions,
) -> Result<ParquetMetaData> {
    let mut writer_props = opts.writer_props();
    if let Some(field) = field {
        let arrow_schema = Schema::new(vec![field]);
        add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
    }

    let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;

    for (idx, v) in values.iter().enumerate() {
        let def_levels = def_levels.map(|d| d[idx].as_slice());
        let mut row_group_writer = writer.next_row_group()?;
        {
            let mut column_writer = row_group_writer
                .next_column()?
                .expect("Column writer is none!");

            column_writer
                .typed::<T>()
                .write_batch(v, def_levels, None)?;

            column_writer.close()?;
        }
        row_group_writer.close()?;
    }

    writer.close()
}

fn get_test_file(file_name: &str) -> File {
    let mut path = PathBuf::new();
    path.push(arrow::util::test_util::arrow_test_data());
    path.push(file_name);

    File::open(path.as_path()).expect("File not found!")
}

#[test]
fn test_read_structs() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/nested_structs.rust.parquet");
    let file = File::open(&path).unwrap();
    let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

    for batch in record_batch_reader {
        batch.unwrap();
    }

    let file = File::open(&path).unwrap();
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

    let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
    let projected_reader = builder
        .with_projection(mask)
        .with_batch_size(60)
        .build()
        .unwrap();

    let expected_schema = Schema::new(vec![
        Field::new(
            "roll_num",
            ArrowDataType::Struct(Fields::from(vec![Field::new(
                "count",
                ArrowDataType::UInt64,
                false,
            )])),
            false,
        ),
        Field::new(
            "PC_CUR",
            ArrowDataType::Struct(Fields::from(vec![
                Field::new("mean", ArrowDataType::Int64, false),
                Field::new("sum", ArrowDataType::Int64, false),
            ])),
            false,
        ),
    ]);

    assert_eq!(&expected_schema, projected_reader.schema().as_ref());

    for batch in projected_reader {
        let batch = batch.unwrap();
        assert_eq!(batch.schema().as_ref(), &expected_schema);
    }
}

#[test]
fn test_read_structs_by_name() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/nested_structs.rust.parquet");
    let file = File::open(&path).unwrap();
    let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

    for batch in record_batch_reader {
        batch.unwrap();
    }

    let file = File::open(&path).unwrap();
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

    let mask = ProjectionMask::columns(
        builder.parquet_schema(),
        ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
    );
    let projected_reader = builder
        .with_projection(mask)
        .with_batch_size(60)
        .build()
        .unwrap();

    let expected_schema = Schema::new(vec![
        Field::new(
            "roll_num",
            ArrowDataType::Struct(Fields::from(vec![Field::new(
                "count",
                ArrowDataType::UInt64,
                false,
            )])),
            false,
        ),
        Field::new(
            "PC_CUR",
            ArrowDataType::Struct(Fields::from(vec![
                Field::new("mean", ArrowDataType::Int64, false),
                Field::new("sum", ArrowDataType::Int64, false),
            ])),
            false,
        ),
    ]);

    assert_eq!(&expected_schema, projected_reader.schema().as_ref());

    for batch in projected_reader {
        let batch = batch.unwrap();
        assert_eq!(batch.schema().as_ref(), &expected_schema);
    }
}

#[test]
fn test_read_maps() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/nested_maps.snappy.parquet");
    let file = File::open(path).unwrap();
    let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

    for batch in record_batch_reader {
        batch.unwrap();
    }
}

#[test]
fn test_nested_nullability() {
    let message_type = "message nested {
        OPTIONAL Group group {
            REQUIRED INT32 leaf;
        }
    }";

    let file = tempfile::tempfile().unwrap();
    let schema = Arc::new(parse_message_type(message_type).unwrap());

    {
        let mut writer =
            SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
                .unwrap();

        {
            let mut row_group_writer = writer.next_row_group().unwrap();
            let mut column_writer = row_group_writer.next_column().unwrap().unwrap();

            column_writer
                .typed::<Int32Type>()
                .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
                .unwrap();

            column_writer.close().unwrap();
            row_group_writer.close().unwrap();
        }

        writer.close().unwrap();
    }

    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);

    let reader = builder.with_projection(mask).build().unwrap();

    let expected_schema = Schema::new(vec![Field::new(
        "group",
        ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
        true,
    )]);

    let batch = reader.into_iter().next().unwrap().unwrap();
    assert_eq!(batch.schema().as_ref(), &expected_schema);
    assert_eq!(batch.num_rows(), 4);
    assert_eq!(batch.column(0).null_count(), 2);
}

#[test]
fn test_invalid_utf8() {
    let data = vec![
        80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
        18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
        3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
        2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
        111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
        21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
        38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
        38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
        0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
        118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
        116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
        82, 49,
    ];

    let file = Bytes::from(data);
    let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();

    let error = record_batch_reader.next().unwrap().unwrap_err();

    assert!(
        error.to_string().contains("invalid utf-8 sequence"),
        "{}",
        error
    );
}

#[test]
fn test_invalid_utf8_string_array() {
    test_invalid_utf8_string_array_inner::<i32>();
}

#[test]
fn test_invalid_utf8_large_string_array() {
    test_invalid_utf8_string_array_inner::<i64>();
}

fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
    let cases = [
        invalid_utf8_first_char::<O>(),
        invalid_utf8_first_char_long_strings::<O>(),
        invalid_utf8_later_char::<O>(),
        invalid_utf8_later_char_long_strings::<O>(),
        invalid_utf8_later_char_really_long_strings::<O>(),
        invalid_utf8_later_char_really_long_strings2::<O>(),
    ];
    for array in &cases {
        for encoding in STRING_ENCODINGS {
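            // SAFETY: intentionally bypasses UTF-8 validation so the written
            // file contains invalid string data for the reader to reject.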
            let array = unsafe {
                GenericStringArray::<O>::new_unchecked(
                    array.offsets().clone(),
                    array.values().clone(),
                    array.nulls().cloned(),
                )
            };
            let data_type = array.data_type().clone();
            let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
            let err = read_from_parquet(data).unwrap_err();
            let expected_err =
                "Parquet argument error: Parquet error: encountered non UTF-8 data";
            assert!(
                err.to_string().contains(expected_err),
                "data type: {data_type}, expected: {expected_err}, got: {err}"
            );
        }
    }
}

#[test]
fn test_invalid_utf8_string_view_array() {
    let cases = [
        invalid_utf8_first_char::<i32>(),
        invalid_utf8_first_char_long_strings::<i32>(),
        invalid_utf8_later_char::<i32>(),
        invalid_utf8_later_char_long_strings::<i32>(),
        invalid_utf8_later_char_really_long_strings::<i32>(),
        invalid_utf8_later_char_really_long_strings2::<i32>(),
    ];

    for encoding in STRING_ENCODINGS {
        for array in &cases {
            let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
            let array = array.as_binary_view();

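            // SAFETY: as above, constructs a StringViewArray over invalid
            // UTF-8 bytes without validation.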
            let array = unsafe {
                StringViewArray::new_unchecked(
                    array.views().clone(),
                    array.data_buffers().to_vec(),
                    array.nulls().cloned(),
                )
            };

            let data_type = array.data_type().clone();
            let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
            let err = read_from_parquet(data).unwrap_err();
            let expected_err =
                "Parquet argument error: Parquet error: encountered non UTF-8 data";
            assert!(
                err.to_string().contains(expected_err),
                "data type: {data_type}, expected: {expected_err}, got: {err}"
            );
        }
    }
}

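/// Encodings to exercise for string data; `None` leaves the default writer
/// properties in place, which use dictionary encoding.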
const STRING_ENCODINGS: &[Option<Encoding>] = &[
    None,
    Some(Encoding::PLAIN),
    Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
    Some(Encoding::DELTA_BYTE_ARRAY),
];

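/// An invalid UTF-8 sequence in the very first byte.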
const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];

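/// An invalid UTF-8 sequence that begins only after several valid bytes.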
const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];

fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
    let valid: &[u8] = b" ";
    let invalid = INVALID_UTF8_FIRST_CHAR;
    GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
}

fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
    let valid: &[u8] = b" ";
    let mut invalid = vec![];
    invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
    invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
    GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
}

fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
    let valid: &[u8] = b" ";
    let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
    GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
}

fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
    let valid: &[u8] = b" ";
    let mut invalid = vec![];
    invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
    invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
    GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
}

fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
    let valid: &[u8] = b" ";
    let mut invalid = vec![];
    for _ in 0..10 {
        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
    }
    invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
    GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
}

fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
    let valid: &[u8] = b" ";
    let mut valid_long = vec![];
    for _ in 0..10 {
        valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
    }
    let invalid = INVALID_UTF8_LATER_CHAR;
    GenericBinaryArray::<O>::from_iter(vec![
        None,
        Some(valid),
        Some(invalid),
        None,
        Some(&valid_long),
        Some(valid),
    ])
}

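/// Writes `array` as a single-column ("c") Parquet file. When an encoding is
/// supplied, dictionary encoding is disabled and that encoding forced;
/// `None` writes with the default properties.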
fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
    let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
    let mut data = vec![];
    let schema = batch.schema();
    let props = encoding.map(|encoding| {
        WriterProperties::builder()
            .set_dictionary_enabled(false)
            .set_encoding(encoding)
            .build()
    });

    {
        let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
        writer.write(&batch).unwrap();
        writer.flush().unwrap();
        writer.close().unwrap();
    };
    data
}

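/// Reads all record batches back from the given Parquet file bytes.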
fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
    let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
        .unwrap()
        .build()
        .unwrap();

    reader.collect()
}

#[test]
fn test_dictionary_preservation() {
    let fields = vec![Arc::new(
        Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
            .with_repetition(Repetition::OPTIONAL)
            .with_converted_type(ConvertedType::UTF8)
            .build()
            .unwrap(),
    )];

    let schema = Arc::new(
        Type::group_type_builder("test_schema")
            .with_fields(fields)
            .build()
            .unwrap(),
    );

    let dict_type = ArrowDataType::Dictionary(
        Box::new(ArrowDataType::Int32),
        Box::new(ArrowDataType::Utf8),
    );

    let arrow_field = Field::new("leaf", dict_type, true);

    let mut file = tempfile::tempfile().unwrap();

    let values = vec![
        vec![
            ByteArray::from("hello"),
            ByteArray::from("a"),
            ByteArray::from("b"),
            ByteArray::from("d"),
        ],
        vec![
            ByteArray::from("c"),
            ByteArray::from("a"),
            ByteArray::from("b"),
        ],
    ];

    let def_levels = vec![
        vec![1, 0, 0, 1, 0, 0, 1, 1],
        vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
    ];

    let opts = TestOptions {
        encoding: Encoding::RLE_DICTIONARY,
        ..Default::default()
    };

    generate_single_column_file_with_data::<ByteArrayType>(
        &values,
        Some(&def_levels),
        file.try_clone().unwrap(),
        schema,
        Some(arrow_field),
        &opts,
    )
    .unwrap();

    file.rewind().unwrap();

    let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();

    let batches = record_reader
        .collect::<Result<Vec<RecordBatch>, _>>()
        .unwrap();

    assert_eq!(batches.len(), 6);
    assert!(batches.iter().all(|x| x.num_columns() == 1));

    let row_counts = batches
        .iter()
        .map(|x| (x.num_rows(), x.column(0).null_count()))
        .collect::<Vec<_>>();

    assert_eq!(
        row_counts,
        vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
    );

    let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();

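    // Batches 0 and 1 are served entirely by the first row group's
    // dictionary; the dictionary changes once the reader crosses into the
    // second row group, after which batches 3-5 share the new one.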
    assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
    assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
    assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
    assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
    assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
}

#[test]
fn test_read_null_list() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/null_list.parquet");
    let file = File::open(path).unwrap();
    let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

    let batch = record_batch_reader.next().unwrap().unwrap();
    assert_eq!(batch.num_rows(), 1);
    assert_eq!(batch.num_columns(), 1);
    assert_eq!(batch.column(0).len(), 1);

    let list = batch
        .column(0)
        .as_any()
        .downcast_ref::<ListArray>()
        .unwrap();
    assert_eq!(list.len(), 1);
    assert!(list.is_valid(0));

    let val = list.value(0);
    assert_eq!(val.len(), 0);
}

#[test]
fn test_null_schema_inference() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/null_list.parquet");
    let file = File::open(path).unwrap();

    let arrow_field = Field::new(
        "emptylist",
        ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
        true,
    );

    let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
    let schema = builder.schema();
    assert_eq!(schema.fields().len(), 1);
    assert_eq!(schema.field(0), &arrow_field);
}

#[test]
fn test_skip_metadata() {
    let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
    let field = Field::new("col", col.data_type().clone(), true);

    let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));

    let metadata = [("key".to_string(), "value".to_string())]
        .into_iter()
        .collect();

    let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));

    assert_ne!(schema_with_metadata, schema_without_metadata);

    let batch =
        RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();

    let file = |version: WriterVersion| {
        let props = WriterProperties::builder()
            .set_writer_version(version)
            .build();

        let file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
                .unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        file
    };

    let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);

    let v1_reader = file(WriterVersion::PARQUET_1_0);
    let v2_reader = file(WriterVersion::PARQUET_2_0);

    let arrow_reader =
        ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
    assert_eq!(arrow_reader.schema(), schema_with_metadata);

    let reader =
        ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
            .unwrap()
            .build()
            .unwrap();
    assert_eq!(reader.schema(), schema_without_metadata);

    let arrow_reader =
        ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
    assert_eq!(arrow_reader.schema(), schema_with_metadata);

    let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
        .unwrap()
        .build()
        .unwrap();
    assert_eq!(reader.schema(), schema_without_metadata);
}

fn write_parquet_from_iter<I, F>(value: I) -> File
where
    I: IntoIterator<Item = (F, ArrayRef)>,
    F: AsRef<str>,
{
    let batch = RecordBatch::try_from_iter(value).unwrap();
    let file = tempfile().unwrap();
    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();
    file
}

fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
where
    I: IntoIterator<Item = (F, ArrayRef)>,
    F: AsRef<str>,
{
    let file = write_parquet_from_iter(value);
    let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
        file.try_clone().unwrap(),
        options_with_schema,
    );
    assert_eq!(builder.err().unwrap().to_string(), expected_error);
}

#[test]
fn test_schema_too_few_columns() {
    run_schema_test_with_error(
        vec![
            ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
            ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
        ],
        Arc::new(Schema::new(vec![Field::new(
            "int64",
            ArrowDataType::Int64,
            false,
        )])),
        "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
    );
}

#[test]
fn test_schema_too_many_columns() {
    run_schema_test_with_error(
        vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
        Arc::new(Schema::new(vec![
            Field::new("int64", ArrowDataType::Int64, false),
            Field::new("int32", ArrowDataType::Int32, false),
        ])),
        "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
    );
}

#[test]
fn test_schema_mismatched_column_names() {
    run_schema_test_with_error(
        vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
        Arc::new(Schema::new(vec![Field::new(
            "other",
            ArrowDataType::Int64,
            false,
        )])),
        "Arrow: incompatible arrow schema, expected field named int64 got other",
    );
}

#[test]
fn test_schema_incompatible_columns() {
    run_schema_test_with_error(
        vec![
            (
                "col1_invalid",
                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
            ),
            (
                "col2_valid",
                Arc::new(Int32Array::from(vec![0])) as ArrayRef,
            ),
            (
                "col3_invalid",
                Arc::new(Date64Array::from(vec![0])) as ArrayRef,
            ),
        ],
        Arc::new(Schema::new(vec![
            Field::new("col1_invalid", ArrowDataType::Int32, false),
            Field::new("col2_valid", ArrowDataType::Int32, false),
            Field::new("col3_invalid", ArrowDataType::Int32, false),
        ])),
        "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
    );
}

#[test]
fn test_one_incompatible_nested_column() {
    let nested_fields = Fields::from(vec![
        Field::new("nested1_valid", ArrowDataType::Utf8, false),
        Field::new("nested1_invalid", ArrowDataType::Int64, false),
    ]);
    let nested = StructArray::try_new(
        nested_fields,
        vec![
            Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
            Arc::new(Int64Array::from(vec![0])) as ArrayRef,
        ],
        None,
    )
    .expect("struct array");
    let supplied_nested_fields = Fields::from(vec![
        Field::new("nested1_valid", ArrowDataType::Utf8, false),
        Field::new("nested1_invalid", ArrowDataType::Int32, false),
    ]);
    run_schema_test_with_error(
        vec![
            ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
            ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
            ("nested", Arc::new(nested) as ArrayRef),
        ],
        Arc::new(Schema::new(vec![
            Field::new("col1", ArrowDataType::Int64, false),
            Field::new("col2", ArrowDataType::Int32, false),
            Field::new(
                "nested",
                ArrowDataType::Struct(supplied_nested_fields),
                false,
            ),
        ])),
        "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
         requested Struct(\"nested1_valid\": Utf8, \"nested1_invalid\": Int32) \
         but found Struct(\"nested1_valid\": Utf8, \"nested1_invalid\": Int64)",
    );
}

fn utf8_parquet() -> Bytes {
    let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
    let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
    let props = None;
    let mut parquet_data = vec![];
    let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();
    Bytes::from(parquet_data)
}

#[test]
fn test_schema_error_bad_types() {
    let parquet_data = utf8_parquet();

    let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
        "column1",
        arrow::datatypes::DataType::Int32,
        false,
    )]));

    let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
    let err =
        ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
            .unwrap_err();
    assert_eq!(
        err.to_string(),
        "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8"
    )
}

#[test]
fn test_schema_error_bad_nullability() {
    let parquet_data = utf8_parquet();

    let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
        "column1",
        arrow::datatypes::DataType::Utf8,
        true,
    )]));

    let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
    let err =
        ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
            .unwrap_err();
    assert_eq!(
        err.to_string(),
        "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false"
    )
}

#[test]
fn test_read_binary_as_utf8() {
    let file = write_parquet_from_iter(vec![
        (
            "binary_to_utf8",
            Arc::new(BinaryArray::from(vec![
                b"one".as_ref(),
                b"two".as_ref(),
                b"three".as_ref(),
            ])) as ArrayRef,
        ),
        (
            "large_binary_to_large_utf8",
            Arc::new(LargeBinaryArray::from(vec![
                b"one".as_ref(),
                b"two".as_ref(),
                b"three".as_ref(),
            ])) as ArrayRef,
        ),
        (
            "binary_view_to_utf8_view",
            Arc::new(BinaryViewArray::from(vec![
                b"one".as_ref(),
                b"two".as_ref(),
                b"three".as_ref(),
            ])) as ArrayRef,
        ),
    ]);
    let supplied_fields = Fields::from(vec![
        Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
        Field::new(
            "large_binary_to_large_utf8",
            ArrowDataType::LargeUtf8,
            false,
        ),
        Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
    ]);

    let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
    let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
        file.try_clone().unwrap(),
        options,
    )
    .expect("reader builder with schema")
    .build()
    .expect("reader with schema");

    let batch = arrow_reader.next().unwrap().unwrap();
    assert_eq!(batch.num_columns(), 3);
    assert_eq!(batch.num_rows(), 3);
    assert_eq!(
        batch
            .column(0)
            .as_string::<i32>()
            .iter()
            .collect::<Vec<_>>(),
        vec![Some("one"), Some("two"), Some("three")]
    );

    assert_eq!(
        batch
            .column(1)
            .as_string::<i64>()
            .iter()
            .collect::<Vec<_>>(),
        vec![Some("one"), Some("two"), Some("three")]
    );

    assert_eq!(
        batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
        vec![Some("one"), Some("two"), Some("three")]
    );
}

#[test]
#[should_panic(expected = "Invalid UTF8 sequence at")]
fn test_read_non_utf8_binary_as_utf8() {
    let file = write_parquet_from_iter(vec![(
        "non_utf8_binary",
        Arc::new(BinaryArray::from(vec![
            b"\xDE\x00\xFF".as_ref(),
            b"\xDE\x01\xAA".as_ref(),
            b"\xDE\x02\xFF".as_ref(),
        ])) as ArrayRef,
    )]);
    let supplied_fields = Fields::from(vec![Field::new(
        "non_utf8_binary",
        ArrowDataType::Utf8,
        false,
    )]);

    let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
    let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
        file.try_clone().unwrap(),
        options,
    )
    .expect("reader builder with schema")
    .build()
    .expect("reader with schema");
    arrow_reader.next().unwrap().unwrap_err();
}

#[test]
fn test_with_schema() {
    let nested_fields = Fields::from(vec![
        Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
        Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
    ]);

    let nested_arrays: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
        Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
    ];

    let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();

    let file = write_parquet_from_iter(vec![
        (
            "int32_to_ts_second",
            Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
        ),
        (
            "date32_to_date64",
            Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
        ),
        ("nested", Arc::new(nested) as ArrayRef),
    ]);

    let supplied_nested_fields = Fields::from(vec![
        Field::new(
            "utf8_to_dict",
            ArrowDataType::Dictionary(
                Box::new(ArrowDataType::Int32),
                Box::new(ArrowDataType::Utf8),
            ),
            false,
        ),
        Field::new(
            "int64_to_ts_nano",
            ArrowDataType::Timestamp(
                arrow::datatypes::TimeUnit::Nanosecond,
                Some("+10:00".into()),
            ),
            false,
        ),
    ]);

    let supplied_schema = Arc::new(Schema::new(vec![
        Field::new(
            "int32_to_ts_second",
            ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
            false,
        ),
        Field::new("date32_to_date64", ArrowDataType::Date64, false),
        Field::new(
            "nested",
            ArrowDataType::Struct(supplied_nested_fields),
            false,
        ),
    ]));

    let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
    let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
        file.try_clone().unwrap(),
        options,
    )
    .expect("reader builder with schema")
    .build()
    .expect("reader with schema");

    assert_eq!(arrow_reader.schema(), supplied_schema);
    let batch = arrow_reader.next().unwrap().unwrap();
    assert_eq!(batch.num_columns(), 3);
    assert_eq!(batch.num_rows(), 4);
    assert_eq!(
        batch
            .column(0)
            .as_any()
            .downcast_ref::<TimestampSecondArray>()
            .expect("downcast to timestamp second")
            .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
            .map(|v| v.to_string())
            .expect("value as datetime"),
        "1970-01-01 01:00:00 +01:00"
    );
    assert_eq!(
        batch
            .column(1)
            .as_any()
            .downcast_ref::<Date64Array>()
            .expect("downcast to date64")
            .value_as_date(0)
            .map(|v| v.to_string())
            .expect("value as date"),
        "1970-01-01"
    );

    let nested = batch
        .column(2)
        .as_any()
        .downcast_ref::<StructArray>()
        .expect("downcast to struct");

    let nested_dict = nested
        .column(0)
        .as_any()
        .downcast_ref::<Int32DictionaryArray>()
        .expect("downcast to dictionary");

    assert_eq!(
        nested_dict
            .values()
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("downcast to string")
            .iter()
            .collect::<Vec<_>>(),
        vec![Some("a"), Some("b")]
    );

    assert_eq!(
        nested_dict.keys().iter().collect::<Vec<_>>(),
        vec![Some(0), Some(0), Some(0), Some(1)]
    );

    assert_eq!(
        nested
            .column(1)
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .expect("downcast to timestamp nanosecond")
            .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
            .map(|v| v.to_string())
            .expect("value as datetime"),
        "1970-01-01 10:00:00.000000001 +10:00"
    );
}

#[test]
fn test_empty_projection() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/alltypes_plain.parquet");
    let file = File::open(path).unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let file_metadata = builder.metadata().file_metadata();
    let expected_rows = file_metadata.num_rows() as usize;

    let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
    let batch_reader = builder
        .with_projection(mask)
        .with_batch_size(2)
        .build()
        .unwrap();

    let mut total_rows = 0;
    for maybe_batch in batch_reader {
        let batch = maybe_batch.unwrap();
        total_rows += batch.num_rows();
        assert_eq!(batch.num_columns(), 0);
        assert!(batch.num_rows() <= 2);
    }

    assert_eq!(total_rows, expected_rows);
}

fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "list",
        ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
        true,
    )]));

    let mut buf = Vec::with_capacity(1024);

    let mut writer = ArrowWriter::try_new(
        &mut buf,
        schema.clone(),
        Some(
            WriterProperties::builder()
                .set_max_row_group_size(row_group_size)
                .build(),
        ),
    )
    .unwrap();
    for _ in 0..2 {
        let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
        for _ in 0..(batch_size) {
            list_builder.append(true);
        }
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
            .unwrap();
        writer.write(&batch).unwrap();
    }
    writer.close().unwrap();

    let mut record_reader =
        ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
    assert_eq!(
        batch_size,
        record_reader.next().unwrap().unwrap().num_rows()
    );
    assert_eq!(
        batch_size,
        record_reader.next().unwrap().unwrap().num_rows()
    );
}

#[test]
fn test_row_group_exact_multiple() {
    const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
    test_row_group_batch(8, 8);
    test_row_group_batch(10, 8);
    test_row_group_batch(8, 10);
    test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
    test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
    test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
    test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
    test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
}

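/// Computes the batches a reader should produce when applying `selection`
/// to `column` with the given `batch_size`, slicing the input wherever the
/// selection alternates between reading and skipping.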
fn get_expected_batches(
    column: &RecordBatch,
    selection: &RowSelection,
    batch_size: usize,
) -> Vec<RecordBatch> {
    let mut expected_batches = vec![];

    let mut selection: VecDeque<_> = selection.clone().into();
    let mut row_offset = 0;
    let mut last_start = None;
    while row_offset < column.num_rows() && !selection.is_empty() {
        let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
        while batch_remaining > 0 && !selection.is_empty() {
            let (to_read, skip) = match selection.front_mut() {
                Some(selection) if selection.row_count > batch_remaining => {
                    selection.row_count -= batch_remaining;
                    (batch_remaining, selection.skip)
                }
                Some(_) => {
                    let select = selection.pop_front().unwrap();
                    (select.row_count, select.skip)
                }
                None => break,
            };

            batch_remaining -= to_read;

            match skip {
                true => {
                    if let Some(last_start) = last_start.take() {
                        expected_batches.push(column.slice(last_start, row_offset - last_start))
                    }
                    row_offset += to_read
                }
                false => {
                    last_start.get_or_insert(row_offset);
                    row_offset += to_read
                }
            }
        }
    }

    if let Some(last_start) = last_start.take() {
        expected_batches.push(column.slice(last_start, row_offset - last_start))
    }

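    // Every batch except the final one must contain exactly batch_size rows.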
    for batch in &expected_batches[..expected_batches.len() - 1] {
        assert_eq!(batch.num_rows(), batch_size);
    }

    expected_batches
}

fn create_test_selection(
    step_len: usize,
    total_len: usize,
    skip_first: bool,
) -> (RowSelection, usize) {
    let mut remaining = total_len;
    let mut skip = skip_first;
    let mut vec = vec![];
    let mut selected_count = 0;
    while remaining != 0 {
        let step = if remaining > step_len {
            step_len
        } else {
            remaining
        };
        vec.push(RowSelector {
            row_count: step,
            skip,
        });
        remaining -= step;
        if !skip {
            selected_count += step;
        }
        skip = !skip;
    }
    (vec.into(), selected_count)
}

#[test]
fn test_scan_row_with_selection() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
    let test_file = File::open(&path).unwrap();

    let mut serial_reader =
        ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
    let data = serial_reader.next().unwrap().unwrap();

    let do_test = |batch_size: usize, selection_len: usize| {
        for skip_first in [false, true] {
            let selections =
                create_test_selection(selection_len, data.num_rows(), skip_first).0;

            let expected = get_expected_batches(&data, &selections, batch_size);
            let skip_reader = create_skip_reader(&test_file, batch_size, selections);
            assert_eq!(
                skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
                expected,
                "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
            );
        }
    };

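    // Cover selection spans larger than, equal to, and smaller than the
    // batch size (the test file holds 7300 rows in total).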
    do_test(1000, 1000);

    do_test(20, 20);

    do_test(20, 5);

    do_test(20, 5);

    fn create_skip_reader(
        test_file: &File,
        batch_size: usize,
        selections: RowSelection,
    ) -> ParquetRecordBatchReader {
        let options = ArrowReaderOptions::new().with_page_index(true);
        let file = test_file.try_clone().unwrap();
        ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
            .unwrap()
            .with_batch_size(batch_size)
            .with_row_selection(selections)
            .build()
            .unwrap()
    }
}

#[test]
fn test_batch_size_overallocate() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/alltypes_plain.parquet");
    let test_file = File::open(path).unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
    let num_rows = builder.metadata.file_metadata().num_rows();
    let reader = builder
        .with_batch_size(1024)
        .with_projection(ProjectionMask::all())
        .build()
        .unwrap();
    assert_ne!(1024, num_rows);
    assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
}

#[test]
fn test_read_with_page_index_enabled() {
    let testdata = arrow::util::test_util::parquet_test_data();

    {
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let test_file = File::open(path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            test_file,
            ArrowReaderOptions::new().with_page_index(true),
        )
        .unwrap();
        assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
        let reader = builder.build().unwrap();
        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(batches.len(), 8);
    }

    {
        let path = format!("{testdata}/alltypes_plain.parquet");
        let test_file = File::open(path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            test_file,
            ArrowReaderOptions::new().with_page_index(true),
        )
        .unwrap();
        assert!(builder.metadata().offset_index().is_none());
        let reader = builder.build().unwrap();
        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(batches.len(), 1);
    }
}

#[test]
fn test_raw_repetition() {
    const MESSAGE_TYPE: &str = "
        message Log {
            OPTIONAL INT32 eventType;
            REPEATED INT32 category;
            REPEATED group filter {
                OPTIONAL INT32 error;
            }
        }
    ";
    let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
    let props = Default::default();

    let mut buf = Vec::with_capacity(1024);
    let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
    let mut row_group_writer = writer.next_row_group().unwrap();

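    // leaf 0: eventType (optional INT32)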
    let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
    col_writer
        .typed::<Int32Type>()
        .write_batch(&[1], Some(&[1]), None)
        .unwrap();
    col_writer.close().unwrap();
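
    // leaf 1: category (repeated INT32)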
4287 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4289 col_writer
4290 .typed::<Int32Type>()
4291 .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
4292 .unwrap();
4293 col_writer.close().unwrap();
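
    // leaf 2: filter.error (optional INT32 inside a repeated group)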
    let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
    col_writer
        .typed::<Int32Type>()
        .write_batch(&[1], Some(&[1]), Some(&[0]))
        .unwrap();
    col_writer.close().unwrap();

    let rg_md = row_group_writer.close().unwrap();
    assert_eq!(rg_md.num_rows(), 1);
    writer.close().unwrap();

    let bytes = Bytes::from(buf);

    let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
    let full = no_mask.next().unwrap().unwrap();

    assert_eq!(full.num_columns(), 3);

    for idx in 0..3 {
        let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
        let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
        let mut reader = b.with_projection(mask).build().unwrap();
        let projected = reader.next().unwrap().unwrap();

        assert_eq!(projected.num_columns(), 1);
        assert_eq!(full.column(idx), projected.column(0));
    }
}

#[test]
fn test_read_lz4_raw() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/lz4_raw_compressed.parquet");
    let file = File::open(path).unwrap();

    let batches = ParquetRecordBatchReader::try_new(file, 1024)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(batches.len(), 1);
    let batch = &batches[0];

    assert_eq!(batch.num_columns(), 3);
    assert_eq!(batch.num_rows(), 4);

    let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
    assert_eq!(
        a.values(),
        &[1593604800, 1593604800, 1593604801, 1593604801]
    );

    let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
    let a: Vec<_> = a.iter().flatten().collect();
    assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);

    let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
    assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
}

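// Both files use the LZ4 codec entry but different framings:
// hadoop_lz4_compressed.parquet is Hadoop-framed LZ4, while
// non_hadoop_lz4_compressed.parquet is raw LZ4 block data, so reading both
// exercises the decoder's fallback between the two.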
4355 #[test]
4365 fn test_read_lz4_hadoop_fallback() {
4366 for file in [
4367 "hadoop_lz4_compressed.parquet",
4368 "non_hadoop_lz4_compressed.parquet",
4369 ] {
4370 let testdata = arrow::util::test_util::parquet_test_data();
4371 let path = format!("{testdata}/{file}");
4372 let file = File::open(path).unwrap();
4373 let expected_rows = 4;
4374
4375 let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4376 .unwrap()
4377 .collect::<Result<Vec<_>, _>>()
4378 .unwrap();
4379 assert_eq!(batches.len(), 1);
4380 let batch = &batches[0];
4381
4382 assert_eq!(batch.num_columns(), 3);
4383 assert_eq!(batch.num_rows(), expected_rows);
4384
4385 let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4386 assert_eq!(
4387 a.values(),
4388 &[1593604800, 1593604800, 1593604801, 1593604801]
4389 );
4390
4391 let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4392 let b: Vec<_> = b.iter().flatten().collect();
4393 assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
4394
4395 let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4396 assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
4397 }
4398 }
4399
4400 #[test]
4401 fn test_read_lz4_hadoop_large() {
4402 let testdata = arrow::util::test_util::parquet_test_data();
4403 let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
4404 let file = File::open(path).unwrap();
4405 let expected_rows = 10000;
4406
4407 let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4408 .unwrap()
4409 .collect::<Result<Vec<_>, _>>()
4410 .unwrap();
4411 assert_eq!(batches.len(), 1);
4412 let batch = &batches[0];
4413
4414 assert_eq!(batch.num_columns(), 1);
4415 assert_eq!(batch.num_rows(), expected_rows);
4416
4417 let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
4418 let a: Vec<_> = a.iter().flatten().collect();
4419 assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
4420 assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
4421 assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
4422 assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
4423 }
4424
4425 #[test]
4426 #[cfg(feature = "snap")]
4427 fn test_read_nested_lists() {
4428 let testdata = arrow::util::test_util::parquet_test_data();
4429 let path = format!("{testdata}/nested_lists.snappy.parquet");
4430 let file = File::open(path).unwrap();
4431
4432 let f = file.try_clone().unwrap();
4433 let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
4434 let expected = reader.next().unwrap().unwrap();
4435 assert_eq!(expected.num_rows(), 3);
4436
4437 let selection = RowSelection::from(vec![
4438 RowSelector::skip(1),
4439 RowSelector::select(1),
4440 RowSelector::skip(1),
4441 ]);
4442 let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4443 .unwrap()
4444 .with_row_selection(selection)
4445 .build()
4446 .unwrap();
4447
4448 let actual = reader.next().unwrap().unwrap();
4449 assert_eq!(actual.num_rows(), 1);
4450 assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
4451 }
4452
    #[test]
    fn test_arbitrary_decimal() {
        let values = [1, 2, 3, 4, 5, 6, 7, 8];
        let decimals_19_0 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(19, 0)
            .unwrap();
        let decimals_12_0 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(12, 0)
            .unwrap();
        let decimals_17_10 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(17, 10)
            .unwrap();

        let written = RecordBatch::try_from_iter([
            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 8), &read[0]);
    }

    #[test]
    fn test_list_skip() {
        let mut list = ListBuilder::new(Int32Builder::new());
        list.append_value([Some(1), Some(2)]);
        list.append_value([Some(3)]);
        list.append_value([Some(4)]);
        let list = list.finish();
        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();

        // write one row per data page so the skip crosses page boundaries
        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(1)
            .set_write_batch_size(2)
            .build();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
            .unwrap()
            .with_row_selection(selection.into())
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out, batch.slice(2, 1));
    }

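    // Illustrative sketch (not part of the upstream suite): the builder also
    // exposes `with_offset` / `with_limit`, which compose with the same
    // selection machinery exercised by `test_list_skip` above. Assumes the
    // helpers already imported by this module (ArrowWriter, Bytes,
    // concat_batches, ...); the column name and test name are illustrative.
    #[test]
    fn test_limit_offset_sketch() {
        let values = Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7]);
        let batch = RecordBatch::try_from_iter([("v", Arc::new(values) as ArrayRef)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        // skip the first two rows, then read at most three
        let reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
            .unwrap()
            .with_offset(2)
            .with_limit(3)
            .build()
            .unwrap();

        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        let out = concat_batches(batch.schema_ref(), &batches).unwrap();
        assert_eq!(out, batch.slice(2, 3));
    }
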
    fn test_decimal32_roundtrip() {
        // Precision <= 9 is stored as the INT32 physical type
        let d = |values: Vec<i32>, p: u8| {
            let iter = values.into_iter();
            PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    fn test_decimal64_roundtrip() {
        // Precision <= 9  is stored as INT32
        // Precision <= 18 is stored as INT64
        let d = |values: Vec<i64>, p: u8| {
            let iter = values.into_iter();
            PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    fn test_decimal_roundtrip<T: DecimalType>() {
        // Precision <= 9  is stored as INT32
        // Precision <= 18 is stored as INT64
        // Precision > 18  is stored as FIXED_LEN_BYTE_ARRAY
        let d = |values: Vec<usize>, p: u8| {
            let iter = values.into_iter().map(T::Native::usize_as);
            PrimitiveArray::<T>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
            ("d4", Arc::new(d4) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);
        let t4 = builder.parquet_schema().columns()[3].physical_type();
        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    #[test]
    fn test_decimal() {
        test_decimal32_roundtrip();
        test_decimal64_roundtrip();
        test_decimal_roundtrip::<Decimal128Type>();
        test_decimal_roundtrip::<Decimal256Type>();
    }

    #[test]
    fn test_list_selection() {
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list_field(ArrowDataType::Utf8, true),
            false,
        )]));
        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        for i in 0..2 {
            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
            for j in 0..1024 {
                list_a_builder.values().append_value(format!("{i} {j}"));
                list_a_builder.append(true);
            }
            let batch =
                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
                    .unwrap();
            writer.write(&batch).unwrap();
        }
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);
        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
            .unwrap()
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ]))
            .build()
            .unwrap();

        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        let batch = concat_batches(&schema, &batches).unwrap();

        assert_eq!(batch.num_rows(), 924 * 2);
        let list = batch.column(0).as_list::<i32>();

        // every written list has exactly one element
        for w in list.value_offsets().windows(2) {
            assert_eq!(w[0] + 1, w[1]);
        }
        let mut values = list.values().as_string::<i32>().iter();

        for i in 0..2 {
            for j in 100..1024 {
                let expected = format!("{i} {j}");
                assert_eq!(values.next().unwrap().unwrap(), &expected);
            }
        }
    }

    #[test]
    fn test_list_selection_fuzz() {
        let mut rng = rng();
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list(
                Field::LIST_FIELD_DEFAULT_NAME,
                Field::new_list_field(ArrowDataType::Int32, true),
                true,
            ),
            true,
        )]));
        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));

        for _ in 0..2048 {
            if rng.random_bool(0.2) {
                list_a_builder.append(false);
                continue;
            }

            let list_a_len = rng.random_range(0..10);
            let list_b_builder = list_a_builder.values();

            for _ in 0..list_a_len {
                if rng.random_bool(0.2) {
                    list_b_builder.append(false);
                    continue;
                }

                let list_b_len = rng.random_range(0..10);
                let int_builder = list_b_builder.values();
                for _ in 0..list_b_len {
                    match rng.random_bool(0.2) {
                        true => int_builder.append_null(),
                        false => int_builder.append_value(rng.random()),
                    }
                }
                list_b_builder.append(true);
            }
            list_a_builder.append(true);
        }

        let array = Arc::new(list_a_builder.finish());
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        writer.write(&batch).unwrap();
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);

        let cases = [
            vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ],
            vec![
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
            ],
            vec![
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
            ],
            vec![
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
            ],
        ];

        for batch_size in [100, 1024, 2048] {
            for selection in &cases {
                let selection = RowSelection::from(selection.clone());
                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
                    .unwrap()
                    .with_row_selection(selection.clone())
                    .with_batch_size(batch_size)
                    .build()
                    .unwrap();

                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
                assert_eq!(actual.num_rows(), selection.row_count());

                let mut batch_offset = 0;
                let mut actual_offset = 0;
                for selector in selection.iter() {
                    if selector.skip {
                        batch_offset += selector.row_count;
                        continue;
                    }

                    assert_eq!(
                        batch.slice(batch_offset, selector.row_count),
                        actual.slice(actual_offset, selector.row_count)
                    );

                    batch_offset += selector.row_count;
                    actual_offset += selector.row_count;
                }
            }
        }
    }

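    // Illustrative sketch (not part of the upstream suite): a `RowFilter`
    // built from `ArrowPredicateFn` produces a row selection internally, much
    // like the explicit `RowSelection`s exercised above. The predicate keeps
    // rows where `v > 2`; the column name and test name are illustrative, and
    // the sketch assumes this module's existing imports.
    #[test]
    fn test_row_filter_sketch() {
        let values = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let batch = RecordBatch::try_from_iter([("v", Arc::new(values) as ArrayRef)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        // keep only rows where the first (and only) column is greater than 2
        let predicate = ArrowPredicateFn::new(ProjectionMask::all(), |batch: RecordBatch| {
            arrow::compute::kernels::cmp::gt(batch.column(0), &Int32Array::new_scalar(2))
        });
        let reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
            .unwrap()
            .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
            .build()
            .unwrap();

        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        let out = concat_batches(batch.schema_ref(), &batches).unwrap();
        assert_eq!(out, batch.slice(2, 3));
    }
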
    #[test]
    fn test_read_old_nested_list() {
        use arrow::datatypes::DataType;
        use arrow::datatypes::ToByteSlice;

        let testdata = arrow::util::test_util::parquet_test_data();
        // old_list_structure.parquet uses the legacy two-level list encoding;
        // it should read back as list<list<int32>> containing the single row
        // [[1, 2], [3, 4]]
        let path = format!("{testdata}/old_list_structure.parquet");
        let test_file = File::open(path).unwrap();

        // expected inner values
        let a_values = Int32Array::from(vec![1, 2, 3, 4]);

        // offsets for the expected inner lists: [1, 2] and [3, 4]
        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());

        // expected inner ListArray
        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
            "array",
            DataType::Int32,
            false,
        ))))
        .len(2)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
        let mut reader = builder.build().unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out.num_columns(), 1);
        // the single column is the outer list...
        let c0 = out.column(0);
        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
        // ...whose first (only) element must match the expected inner list
        let r0 = c0arr.value(0);
        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
        assert_eq!(r0arr, &a);
    }

    #[test]
    fn test_map_no_value() {
        let testdata = arrow::util::test_util::parquet_test_data();
        // map_no_value.parquet contains three columns: a map with values, a
        // map whose key_value group has no value field, and a plain list
        let path = format!("{testdata}/map_no_value.parquet");
        let file = File::open(path).unwrap();

        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 3);
        assert_eq!(out.num_columns(), 3);
        // the map-without-values and list columns should read back equivalently
        let c0 = out.column(1).as_list::<i32>();
        let c1 = out.column(2).as_list::<i32>();
        assert_eq!(c0.len(), c1.len());
        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
    }

    #[test]
    fn test_get_row_group_column_bloom_filter_with_length() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
        let file = File::open(path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let schema = builder.schema().clone();
        let reader = builder.build().unwrap();

        // rewrite the file with bloom filters enabled; files written by this
        // crate record the bloom filter length in the column metadata
        let mut parquet_data = Vec::new();
        let props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build();
        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
        for batch in reader {
            let batch = batch.unwrap();
            writer.write(&batch).unwrap();
        }
        writer.close().unwrap();

        test_get_row_group_column_bloom_filter(parquet_data.into(), true);
    }

    #[test]
    fn test_get_row_group_column_bloom_filter_without_length() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());
        test_get_row_group_column_bloom_filter(data, false);
    }

4931
4932 fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
4933 let builder = ParquetRecordBatchReaderBuilder::try_new(data.clone()).unwrap();
4934
4935 let metadata = builder.metadata();
4936 assert_eq!(metadata.num_row_groups(), 1);
4937 let row_group = metadata.row_group(0);
4938 let column = row_group.column(0);
4939 assert_eq!(column.bloom_filter_length().is_some(), with_length);
4940
4941 let sbbf = builder
4942 .get_row_group_column_bloom_filter(0, 0)
4943 .unwrap()
4944 .unwrap();
4945 assert!(sbbf.check(&"Hello"));
4946 assert!(!sbbf.check(&"Hello_Not_Exists"));
4947 }
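
    // Illustrative sketch (not part of the upstream suite): column pruning
    // via `ProjectionMask::leaves`, reading back only the first leaf column.
    // The column names and test name are illustrative only.
    #[test]
    fn test_projection_sketch() {
        let a = Int32Array::from(vec![1, 2, 3]);
        let b = Int32Array::from(vec![4, 5, 6]);
        let batch = RecordBatch::try_from_iter([
            ("a", Arc::new(a) as ArrayRef),
            ("b", Arc::new(b) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        // select leaf column 0 ("a") only
        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
        let mut reader = builder.with_projection(mask).build().unwrap();

        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.schema().field(0).name(), "a");
        let a_read: &Int32Array = out.column(0).as_any().downcast_ref().unwrap();
        assert_eq!(a_read.values(), &[1, 2, 3]);
    }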
}