use arrow_array::cast::AsArray;
use arrow_array::Array;
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

pub use crate::arrow::array_reader::RowGroups;
use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use crate::bloom_filter::{
    chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
};
use crate::column::page::{PageIterator, PageReader};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
use crate::schema::types::SchemaDescriptor;

use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
pub(crate) use read_plan::{ReadPlan, ReadPlanBuilder};

mod filter;
pub mod metrics;
mod read_plan;
mod selection;
pub mod statistics;

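/// A generic builder for constructing Arrow [`RecordBatch`] readers from
/// Parquet data. See [`ParquetRecordBatchReaderBuilder`] for the synchronous
/// variant built on [`ChunkReader`].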
pub struct ArrowReaderBuilder<T> {
    pub(crate) input: T,

    pub(crate) metadata: Arc<ParquetMetaData>,

    pub(crate) schema: SchemaRef,

    pub(crate) fields: Option<Arc<ParquetField>>,

    pub(crate) batch_size: usize,

    pub(crate) row_groups: Option<Vec<usize>>,

    pub(crate) projection: ProjectionMask,

    pub(crate) filter: Option<RowFilter>,

    pub(crate) selection: Option<RowSelection>,

    pub(crate) limit: Option<usize>,

    pub(crate) offset: Option<usize>,

    pub(crate) metrics: ArrowReaderMetrics,

    pub(crate) max_predicate_cache_size: usize,
}

impl<T: Debug> Debug for ArrowReaderBuilder<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowReaderBuilder<T>")
            .field("input", &self.input)
            .field("metadata", &self.metadata)
            .field("schema", &self.schema)
            .field("fields", &self.fields)
            .field("batch_size", &self.batch_size)
            .field("row_groups", &self.row_groups)
            .field("projection", &self.projection)
            .field("filter", &self.filter)
            .field("selection", &self.selection)
            .field("limit", &self.limit)
            .field("offset", &self.offset)
            .field("metrics", &self.metrics)
            .finish()
    }
}

impl<T> ArrowReaderBuilder<T> {
    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self {
            input,
            metadata: metadata.metadata,
            schema: metadata.schema,
            fields: metadata.fields,
            batch_size: 1024,
            row_groups: None,
            projection: ProjectionMask::all(),
            filter: None,
            selection: None,
            limit: None,
            offset: None,
            metrics: ArrowReaderMetrics::Disabled,
            max_predicate_cache_size: 100 * 1024 * 1024,
        }
    }

    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

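    /// Set the size of [`RecordBatch`] to produce. Defaults to 1024.
    ///
    /// The effective batch size is capped at the number of rows in the file,
    /// to avoid over-allocating buffers for small files.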
    pub fn with_batch_size(self, batch_size: usize) -> Self {
        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
        Self { batch_size, ..self }
    }

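    /// Only read data from the provided row group indexes.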
    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
        Self {
            row_groups: Some(row_groups),
            ..self
        }
    }

    pub fn with_projection(self, mask: ProjectionMask) -> Self {
        Self {
            projection: mask,
            ..self
        }
    }

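    /// Provide a [`RowSelection`] describing which rows to decode.
    ///
    /// The selection is applied in addition to any row group selection and
    /// row filter. A minimal sketch that skips the first 10 rows of a file
    /// and reads the following 20 (the path is illustrative only):
    ///
    /// ```no_run
    /// # use parquet::arrow::arrow_reader::{
    /// #     ParquetRecordBatchReaderBuilder, RowSelection, RowSelector,
    /// # };
    /// let file = std::fs::File::open("data.parquet").unwrap();
    /// let selection = RowSelection::from(vec![
    ///     RowSelector::skip(10),
    ///     RowSelector::select(20),
    /// ]);
    /// let reader = ParquetRecordBatchReaderBuilder::try_new(file)
    ///     .unwrap()
    ///     .with_row_selection(selection)
    ///     .build()
    ///     .unwrap();
    /// ```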
    pub fn with_row_selection(self, selection: RowSelection) -> Self {
        Self {
            selection: Some(selection),
            ..self
        }
    }

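    /// Provide a [`RowFilter`] to skip decoding rows that do not match its
    /// predicates.
    ///
    /// Predicates are evaluated in order against only the columns they
    /// project, and a row is decoded only if it passes every predicate. A
    /// minimal sketch, assuming the file's first leaf column is an `Int32`
    /// column (the path is illustrative only):
    ///
    /// ```no_run
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::types::Int32Type;
    /// # use arrow_array::BooleanArray;
    /// # use parquet::arrow::ProjectionMask;
    /// # use parquet::arrow::arrow_reader::{
    /// #     ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter,
    /// # };
    /// let file = std::fs::File::open("data.parquet").unwrap();
    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    /// // Keep only rows where the first column is greater than 5
    /// let predicate = ArrowPredicateFn::new(
    ///     ProjectionMask::leaves(builder.parquet_schema(), [0]),
    ///     |batch| {
    ///         let col = batch.column(0).as_primitive::<Int32Type>();
    ///         Ok(BooleanArray::from_iter(
    ///             col.iter().map(|v| Some(v.is_some_and(|v| v > 5))),
    ///         ))
    ///     },
    /// );
    /// let reader = builder
    ///     .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
    ///     .build()
    ///     .unwrap();
    /// ```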
    pub fn with_row_filter(self, filter: RowFilter) -> Self {
        Self {
            filter: Some(filter),
            ..self
        }
    }

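    /// Provide a limit on the number of rows to read.
    ///
    /// The limit is applied after any row selection, row filter and offset,
    /// so it bounds the final number of decoded rows.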
    pub fn with_limit(self, limit: usize) -> Self {
        Self {
            limit: Some(limit),
            ..self
        }
    }

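    /// Skip the first `offset` rows of the final result.
    ///
    /// The offset is applied after any row selection and row filter, but
    /// before any limit.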
    pub fn with_offset(self, offset: usize) -> Self {
        Self {
            offset: Some(offset),
            ..self
        }
    }

    pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
        Self { metrics, ..self }
    }

    pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
        Self {
            max_predicate_cache_size,
            ..self
        }
    }
}

#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    skip_arrow_metadata: bool,
    supplied_schema: Option<SchemaRef>,
    pub(crate) page_index_policy: PageIndexPolicy,
    #[cfg(feature = "encryption")]
    pub(crate) file_decryption_properties: Option<FileDecryptionProperties>,
}

impl ArrowReaderOptions {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

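    /// Provide the Arrow schema to use when decoding the file, overriding any
    /// schema inferred from the embedded metadata.
    ///
    /// The supplied schema must be compatible with the file: the number of
    /// columns, their data types, nullability and field metadata are all
    /// validated when the reader is constructed. Setting a schema also
    /// implies `skip_arrow_metadata`. A minimal sketch with a hypothetical
    /// column `"ts"`:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_schema::{DataType, Field, Schema, TimeUnit};
    /// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
    /// let schema = Arc::new(Schema::new(vec![Field::new(
    ///     "ts",
    ///     DataType::Timestamp(TimeUnit::Microsecond, None),
    ///     true,
    /// )]));
    /// let options = ArrowReaderOptions::new().with_schema(schema);
    /// ```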
    pub fn with_schema(self, schema: SchemaRef) -> Self {
        Self {
            supplied_schema: Some(schema),
            skip_arrow_metadata: true,
            ..self
        }
    }

    pub fn with_page_index(self, page_index: bool) -> Self {
        let page_index_policy = PageIndexPolicy::from(page_index);

        Self {
            page_index_policy,
            ..self
        }
    }

    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
        Self {
            page_index_policy: policy,
            ..self
        }
    }

    #[cfg(feature = "encryption")]
    pub fn with_file_decryption_properties(
        self,
        file_decryption_properties: FileDecryptionProperties,
    ) -> Self {
        Self {
            file_decryption_properties: Some(file_decryption_properties),
            ..self
        }
    }

    pub fn page_index(&self) -> bool {
        self.page_index_policy != PageIndexPolicy::Skip
    }

    #[cfg(feature = "encryption")]
    pub fn file_decryption_properties(&self) -> Option<&FileDecryptionProperties> {
        self.file_decryption_properties.as_ref()
    }
}

#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    pub(crate) metadata: Arc<ParquetMetaData>,

    pub(crate) schema: SchemaRef,

    pub(crate) fields: Option<Arc<ParquetField>>,
}

impl ArrowReaderMetadata {
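    /// Load [`ArrowReaderMetadata`] from the provided [`ChunkReader`],
    /// honoring the page index policy (and, when the `encryption` feature is
    /// enabled, the file decryption properties) from `options`.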
    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata =
            ParquetMetaDataReader::new().with_page_index_policy(options.page_index_policy);
        #[cfg(feature = "encryption")]
        let metadata =
            metadata.with_decryption_properties(options.file_decryption_properties.as_ref());
        let metadata = metadata.parse_and_finish(reader)?;
        Self::try_new(Arc::new(metadata), options)
    }

    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
        match options.supplied_schema {
            Some(supplied_schema) => Self::with_supplied_schema(metadata, supplied_schema.clone()),
            None => {
                let kv_metadata = match options.skip_arrow_metadata {
                    true => None,
                    false => metadata.file_metadata().key_value_metadata(),
                };

                let (schema, fields) = parquet_to_arrow_schema_and_fields(
                    metadata.file_metadata().schema_descr(),
                    ProjectionMask::all(),
                    kv_metadata,
                )?;

                Ok(Self {
                    metadata,
                    schema: Arc::new(schema),
                    fields: fields.map(Arc::new),
                })
            }
        }
    }

    fn with_supplied_schema(
        metadata: Arc<ParquetMetaData>,
        supplied_schema: SchemaRef,
    ) -> Result<Self> {
        let parquet_schema = metadata.file_metadata().schema_descr();
        let field_levels = parquet_to_arrow_field_levels(
            parquet_schema,
            ProjectionMask::all(),
            Some(supplied_schema.fields()),
        )?;
        let fields = field_levels.fields;
        let inferred_len = fields.len();
        let supplied_len = supplied_schema.fields().len();
        if inferred_len != supplied_len {
            return Err(arrow_err!(format!(
                "Incompatible supplied Arrow schema: expected {} columns received {}",
                inferred_len, supplied_len
            )));
        }

        let mut errors = Vec::new();

        let field_iter = supplied_schema.fields().iter().zip(fields.iter());

        for (field1, field2) in field_iter {
            if field1.data_type() != field2.data_type() {
                errors.push(format!(
                    "data type mismatch for field {}: requested {:?} but found {:?}",
                    field1.name(),
                    field1.data_type(),
                    field2.data_type()
                ));
            }
            if field1.is_nullable() != field2.is_nullable() {
                errors.push(format!(
                    "nullability mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.is_nullable(),
                    field2.is_nullable()
                ));
            }
            if field1.metadata() != field2.metadata() {
                errors.push(format!(
                    "metadata mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.metadata(),
                    field2.metadata()
                ));
            }
        }

        if !errors.is_empty() {
            let message = errors.join(", ");
            return Err(ParquetError::ArrowError(format!(
                "Incompatible supplied Arrow schema: {message}",
            )));
        }

        Ok(Self {
            metadata,
            schema: supplied_schema,
            fields: field_levels.levels.map(Arc::new),
        })
    }

    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}

#[doc(hidden)]
pub struct SyncReader<T: ChunkReader>(T);

impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("SyncReader").field(&self.0).finish()
    }
}

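/// A synchronous builder for [`ParquetRecordBatchReader`], reading from any
/// [`ChunkReader`] such as a file or an in-memory buffer.
///
/// A minimal sketch of a write/read round trip (the column name `"a"` is
/// illustrative only):
///
/// ```
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
/// # use parquet::arrow::ArrowWriter;
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
/// let batch = RecordBatch::try_from_iter([(
///     "a",
///     Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
/// )])
/// .unwrap();
///
/// let mut buffer = Vec::new();
/// let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
/// writer.write(&batch).unwrap();
/// writer.close().unwrap();
///
/// let reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
///     .unwrap()
///     .with_batch_size(2)
///     .build()
///     .unwrap();
///
/// let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
/// assert_eq!(batches.len(), 2);
/// ```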
pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;

impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
    pub fn try_new(reader: T) -> Result<Self> {
        Self::try_new_with_options(reader, Default::default())
    }

    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load(&reader, options)?;
        Ok(Self::new_with_metadata(reader, metadata))
    }

    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(SyncReader(input), metadata)
    }

    pub fn get_row_group_column_bloom_filter(
        &mut self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> Result<Option<Sbbf>> {
        let metadata = self.metadata.row_group(row_group_idx);
        let column_metadata = metadata.column(column_idx);

        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => self.input.0.get_bytes(offset, length as usize),
            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
        }?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        match header.algorithm {
            BloomFilterAlgorithm::BLOCK(_) => {
                // this match exists to future-proof against new algorithm variants
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED(_) => {
                // this match exists to future-proof against new compression variants
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH(_) => {
                // this match exists to future-proof against new hash variants
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            Some(_) => buffer.slice(
                (TryInto::<usize>::try_into(bitset_offset).unwrap()
                    - TryInto::<usize>::try_into(offset).unwrap())..,
            ),
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                self.input.0.get_bytes(bitset_offset, bitset_length)?
            }
        };
        Ok(Some(Sbbf::new(&bitset)))
    }

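    /// Build a [`ParquetRecordBatchReader`].
    ///
    /// Note: any [`RowFilter`] predicates are evaluated eagerly here, so the
    /// resulting read plan selects only rows that pass every predicate.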
    pub fn build(self) -> Result<ParquetRecordBatchReader> {
        let Self {
            input,
            metadata,
            schema: _,
            fields,
            batch_size,
            row_groups,
            projection,
            mut filter,
            selection,
            limit,
            offset,
            metrics,
            max_predicate_cache_size: _,
        } = self;

        // Try to avoid allocating a batch larger than the file
        let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize);

        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());

        let reader = ReaderRowGroups {
            reader: Arc::new(input.0),
            metadata,
            row_groups,
        };

        let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);

        // Evaluate any predicates, updating the read plan so it only selects
        // rows that pass all of them
        if let Some(filter) = filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
                if !plan_builder.selects_any() {
                    // No rows remain selected; no need to evaluate further predicates
                    break;
                }

                let mut cache_projection = predicate.projection().clone();
                cache_projection.intersect(&projection);

                let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
                    .build_array_reader(fields.as_deref(), predicate.projection())?;

                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
            }
        }

        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
            .build_array_reader(fields.as_deref(), &projection)?;

        let read_plan = plan_builder
            .limited(reader.num_rows())
            .with_offset(offset)
            .with_limit(limit)
            .build_limited()
            .build();

        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
    }
}

struct ReaderRowGroups<T: ChunkReader> {
    reader: Arc<T>,

    metadata: Arc<ParquetMetaData>,
    row_groups: Vec<usize>,
}

impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
    fn num_rows(&self) -> usize {
        let meta = self.metadata.row_groups();
        self.row_groups
            .iter()
            .map(|x| meta[*x].num_rows() as usize)
            .sum()
    }

    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        Ok(Box::new(ReaderPageIterator {
            column_idx: i,
            reader: self.reader.clone(),
            metadata: self.metadata.clone(),
            row_groups: self.row_groups.clone().into_iter(),
        }))
    }
}

struct ReaderPageIterator<T: ChunkReader> {
    reader: Arc<T>,
    column_idx: usize,
    row_groups: std::vec::IntoIter<usize>,
    metadata: Arc<ParquetMetaData>,
}

impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
        let rg = self.metadata.row_group(rg_idx);
        let column_chunk_metadata = rg.column(self.column_idx);
        let offset_index = self.metadata.offset_index();
        let page_locations = offset_index
            .filter(|i| !i[rg_idx].is_empty())
            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
        let total_rows = rg.num_rows() as usize;
        let reader = self.reader.clone();

        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
            .add_crypto_context(
                rg_idx,
                self.column_idx,
                self.metadata.as_ref(),
                column_chunk_metadata,
            )
    }
}

impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        let rg_idx = self.row_groups.next()?;
        let page_reader = self
            .next_page_reader(rg_idx)
            .map(|page_reader| Box::new(page_reader) as _);
        Some(page_reader)
    }
}

impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}

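/// An `Iterator<Item = Result<RecordBatch, ArrowError>>` that decodes Arrow
/// [`RecordBatch`]es from Parquet row groups according to a read plan.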
pub struct ParquetRecordBatchReader {
    array_reader: Box<dyn ArrayReader>,
    schema: SchemaRef,
    read_plan: ReadPlan,
}

impl Iterator for ParquetRecordBatchReader {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_inner()
            .map_err(|arrow_err| arrow_err.into())
            .transpose()
    }
}

impl ParquetRecordBatchReader {
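    /// Returns the next `RecordBatch`, or `None` once the reader is exhausted.
    ///
    /// When a selection is present, selectors are consumed until `batch_size`
    /// rows have been read; a selector that spans the batch boundary is split
    /// and its remainder pushed back for the next call.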
    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
        let mut read_records = 0;
        let batch_size = self.batch_size();
        match self.read_plan.selection_mut() {
            Some(selection) => {
                while read_records < batch_size && !selection.is_empty() {
                    let front = selection.pop_front().unwrap();
                    if front.skip {
                        let skipped = self.array_reader.skip_records(front.row_count)?;

                        if skipped != front.row_count {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                front.row_count,
                                skipped
                            ));
                        }
                        continue;
                    }

                    // Empty selectors carry no rows; skip them
                    if front.row_count == 0 {
                        continue;
                    }

                    // Split the selector if it spans the batch boundary,
                    // pushing the remainder back for the next batch
                    let need_read = batch_size - read_records;
                    let to_read = match front.row_count.checked_sub(need_read) {
                        Some(remaining) if remaining != 0 => {
                            selection.push_front(RowSelector::select(remaining));
                            need_read
                        }
                        _ => front.row_count,
                    };
                    match self.array_reader.read_records(to_read)? {
                        0 => break,
                        rec => read_records += rec,
                    };
                }
            }
            None => {
                self.array_reader.read_records(batch_size)?;
            }
        };

        let array = self.array_reader.consume_batch()?;
        let struct_array = array.as_struct_opt().ok_or_else(|| {
            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
        })?;

        Ok(if struct_array.len() > 0 {
            Some(RecordBatch::from(struct_array))
        } else {
            None
        })
    }
}

impl RecordBatchReader for ParquetRecordBatchReader {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl ParquetRecordBatchReader {
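    /// Create a new [`ParquetRecordBatchReader`] from the provided
    /// [`ChunkReader`] with the given batch size and default options.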
    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
        ParquetRecordBatchReaderBuilder::try_new(reader)?
            .with_batch_size(batch_size)
            .build()
    }

    pub fn try_new_with_row_groups(
        levels: &FieldLevels,
        row_groups: &dyn RowGroups,
        batch_size: usize,
        selection: Option<RowSelection>,
    ) -> Result<Self> {
        let metrics = ArrowReaderMetrics::disabled();
        let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;

        let read_plan = ReadPlanBuilder::new(batch_size)
            .with_selection(selection)
            .build();

        Ok(Self {
            array_reader,
            schema: Arc::new(Schema::new(levels.fields.clone())),
            read_plan,
        })
    }

    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
        let schema = match array_reader.get_data_type() {
            ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
            _ => unreachable!("Struct array reader's data type is not struct!"),
        };

        Self {
            array_reader,
            schema: Arc::new(schema),
            read_plan,
        }
    }

    #[inline(always)]
    pub(crate) fn batch_size(&self) -> usize {
        self.read_plan.batch_size()
    }
}

#[cfg(test)]
mod tests {
    use std::cmp::min;
    use std::collections::{HashMap, VecDeque};
    use std::fmt::Formatter;
    use std::fs::File;
    use std::io::Seek;
    use std::path::PathBuf;
    use std::sync::Arc;

    use arrow_array::builder::*;
    use arrow_array::cast::AsArray;
    use arrow_array::types::{
        Date32Type, Date64Type, Decimal128Type, Decimal256Type, Decimal32Type, Decimal64Type,
        DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
        Time64MicrosecondType,
    };
    use arrow_array::*;
    use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
    use arrow_data::{ArrayData, ArrayDataBuilder};
    use arrow_schema::{
        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
    };
    use arrow_select::concat::concat_batches;
    use bytes::Bytes;
    use half::f16;
    use num::PrimInt;
    use rand::{rng, Rng, RngCore};
    use tempfile::tempfile;

    use crate::arrow::arrow_reader::{
        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
    };
    use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
    use crate::arrow::{ArrowWriter, ProjectionMask};
    use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
    use crate::data_type::{
        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
        FloatType, Int32Type, Int64Type, Int96, Int96Type,
    };
    use crate::errors::Result;
    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
    use crate::file::writer::SerializedFileWriter;
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::{Type, TypePtr};
    use crate::util::test_common::rand_gen::RandGen;

    #[test]
    fn test_arrow_reader_all_columns() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());
        let reader = builder.build().unwrap();

        assert_eq!(original_schema.fields(), reader.schema().fields());
    }

    #[test]
    fn test_arrow_reader_single_column() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
        let reader = builder.with_projection(mask).build().unwrap();

        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_arrow_reader_single_column_by_name() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
        let reader = builder.with_projection(mask).build().unwrap();

        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_null_column_reader_test() {
        let mut file = tempfile::tempfile().unwrap();

        let schema = "
            message message {
                OPTIONAL INT32 int32;
            }
        ";
        let schema = Arc::new(parse_message_type(schema).unwrap());

        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
        generate_single_column_file_with_data::<Int32Type>(
            &[vec![], vec![]],
            Some(&def_levels),
            file.try_clone().unwrap(),
            schema,
            Some(Field::new("int32", ArrowDataType::Null, true)),
            &Default::default(),
        )
        .unwrap();

        file.rewind().unwrap();

        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();

        assert_eq!(batches.len(), 4);
        for batch in &batches[0..3] {
            assert_eq!(batch.num_rows(), 2);
            assert_eq!(batch.num_columns(), 1);
            assert_eq!(batch.column(0).null_count(), 2);
        }

        assert_eq!(batches[3].num_rows(), 1);
        assert_eq!(batches[3].num_columns(), 1);
        assert_eq!(batches[3].column(0).null_count(), 1);
    }

    #[test]
    fn test_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<BoolType, _, BoolType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
        );
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<FloatType, _, FloatType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
        );
    }

    #[test]
    fn test_unsigned_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::UINT_32,
            Some(ArrowDataType::UInt32),
            |vals| {
                Arc::new(UInt32Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u32)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::UINT_64,
            Some(ArrowDataType::UInt64),
            |vals| {
                Arc::new(UInt64Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u64)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
    }

    #[test]
    fn test_unsigned_roundtrip() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("uint32", ArrowDataType::UInt32, true),
            Field::new("uint64", ArrowDataType::UInt64, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt32Array::from_iter_values([
                    0,
                    i32::MAX as u32,
                    u32::MAX,
                ])),
                Arc::new(UInt64Array::from_iter_values([
                    0,
                    i64::MAX as u64,
                    u64::MAX,
                ])),
            ],
        )
        .unwrap();

        writer.write(&original).unwrap();
        writer.close().unwrap();

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
        let ret = reader.next().unwrap().unwrap();
        assert_eq!(ret, original);

        ret.column(0)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();

        ret.column(1)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
    }

    #[test]
    fn test_float16_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("float16", ArrowDataType::Float16, false),
            Field::new("float16-nullable", ArrowDataType::Float16, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Float16Array::from_iter_values([
                    f16::EPSILON,
                    f16::MIN,
                    f16::MAX,
                    f16::NAN,
                    f16::INFINITY,
                    f16::NEG_INFINITY,
                    f16::ONE,
                    f16::NEG_ONE,
                    f16::ZERO,
                    f16::NEG_ZERO,
                    f16::E,
                    f16::PI,
                    f16::FRAC_1_PI,
                ])),
                Arc::new(Float16Array::from(vec![
                    None,
                    None,
                    None,
                    Some(f16::NAN),
                    Some(f16::INFINITY),
                    Some(f16::NEG_INFINITY),
                    None,
                    None,
                    None,
                    None,
                    None,
                    None,
                    Some(f16::FRAC_1_PI),
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Float16Type>();
        ret.column(1).as_primitive::<Float16Type>();

        Ok(())
    }

    #[test]
    fn test_time_utc_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "time_millis",
                ArrowDataType::Time32(TimeUnit::Millisecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
            Field::new(
                "time_micros",
                ArrowDataType::Time64(TimeUnit::Microsecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Time32MillisecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399_000),
                    Some(86_400_000),
                    Some(86_401_000),
                    None,
                ])),
                Arc::new(Time64MicrosecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399 * 1_000_000),
                    Some(86_400 * 1_000_000),
                    Some(86_401 * 1_000_000),
                    None,
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Time32MillisecondType>();
        ret.column(1).as_primitive::<Time64MicrosecondType>();

        Ok(())
    }

    #[test]
    fn test_date32_roundtrip() -> Result<()> {
        use arrow_array::Date32Array;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "date32",
            ArrowDataType::Date32,
            false,
        )]));

        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![Arc::new(Date32Array::from(vec![
                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
            ]))],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Date32Type>();

        Ok(())
    }

    #[test]
    fn test_date64_roundtrip() -> Result<()> {
        use arrow_array::Date64Array;

        let schema = Arc::new(Schema::new(vec![
            Field::new("small-date64", ArrowDataType::Date64, false),
            Field::new("big-date64", ArrowDataType::Date64, false),
            Field::new("invalid-date64", ArrowDataType::Date64, false),
        ]));

        let mut default_buf = Vec::with_capacity(1024);
        let mut coerce_buf = Vec::with_capacity(1024);

        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();

        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
        let mut coerce_writer =
            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;

        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000 * NUM_MILLISECONDS_IN_DAY,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                Arc::new(Date64Array::from(vec![
                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1,
                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                ])),
            ],
        )?;

        default_writer.write(&original)?;
        coerce_writer.write(&original)?;

        default_writer.close()?;
        coerce_writer.close()?;

        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;

        let default_ret = default_reader.next().unwrap()?;
        let coerce_ret = coerce_reader.next().unwrap()?;

        // The default writer round trips all three columns
        assert_eq!(default_ret, original);

        // With coerce_types enabled, only the small-date64 column survives the
        // round trip unchanged
        assert_eq!(coerce_ret.column(0), original.column(0));
        assert_ne!(coerce_ret.column(1), original.column(1));
        assert_ne!(coerce_ret.column(2), original.column(2));

        default_ret.column(0).as_primitive::<Date64Type>();
        coerce_ret.column(0).as_primitive::<Date64Type>();

        Ok(())
    }

    struct RandFixedLenGen {}

    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
        fn gen(len: i32) -> FixedLenByteArray {
            let mut v = vec![0u8; len as usize];
            rng().fill_bytes(&mut v);
            ByteArray::from(v).into()
        }
    }

    #[test]
    fn test_fixed_length_binary_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            20,
            ConvertedType::NONE,
            None,
            |vals| {
                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
                for val in vals {
                    match val {
                        Some(b) => builder.append_value(b).unwrap(),
                        None => builder.append_null(),
                    }
                }
                Arc::new(builder.finish())
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_interval_day_time_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            12,
            ConvertedType::INTERVAL,
            None,
            |vals| {
                Arc::new(
                    vals.iter()
                        .map(|x| {
                            x.as_ref().map(|b| IntervalDayTime {
                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
                                milliseconds: i32::from_le_bytes(
                                    b.as_ref()[8..12].try_into().unwrap(),
                                ),
                            })
                        })
                        .collect::<IntervalDayTimeArray>(),
                )
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_int96_single_column_reader_test() {
        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];

        type TypeHintAndConversionFunction =
            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);

        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
            // No Arrow type hint: INT96 defaults to nanosecond timestamps
            (None, |vals: &[Option<Int96>]| {
                Arc::new(TimestampNanosecondArray::from_iter(
                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
                )) as ArrayRef
            }),
            // Explicit Arrow type hints at each timestamp resolution
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampSecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMillisecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_millis())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMicrosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_micros())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampNanosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
                    )) as ArrayRef
                },
            ),
            // A type hint carrying a timezone
            (
                Some(ArrowDataType::Timestamp(
                    TimeUnit::Second,
                    Some(Arc::from("-05:00")),
                )),
                |vals: &[Option<Int96>]| {
                    Arc::new(
                        TimestampSecondArray::from_iter(
                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
                        )
                        .with_timezone("-05:00"),
                    ) as ArrayRef
                },
            ),
        ];

        resolutions.iter().for_each(|(arrow_type, converter)| {
            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
                2,
                ConvertedType::NONE,
                arrow_type.clone(),
                converter,
                encodings,
            );
        })
    }

    #[test]
    fn test_int96_from_spark_file_with_provided_schema() {
        use arrow_schema::DataType::Timestamp;
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            Timestamp(TimeUnit::Microsecond, None),
            true,
        )]));
        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());

        let mut record_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                .unwrap()
                .build()
                .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));

        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456),
            Some(1704070800000000),
            Some(253402225200000000),
            Some(1735599600000000),
            None,
            Some(9089380393200000000),
        ]));

        // Cast to Int64 so the values can be compared against the expected raw
        // timestamp values
        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }

    #[test]
    fn test_int96_from_spark_file_without_provided_schema() {
        use arrow_schema::DataType::Timestamp;
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));

        // Without a schema hint the INT96 values are decoded as nanosecond
        // timestamps; the out-of-range values here wrap to negative numbers
        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456000),
            Some(1704070800000000000),
            Some(-4852191831933722624),
            Some(1735599600000000000),
            None,
            Some(-4864435138808946688),
        ]));

        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }

    struct RandUtf8Gen {}

    impl RandGen<ByteArrayType> for RandUtf8Gen {
        fn gen(len: i32) -> ByteArray {
            Int32Type::gen(len).to_string().as_str().into()
        }
    }

    #[test]
    fn test_utf8_single_column_reader_test() {
        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
            })))
        }

        let encodings = &[
            Encoding::PLAIN,
            Encoding::RLE_DICTIONARY,
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            Encoding::DELTA_BYTE_ARRAY,
        ];

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::NONE,
            None,
            |vals| {
                Arc::new(BinaryArray::from_iter(
                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
                ))
            },
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            None,
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::Utf8),
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::LargeUtf8),
            string_converter::<i64>,
            encodings,
        );

        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
        for key in &small_key_types {
            for encoding in encodings {
                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
                opts.encoding = *encoding;

                let data_type =
                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
                    opts,
                    2,
                    ConvertedType::UTF8,
                    Some(data_type.clone()),
                    move |vals| {
                        let vals = string_converter::<i32>(vals);
                        arrow::compute::cast(&vals, &data_type).unwrap()
                    },
                );
            }
        }

        let key_types = [
            ArrowDataType::Int16,
            ArrowDataType::UInt16,
            ArrowDataType::Int32,
            ArrowDataType::UInt32,
            ArrowDataType::Int64,
            ArrowDataType::UInt64,
        ];

        for key in &key_types {
            let data_type =
                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let vals = string_converter::<i32>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );
        }
    }

    #[test]
    fn test_decimal_nullable_struct() {
        let decimals = Decimal256Array::from_iter_values(
            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
        );

        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "decimals",
            decimals.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_int32_nullable_struct() {
        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "int32",
            int32.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![int32.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_decimal_list() {
        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);

        // [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
            decimals.data_type().clone(),
            false,
        ))))
        .len(7)
        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 1), &read[2]);
    }

    #[test]
    fn test_read_decimal_file() {
        use arrow_array::Decimal128Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let file_variants = vec![
            ("byte_array", 4),
            ("fixed_length", 25),
            ("int32", 4),
            ("int64", 10),
        ];
        for (prefix, target_precision) in file_variants {
            let path = format!("{testdata}/{prefix}_decimal.parquet");
            let file = File::open(path).unwrap();
            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

            let batch = record_reader.next().unwrap().unwrap();
            assert_eq!(batch.num_rows(), 24);
            let col = batch
                .column(0)
                .as_any()
                .downcast_ref::<Decimal128Array>()
                .unwrap();

            let expected = 1..25;

            assert_eq!(col.precision(), target_precision);
            assert_eq!(col.scale(), 2);

            for (i, v) in expected.enumerate() {
                assert_eq!(col.value(i), v * 100_i128);
            }
        }
    }

    #[test]
    fn test_read_float16_nonzeros_file() {
        use arrow_array::Float16Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 8);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float16Array>()
            .unwrap();

        let f16_two = f16::ONE + f16::ONE;

        assert_eq!(col.null_count(), 1);
        assert!(col.is_null(0));
        assert_eq!(col.value(1), f16::ONE);
        assert_eq!(col.value(2), -f16_two);
        assert!(col.value(3).is_nan());
        assert_eq!(col.value(4), f16::ZERO);
        assert!(col.value(4).is_sign_positive());
        assert_eq!(col.value(5), f16::NEG_ONE);
        assert_eq!(col.value(6), f16::NEG_ZERO);
        assert!(col.value(6).is_sign_negative());
        assert_eq!(col.value(7), f16_two);
    }

    #[test]
    fn test_read_float16_zeros_file() {
        use arrow_array::Float16Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 3);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float16Array>()
            .unwrap();

        assert_eq!(col.null_count(), 1);
        assert!(col.is_null(0));
        assert_eq!(col.value(1), f16::ZERO);
        assert!(col.value(1).is_sign_positive());
        assert!(col.value(2).is_nan());
    }

    #[test]
    fn test_read_float32_float64_byte_stream_split() {
        let path = format!(
            "{}/byte_stream_split.zstd.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();
        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

        let mut row_count = 0;
        for batch in record_reader {
            let batch = batch.unwrap();
            row_count += batch.num_rows();
            let f32_col = batch.column(0).as_primitive::<Float32Type>();
            let f64_col = batch.column(1).as_primitive::<Float64Type>();

            // Sanity check that the decoded values fall in the expected range
            for &x in f32_col.values() {
                assert!(x > -10.0);
                assert!(x < 10.0);
            }
            for &x in f64_col.values() {
                assert!(x > -10.0);
                assert!(x < 10.0);
            }
        }
        assert_eq!(row_count, 300);
    }

    #[test]
    fn test_read_extended_byte_stream_split() {
        let path = format!(
            "{}/byte_stream_split_extended.gzip.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();
        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

        let mut row_count = 0;
        for batch in record_reader {
            let batch = batch.unwrap();
            row_count += batch.num_rows();

            // Each PLAIN-encoded column is paired with a BYTE_STREAM_SPLIT
            // encoded column; both must decode to identical values
            let f16_col = batch.column(0).as_primitive::<Float16Type>();
            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
            assert_eq!(f16_col.len(), f16_bss.len());
            f16_col
                .iter()
                .zip(f16_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let f32_col = batch.column(2).as_primitive::<Float32Type>();
            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
            assert_eq!(f32_col.len(), f32_bss.len());
            f32_col
                .iter()
                .zip(f32_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let f64_col = batch.column(4).as_primitive::<Float64Type>();
            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
            assert_eq!(f64_col.len(), f64_bss.len());
            f64_col
                .iter()
                .zip(f64_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
            assert_eq!(i32_col.len(), i32_bss.len());
            i32_col
                .iter()
                .zip(i32_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
            assert_eq!(i64_col.len(), i64_bss.len());
            i64_col
                .iter()
                .zip(i64_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let flba_col = batch.column(10).as_fixed_size_binary();
            let flba_bss = batch.column(11).as_fixed_size_binary();
            assert_eq!(flba_col.len(), flba_bss.len());
            flba_col
                .iter()
                .zip(flba_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
            assert_eq!(dec_col.len(), dec_bss.len());
            dec_col
                .iter()
                .zip(dec_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
        }
        assert_eq!(row_count, 200);
    }

    #[test]
    fn test_read_incorrect_map_schema_file() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/incorrect_map_schema.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 1);

        let expected_schema = Schema::new(Fields::from(vec![Field::new(
            "my_map",
            ArrowDataType::Map(
                Arc::new(Field::new(
                    "key_value",
                    ArrowDataType::Struct(Fields::from(vec![
                        Field::new("key", ArrowDataType::Utf8, false),
                        Field::new("value", ArrowDataType::Utf8, true),
                    ])),
                    false,
                )),
                false,
            ),
            true,
        )]));
        assert_eq!(batch.schema().as_ref(), &expected_schema);

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.column(0).null_count(), 0);
        assert_eq!(
            batch.column(0).as_map().keys().as_ref(),
            &StringArray::from(vec!["parent", "name"])
        );
        assert_eq!(
            batch.column(0).as_map().values().as_ref(),
            &StringArray::from(vec!["another", "report"])
        );
    }

    #[test]
    fn test_read_dict_fixed_size_binary() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            ArrowDataType::Dictionary(
                Box::new(ArrowDataType::UInt8),
                Box::new(ArrowDataType::FixedSizeBinary(8)),
            ),
            true,
        )]));
        let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
        let values = FixedSizeBinaryArray::try_from_iter(
            vec![
                (0u8..8u8).collect::<Vec<u8>>(),
                (24u8..32u8).collect::<Vec<u8>>(),
            ]
            .into_iter(),
        )
        .unwrap();
        let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
        let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(read.len(), 1);
        assert_eq!(&batch, &read[0])
    }

    #[derive(Clone)]
    struct TestOptions {
        /// Number of row groups to write
        num_row_groups: usize,
        /// Number of rows per row group
        num_rows: usize,
        /// Size of batches to read back
        record_batch_size: usize,
        /// Percentage of nulls in the column, or `None` if required
        null_percent: Option<usize>,
        /// Writer batch size
        write_batch_size: usize,
        /// Maximum size of a data page
        max_data_page_size: usize,
        /// Maximum size of a dictionary page
        max_dict_page_size: usize,
        /// Writer version
        writer_version: WriterVersion,
        /// Which statistics to write
        enabled_statistics: EnabledStatistics,
        /// Encoding to use
        encoding: Encoding,
        /// Row selection to apply, if any, with the expected selected row count
        row_selections: Option<(RowSelection, usize)>,
        /// Row filter mask to apply, if any
        row_filter: Option<Vec<bool>>,
        /// Limit to apply, if any
        limit: Option<usize>,
        /// Offset to apply, if any
        offset: Option<usize>,
    }

    impl std::fmt::Debug for TestOptions {
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("TestOptions")
                .field("num_row_groups", &self.num_row_groups)
                .field("num_rows", &self.num_rows)
                .field("record_batch_size", &self.record_batch_size)
                .field("null_percent", &self.null_percent)
                .field("write_batch_size", &self.write_batch_size)
                .field("max_data_page_size", &self.max_data_page_size)
                .field("max_dict_page_size", &self.max_dict_page_size)
                .field("writer_version", &self.writer_version)
                .field("enabled_statistics", &self.enabled_statistics)
                .field("encoding", &self.encoding)
                .field("row_selections", &self.row_selections.is_some())
                .field("row_filter", &self.row_filter.is_some())
                .field("limit", &self.limit)
                .field("offset", &self.offset)
                .finish()
        }
    }

    impl Default for TestOptions {
        fn default() -> Self {
            Self {
                num_row_groups: 2,
                num_rows: 100,
                record_batch_size: 15,
                null_percent: None,
                write_batch_size: 64,
                max_data_page_size: 1024 * 1024,
                max_dict_page_size: 1024 * 1024,
                writer_version: WriterVersion::PARQUET_1_0,
                enabled_statistics: EnabledStatistics::Page,
                encoding: Encoding::PLAIN,
                row_selections: None,
                row_filter: None,
                limit: None,
                offset: None,
            }
        }
    }

2442 impl TestOptions {
2443 fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
2444 Self {
2445 num_row_groups,
2446 num_rows,
2447 record_batch_size,
2448 ..Default::default()
2449 }
2450 }
2451
2452 fn with_null_percent(self, null_percent: usize) -> Self {
2453 Self {
2454 null_percent: Some(null_percent),
2455 ..self
2456 }
2457 }
2458
2459 fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
2460 Self {
2461 max_data_page_size,
2462 ..self
2463 }
2464 }
2465
2466 fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
2467 Self {
2468 max_dict_page_size,
2469 ..self
2470 }
2471 }
2472
2473 fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
2474 Self {
2475 enabled_statistics,
2476 ..self
2477 }
2478 }
2479
2480 fn with_row_selections(self) -> Self {
2481 assert!(self.row_filter.is_none(), "Must set row selection first");
2482
2483 let mut rng = rng();
2484 let step = rng.random_range(self.record_batch_size..self.num_rows);
2485 let row_selections = create_test_selection(
2486 step,
2487 self.num_row_groups * self.num_rows,
2488 rng.random::<bool>(),
2489 );
2490 Self {
2491 row_selections: Some(row_selections),
2492 ..self
2493 }
2494 }
2495
2496 fn with_row_filter(self) -> Self {
2497 let row_count = match &self.row_selections {
2498 Some((_, count)) => *count,
2499 None => self.num_row_groups * self.num_rows,
2500 };
2501
2502 let mut rng = rng();
2503 Self {
2504 row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
2505 ..self
2506 }
2507 }
2508
2509 fn with_limit(self, limit: usize) -> Self {
2510 Self {
2511 limit: Some(limit),
2512 ..self
2513 }
2514 }
2515
2516 fn with_offset(self, offset: usize) -> Self {
2517 Self {
2518 offset: Some(offset),
2519 ..self
2520 }
2521 }
2522
2523 fn writer_props(&self) -> WriterProperties {
2524 let builder = WriterProperties::builder()
2525 .set_data_page_size_limit(self.max_data_page_size)
2526 .set_write_batch_size(self.write_batch_size)
2527 .set_writer_version(self.writer_version)
2528 .set_statistics_enabled(self.enabled_statistics);
2529
2530 let builder = match self.encoding {
2531 Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
2532 .set_dictionary_enabled(true)
2533 .set_dictionary_page_size_limit(self.max_dict_page_size),
2534 _ => builder
2535 .set_dictionary_enabled(false)
2536 .set_encoding(self.encoding),
2537 };
2538
2539 builder.build()
2540 }
2541 }
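// The `with_*` methods above consume and return `self`, so scenarios are
// assembled by chaining off `TestOptions::new`. A small sanity check of that
// pattern (illustrative addition, not part of the original test matrix):
#[test]
fn test_options_builder_chaining() {
    let opts = TestOptions::new(2, 100, 15)
        .with_null_percent(25)
        .with_max_data_page_size(256);
    assert_eq!(opts.num_row_groups, 2);
    assert_eq!(opts.num_rows, 100);
    assert_eq!(opts.record_batch_size, 15);
    assert_eq!(opts.null_percent, Some(25));
    assert_eq!(opts.max_data_page_size, 256);
}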
2542
/// Writes a single leaf column of random `T` values and reads it back under
/// every combination of the standard `TestOptions`, writer versions and
/// `encodings`, delegating each round trip to `single_column_reader_test`
2543 fn run_single_column_reader_tests<T, F, G>(
2550 rand_max: i32,
2551 converted_type: ConvertedType,
2552 arrow_type: Option<ArrowDataType>,
2553 converter: F,
2554 encodings: &[Encoding],
2555 ) where
2556 T: DataType,
2557 G: RandGen<T>,
2558 F: Fn(&[Option<T::T>]) -> ArrayRef,
2559 {
2560 let all_options = vec![
2561 TestOptions::new(2, 100, 15),
2564 TestOptions::new(3, 25, 5),
2569 TestOptions::new(4, 100, 25),
2573 TestOptions::new(3, 256, 73).with_max_data_page_size(128),
2575 TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
2577 TestOptions::new(2, 256, 127).with_null_percent(0),
2579 TestOptions::new(2, 256, 93).with_null_percent(25),
2581 TestOptions::new(4, 100, 25).with_limit(0),
2583 TestOptions::new(4, 100, 25).with_limit(50),
2585 TestOptions::new(4, 100, 25).with_limit(10),
2587 TestOptions::new(4, 100, 25).with_limit(101),
2589 TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
2591 TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
2593 TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
2595 TestOptions::new(2, 256, 91)
2597 .with_null_percent(25)
2598 .with_enabled_statistics(EnabledStatistics::Chunk),
2599 TestOptions::new(2, 256, 91)
2601 .with_null_percent(25)
2602 .with_enabled_statistics(EnabledStatistics::None),
2603 TestOptions::new(2, 128, 91)
2605 .with_null_percent(100)
2606 .with_enabled_statistics(EnabledStatistics::None),
2607 TestOptions::new(2, 100, 15).with_row_selections(),
2612 TestOptions::new(3, 25, 5).with_row_selections(),
2617 TestOptions::new(4, 100, 25).with_row_selections(),
2621 TestOptions::new(3, 256, 73)
2623 .with_max_data_page_size(128)
2624 .with_row_selections(),
2625 TestOptions::new(3, 256, 57)
2627 .with_max_dict_page_size(128)
2628 .with_row_selections(),
2629 TestOptions::new(2, 256, 127)
2631 .with_null_percent(0)
2632 .with_row_selections(),
2633 TestOptions::new(2, 256, 93)
2635 .with_null_percent(25)
2636 .with_row_selections(),
2637 TestOptions::new(2, 256, 93)
2639 .with_null_percent(25)
2640 .with_row_selections()
2641 .with_limit(10),
2642 TestOptions::new(2, 256, 93)
2644 .with_null_percent(25)
2645 .with_row_selections()
2646 .with_offset(20)
2647 .with_limit(10),
2648 TestOptions::new(4, 100, 25).with_row_filter(),
2652 TestOptions::new(4, 100, 25)
2654 .with_row_selections()
2655 .with_row_filter(),
2656 TestOptions::new(2, 256, 93)
2658 .with_null_percent(25)
2659 .with_max_data_page_size(10)
2660 .with_row_filter(),
2661 TestOptions::new(2, 256, 93)
2663 .with_null_percent(25)
2664 .with_max_data_page_size(10)
2665 .with_row_selections()
2666 .with_row_filter(),
2667 TestOptions::new(2, 256, 93)
2669 .with_enabled_statistics(EnabledStatistics::None)
2670 .with_max_data_page_size(10)
2671 .with_row_selections(),
2672 ];
2673
2674 all_options.into_iter().for_each(|opts| {
2675 for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2676 for encoding in encodings {
2677 let opts = TestOptions {
2678 writer_version,
2679 encoding: *encoding,
2680 ..opts.clone()
2681 };
2682
2683 single_column_reader_test::<T, _, G>(
2684 opts,
2685 rand_max,
2686 converted_type,
2687 arrow_type.clone(),
2688 &converter,
2689 )
2690 }
2691 }
2692 });
2693 }
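// A hand-rolled miniature of the round trip the harness above automates:
// write one nullable Int32 column, read it back in small batches, and compare
// against slices of the original batch. (Illustrative addition; it exercises
// none of the encodings, selections or filters covered by the harness.)
#[test]
fn test_single_column_roundtrip_minimal() {
    let col = Int32Array::from(vec![Some(1), None, Some(3), None, Some(5)]);
    let batch = RecordBatch::try_from_iter(vec![("leaf", Arc::new(col) as ArrayRef)]).unwrap();

    let mut buffer = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    // 5 rows with a batch size of 3 come back as batches of 3 and 2 rows
    let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(read.len(), 2);
    assert_eq!(&batch.slice(0, 3), &read[0]);
    assert_eq!(&batch.slice(3, 2), &read[1]);
}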
2694
/// Writes a single column parquet file of physical type `T` according to
/// `opts`, then reads it back (applying any selection, filter, offset and
/// limit) and checks the decoded data against what `converter` produces
2695 fn single_column_reader_test<T, F, G>(
2699 opts: TestOptions,
2700 rand_max: i32,
2701 converted_type: ConvertedType,
2702 arrow_type: Option<ArrowDataType>,
2703 converter: F,
2704 ) where
2705 T: DataType,
2706 G: RandGen<T>,
2707 F: Fn(&[Option<T::T>]) -> ArrayRef,
2708 {
2709 println!(
2711 "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
2712 T::get_physical_type(), converted_type, arrow_type, opts
2713 );
2714
2715 let (repetition, def_levels) = match opts.null_percent.as_ref() {
2717 Some(null_percent) => {
2718 let mut rng = rng();
2719
2720 let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
2721 .map(|_| {
2722 std::iter::from_fn(|| {
2723 Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
2724 })
2725 .take(opts.num_rows)
2726 .collect()
2727 })
2728 .collect();
2729 (Repetition::OPTIONAL, Some(def_levels))
2730 }
2731 None => (Repetition::REQUIRED, None),
2732 };
2733
2734 let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
2736 .map(|idx| {
2737 let null_count = match def_levels.as_ref() {
2738 Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
2739 None => 0,
2740 };
2741 G::gen_vec(rand_max, opts.num_rows - null_count)
2742 })
2743 .collect();
2744
2745 let len = match T::get_physical_type() {
2746 crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
2747 crate::basic::Type::INT96 => 12,
2748 _ => -1,
2749 };
2750
2751 let fields = vec![Arc::new(
2752 Type::primitive_type_builder("leaf", T::get_physical_type())
2753 .with_repetition(repetition)
2754 .with_converted_type(converted_type)
2755 .with_length(len)
2756 .build()
2757 .unwrap(),
2758 )];
2759
2760 let schema = Arc::new(
2761 Type::group_type_builder("test_schema")
2762 .with_fields(fields)
2763 .build()
2764 .unwrap(),
2765 );
2766
2767 let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
2768
2769 let mut file = tempfile::tempfile().unwrap();
2770
2771 generate_single_column_file_with_data::<T>(
2772 &values,
2773 def_levels.as_ref(),
2774 file.try_clone().unwrap(),
2775 schema,
2776 arrow_field,
2777 &opts,
2778 )
2779 .unwrap();
2780
2781 file.rewind().unwrap();
2782
2783 let options = ArrowReaderOptions::new()
2784 .with_page_index(opts.enabled_statistics == EnabledStatistics::Page);
2785
2786 let mut builder =
2787 ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
2788
2789 let expected_data = match opts.row_selections {
2790 Some((selections, row_count)) => {
2791 let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2792 
// apply the selectors front to back: skipped rows are dropped,
// selected rows are moved into `selected_data`
2793 let mut selected_data: Vec<Option<T::T>> = vec![];
2794 let selectors: VecDeque<RowSelector> = selections.clone().into();
2795 for select in selectors {
2796 if select.skip {
2797 without_skip_data.drain(0..select.row_count);
2798 } else {
2799 selected_data.extend(without_skip_data.drain(0..select.row_count));
2800 }
2801 }
2802 builder = builder.with_row_selection(selections);
2803 
2804 assert_eq!(selected_data.len(), row_count);
2805 selected_data
2806 }
2807 None => {
2808 let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2810 assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
2811 expected_data
2812 }
2813 };
2814
2815 let mut expected_data = match opts.row_filter {
2816 Some(filter) => {
2817 let expected_data = expected_data
2818 .into_iter()
2819 .zip(filter.iter())
2820 .filter_map(|(d, f)| f.then_some(d))
2821 .collect();
2822
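// the predicate is invoked once per reader batch, so track how far into
// the precomputed boolean mask earlier invocations have advanced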
2823 let mut filter_offset = 0;
2824 let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
2825 ProjectionMask::all(),
2826 move |b| {
2827 let array = BooleanArray::from_iter(
2828 filter
2829 .iter()
2830 .skip(filter_offset)
2831 .take(b.num_rows())
2832 .map(|x| Some(*x)),
2833 );
2834 filter_offset += b.num_rows();
2835 Ok(array)
2836 },
2837 ))]);
2838
2839 builder = builder.with_row_filter(filter);
2840 expected_data
2841 }
2842 None => expected_data,
2843 };
2844
2845 if let Some(offset) = opts.offset {
2846 builder = builder.with_offset(offset);
2847 expected_data = expected_data.into_iter().skip(offset).collect();
2848 }
2849
2850 if let Some(limit) = opts.limit {
2851 builder = builder.with_limit(limit);
2852 expected_data = expected_data.into_iter().take(limit).collect();
2853 }
2854
2855 let mut record_reader = builder
2856 .with_batch_size(opts.record_batch_size)
2857 .build()
2858 .unwrap();
2859
2860 let mut total_read = 0;
2861 loop {
2862 let maybe_batch = record_reader.next();
2863 if total_read < expected_data.len() {
2864 let end = min(total_read + opts.record_batch_size, expected_data.len());
2865 let batch = maybe_batch.unwrap().unwrap();
2866 assert_eq!(end - total_read, batch.num_rows());
2867
2868 let a = converter(&expected_data[total_read..end]);
2869 let b = Arc::clone(batch.column(0));
2870
2871 assert_eq!(a.data_type(), b.data_type());
2872 assert_eq!(a.to_data(), b.to_data());
2873 assert_eq!(
2874 a.as_any().type_id(),
2875 b.as_any().type_id(),
2876 "incorrect type ids"
2877 );
2878
2879 total_read = end;
2880 } else {
2881 assert!(maybe_batch.is_none());
2882 break;
2883 }
2884 }
2885 }
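// Standalone illustration of the `RowFilter` wiring used above: a predicate
// that keeps only rows whose Int64 value is even. (Illustrative addition;
// it uses only items this module already imports.)
#[test]
fn test_row_filter_keeps_even_rows() {
    let col = Int64Array::from_iter_values(0..10);
    let batch = RecordBatch::try_from_iter(vec![("v", Arc::new(col) as ArrayRef)]).unwrap();

    let mut buffer = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    // the predicate is handed each batch of the projected columns and
    // returns one boolean per row
    let predicate = ArrowPredicateFn::new(ProjectionMask::all(), |batch: RecordBatch| {
        let v = batch.column(0).as_any().downcast_ref::<Int64Array>().unwrap();
        Ok(BooleanArray::from_iter(
            v.iter().map(|x| x.map(|x| x % 2 == 0)),
        ))
    });

    let reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
        .unwrap()
        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
        .build()
        .unwrap();

    let total: usize = reader.map(|b| b.unwrap().num_rows()).sum();
    assert_eq!(total, 5); // 0, 2, 4, 6 and 8 survive the filter
}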
2886
2887 fn gen_expected_data<T: DataType>(
2888 def_levels: Option<&Vec<Vec<i16>>>,
2889 values: &[Vec<T::T>],
2890 ) -> Vec<Option<T::T>> {
2891 let data: Vec<Option<T::T>> = match def_levels {
2892 Some(levels) => {
2893 let mut values_iter = values.iter().flatten();
2894 levels
2895 .iter()
2896 .flatten()
2897 .map(|d| match d {
2898 1 => Some(values_iter.next().cloned().unwrap()),
2899 0 => None,
2900 _ => unreachable!(),
2901 })
2902 .collect()
2903 }
2904 None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
2905 };
2906 data
2907 }
2908
2909 fn generate_single_column_file_with_data<T: DataType>(
2910 values: &[Vec<T::T>],
2911 def_levels: Option<&Vec<Vec<i16>>>,
2912 file: File,
2913 schema: TypePtr,
2914 field: Option<Field>,
2915 opts: &TestOptions,
2916 ) -> Result<crate::format::FileMetaData> {
2917 let mut writer_props = opts.writer_props();
2918 if let Some(field) = field {
2919 let arrow_schema = Schema::new(vec![field]);
2920 add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
2921 }
2922
2923 let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
2924
2925 for (idx, v) in values.iter().enumerate() {
2926 let def_levels = def_levels.map(|d| d[idx].as_slice());
2927 let mut row_group_writer = writer.next_row_group()?;
2928 {
2929 let mut column_writer = row_group_writer
2930 .next_column()?
2931 .expect("Column writer is none!");
2932
2933 column_writer
2934 .typed::<T>()
2935 .write_batch(v, def_levels, None)?;
2936
2937 column_writer.close()?;
2938 }
2939 row_group_writer.close()?;
2940 }
2941
2942 writer.close()
2943 }
2944
2945 fn get_test_file(file_name: &str) -> File {
2946 let mut path = PathBuf::new();
2947 path.push(arrow::util::test_util::arrow_test_data());
2948 path.push(file_name);
2949
2950 File::open(path.as_path()).expect("File not found!")
2951 }
2952
2953 #[test]
2954 fn test_read_structs() {
2955 let testdata = arrow::util::test_util::parquet_test_data();
2959 let path = format!("{testdata}/nested_structs.rust.parquet");
2960 let file = File::open(&path).unwrap();
2961 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2962
2963 for batch in record_batch_reader {
2964 batch.unwrap();
2965 }
2966
2967 let file = File::open(&path).unwrap();
2968 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2969
2970 let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
2971 let projected_reader = builder
2972 .with_projection(mask)
2973 .with_batch_size(60)
2974 .build()
2975 .unwrap();
2976
2977 let expected_schema = Schema::new(vec![
2978 Field::new(
2979 "roll_num",
2980 ArrowDataType::Struct(Fields::from(vec![Field::new(
2981 "count",
2982 ArrowDataType::UInt64,
2983 false,
2984 )])),
2985 false,
2986 ),
2987 Field::new(
2988 "PC_CUR",
2989 ArrowDataType::Struct(Fields::from(vec![
2990 Field::new("mean", ArrowDataType::Int64, false),
2991 Field::new("sum", ArrowDataType::Int64, false),
2992 ])),
2993 false,
2994 ),
2995 ]);
2996
2997 assert_eq!(&expected_schema, projected_reader.schema().as_ref());
2999
3000 for batch in projected_reader {
3001 let batch = batch.unwrap();
3002 assert_eq!(batch.schema().as_ref(), &expected_schema);
3003 }
3004 }
3005
3006 #[test]
3007 fn test_read_structs_by_name() {
3009 let testdata = arrow::util::test_util::parquet_test_data();
3010 let path = format!("{testdata}/nested_structs.rust.parquet");
3011 let file = File::open(&path).unwrap();
3012 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3013
3014 for batch in record_batch_reader {
3015 batch.unwrap();
3016 }
3017
3018 let file = File::open(&path).unwrap();
3019 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3020
3021 let mask = ProjectionMask::columns(
3022 builder.parquet_schema(),
3023 ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
3024 );
3025 let projected_reader = builder
3026 .with_projection(mask)
3027 .with_batch_size(60)
3028 .build()
3029 .unwrap();
3030
3031 let expected_schema = Schema::new(vec![
3032 Field::new(
3033 "roll_num",
3034 ArrowDataType::Struct(Fields::from(vec![Field::new(
3035 "count",
3036 ArrowDataType::UInt64,
3037 false,
3038 )])),
3039 false,
3040 ),
3041 Field::new(
3042 "PC_CUR",
3043 ArrowDataType::Struct(Fields::from(vec![
3044 Field::new("mean", ArrowDataType::Int64, false),
3045 Field::new("sum", ArrowDataType::Int64, false),
3046 ])),
3047 false,
3048 ),
3049 ]);
3050
3051 assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3052
3053 for batch in projected_reader {
3054 let batch = batch.unwrap();
3055 assert_eq!(batch.schema().as_ref(), &expected_schema);
3056 }
3057 }
3058
3059 #[test]
3060 fn test_read_maps() {
3061 let testdata = arrow::util::test_util::parquet_test_data();
3062 let path = format!("{testdata}/nested_maps.snappy.parquet");
3063 let file = File::open(path).unwrap();
3064 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3065
3066 for batch in record_batch_reader {
3067 batch.unwrap();
3068 }
3069 }
3070
3071 #[test]
3072 fn test_nested_nullability() {
3073 let message_type = "message nested {
3074 OPTIONAL Group group {
3075 REQUIRED INT32 leaf;
3076 }
3077 }";
3078
3079 let file = tempfile::tempfile().unwrap();
3080 let schema = Arc::new(parse_message_type(message_type).unwrap());
3081
3082 {
3083 let mut writer =
3085 SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
3086 .unwrap();
3087
3088 {
3089 let mut row_group_writer = writer.next_row_group().unwrap();
3090 let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
3091
3092 column_writer
3093 .typed::<Int32Type>()
3094 .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
3095 .unwrap();
3096
3097 column_writer.close().unwrap();
3098 row_group_writer.close().unwrap();
3099 }
3100
3101 writer.close().unwrap();
3102 }
3103
3104 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3105 let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
3106
3107 let reader = builder.with_projection(mask).build().unwrap();
3108
3109 let expected_schema = Schema::new(Fields::from(vec![Field::new(
3110 "group",
3111 ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
3112 true,
3113 )]));
3114
3115 let batch = reader.into_iter().next().unwrap().unwrap();
3116 assert_eq!(batch.schema().as_ref(), &expected_schema);
3117 assert_eq!(batch.num_rows(), 4);
3118 assert_eq!(batch.column(0).null_count(), 2);
3119 }
3120
3121 #[test]
3122 fn test_invalid_utf8() {
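// raw bytes of a tiny parquet file whose single string column stores
// "he\xFFllo" values, which are not valid UTF-8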
3123 let data = vec![
3125 80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
3126 18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
3127 3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
3128 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
3129 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
3130 21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
3131 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
3132 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
3133 0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
3134 118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
3135 116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
3136 82, 49,
3137 ];
3138
3139 let file = Bytes::from(data);
3140 let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();
3141
3142 let error = record_batch_reader.next().unwrap().unwrap_err();
3143
3144 assert!(
3145 error.to_string().contains("invalid utf-8 sequence"),
3146 "{}",
3147 error
3148 );
3149 }
3150
3151 #[test]
3152 fn test_invalid_utf8_string_array() {
3153 test_invalid_utf8_string_array_inner::<i32>();
3154 }
3155
3156 #[test]
3157 fn test_invalid_utf8_large_string_array() {
3158 test_invalid_utf8_string_array_inner::<i64>();
3159 }
3160
3161 fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
3162 let cases = [
3163 invalid_utf8_first_char::<O>(),
3164 invalid_utf8_first_char_long_strings::<O>(),
3165 invalid_utf8_later_char::<O>(),
3166 invalid_utf8_later_char_long_strings::<O>(),
3167 invalid_utf8_later_char_really_long_strings::<O>(),
3168 invalid_utf8_later_char_really_long_strings2::<O>(),
3169 ];
3170 for array in &cases {
3171 for encoding in STRING_ENCODINGS {
3172 let array = unsafe {
3175 GenericStringArray::<O>::new_unchecked(
3176 array.offsets().clone(),
3177 array.values().clone(),
3178 array.nulls().cloned(),
3179 )
3180 };
3181 let data_type = array.data_type().clone();
3182 let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3183 let err = read_from_parquet(data).unwrap_err();
3184 let expected_err =
3185 "Parquet argument error: Parquet error: encountered non UTF-8 data";
3186 assert!(
3187 err.to_string().contains(expected_err),
3188 "data type: {data_type:?}, expected: {expected_err}, got: {err}"
3189 );
3190 }
3191 }
3192 }
3193
3194 #[test]
3195 fn test_invalid_utf8_string_view_array() {
3196 let cases = [
3197 invalid_utf8_first_char::<i32>(),
3198 invalid_utf8_first_char_long_strings::<i32>(),
3199 invalid_utf8_later_char::<i32>(),
3200 invalid_utf8_later_char_long_strings::<i32>(),
3201 invalid_utf8_later_char_really_long_strings::<i32>(),
3202 invalid_utf8_later_char_really_long_strings2::<i32>(),
3203 ];
3204
3205 for encoding in STRING_ENCODINGS {
3206 for array in &cases {
3207 let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
3208 let array = array.as_binary_view();
3209
3210 let array = unsafe {
3213 StringViewArray::new_unchecked(
3214 array.views().clone(),
3215 array.data_buffers().to_vec(),
3216 array.nulls().cloned(),
3217 )
3218 };
3219
3220 let data_type = array.data_type().clone();
3221 let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3222 let err = read_from_parquet(data).unwrap_err();
3223 let expected_err =
3224 "Parquet argument error: Parquet error: encountered non UTF-8 data";
3225 assert!(
3226 err.to_string().contains(expected_err),
3227 "data type: {data_type:?}, expected: {expected_err}, got: {err}"
3228 );
3229 }
3230 }
3231 }
3232
/// String encodings to exercise; `None` lets the writer pick its default
3233 const STRING_ENCODINGS: &[Option<Encoding>] = &[
3235 None,
3236 Some(Encoding::PLAIN),
3237 Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
3238 Some(Encoding::DELTA_BYTE_ARRAY),
3239 ];
3240
/// A byte sequence that is invalid UTF-8 in its first character
3241 const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3244
/// A byte sequence that becomes invalid UTF-8 only after the first few
/// (ASCII) characters
3245 const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];
3248
3249 fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3251 let valid: &[u8] = b" ";
3252 let invalid = INVALID_UTF8_FIRST_CHAR;
3253 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3254 }
3255
3256 fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3260 let valid: &[u8] = b" ";
3261 let mut invalid = vec![];
3262 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3263 invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3264 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3265 }
3266
3267 fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3270 let valid: &[u8] = b" ";
3271 let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
3272 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3273 }
3274
3275 fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3279 let valid: &[u8] = b" ";
3280 let mut invalid = vec![];
3281 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3282 invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3283 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3284 }
3285
3286 fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3290 let valid: &[u8] = b" ";
3291 let mut invalid = vec![];
3292 for _ in 0..10 {
3293 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3295 }
3296 invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3297 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3298 }
3299
3300 fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3303 let valid: &[u8] = b" ";
3304 let mut valid_long = vec![];
3305 for _ in 0..10 {
3306 valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3308 }
3309 let invalid = INVALID_UTF8_LATER_CHAR;
3310 GenericBinaryArray::<O>::from_iter(vec![
3311 None,
3312 Some(valid),
3313 Some(invalid),
3314 None,
3315 Some(&valid_long),
3316 Some(valid),
3317 ])
3318 }
3319
3320 fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
3325 let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
3326 let mut data = vec![];
3327 let schema = batch.schema();
3328 let props = encoding.map(|encoding| {
3329 WriterProperties::builder()
3330 .set_dictionary_enabled(false)
3332 .set_encoding(encoding)
3333 .build()
3334 });
3335
3336 {
3337 let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
3338 writer.write(&batch).unwrap();
3339 writer.flush().unwrap();
3340 writer.close().unwrap();
3341 };
3342 data
3343 }
3344
3345 fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
3347 let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
3348 .unwrap()
3349 .build()
3350 .unwrap();
3351
3352 reader.collect()
3353 }
3354
3355 #[test]
3356 fn test_dictionary_preservation() {
3357 let fields = vec![Arc::new(
3358 Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
3359 .with_repetition(Repetition::OPTIONAL)
3360 .with_converted_type(ConvertedType::UTF8)
3361 .build()
3362 .unwrap(),
3363 )];
3364
3365 let schema = Arc::new(
3366 Type::group_type_builder("test_schema")
3367 .with_fields(fields)
3368 .build()
3369 .unwrap(),
3370 );
3371
3372 let dict_type = ArrowDataType::Dictionary(
3373 Box::new(ArrowDataType::Int32),
3374 Box::new(ArrowDataType::Utf8),
3375 );
3376
3377 let arrow_field = Field::new("leaf", dict_type, true);
3378
3379 let mut file = tempfile::tempfile().unwrap();
3380
3381 let values = vec![
3382 vec![
3383 ByteArray::from("hello"),
3384 ByteArray::from("a"),
3385 ByteArray::from("b"),
3386 ByteArray::from("d"),
3387 ],
3388 vec![
3389 ByteArray::from("c"),
3390 ByteArray::from("a"),
3391 ByteArray::from("b"),
3392 ],
3393 ];
3394
3395 let def_levels = vec![
3396 vec![1, 0, 0, 1, 0, 0, 1, 1],
3397 vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
3398 ];
3399
3400 let opts = TestOptions {
3401 encoding: Encoding::RLE_DICTIONARY,
3402 ..Default::default()
3403 };
3404
3405 generate_single_column_file_with_data::<ByteArrayType>(
3406 &values,
3407 Some(&def_levels),
3408 file.try_clone().unwrap(),
3409 schema,
3410 Some(arrow_field),
3411 &opts,
3412 )
3413 .unwrap();
3414
3415 file.rewind().unwrap();
3416
3417 let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();
3418
3419 let batches = record_reader
3420 .collect::<Result<Vec<RecordBatch>, _>>()
3421 .unwrap();
3422
3423 assert_eq!(batches.len(), 6);
3424 assert!(batches.iter().all(|x| x.num_columns() == 1));
3425
3426 let row_counts = batches
3427 .iter()
3428 .map(|x| (x.num_rows(), x.column(0).null_count()))
3429 .collect::<Vec<_>>();
3430
3431 assert_eq!(
3432 row_counts,
3433 vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
3434 );
3435
3436 let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();
3437
// first and second batch come from the same row group, so they share the
// writer's dictionary
3438 assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
// the third batch spans the row group boundary, forcing a freshly
// computed dictionary
3440 assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
3442 assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
// the remaining batches all come from the second row group and share its
// dictionary
3443 assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
3445 assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
3446 }
3447
3448 #[test]
3449 fn test_read_null_list() {
3450 let testdata = arrow::util::test_util::parquet_test_data();
3451 let path = format!("{testdata}/null_list.parquet");
3452 let file = File::open(path).unwrap();
3453 let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3454
3455 let batch = record_batch_reader.next().unwrap().unwrap();
3456 assert_eq!(batch.num_rows(), 1);
3457 assert_eq!(batch.num_columns(), 1);
3458 assert_eq!(batch.column(0).len(), 1);
3459
3460 let list = batch
3461 .column(0)
3462 .as_any()
3463 .downcast_ref::<ListArray>()
3464 .unwrap();
3465 assert_eq!(list.len(), 1);
3466 assert!(list.is_valid(0));
3467
3468 let val = list.value(0);
3469 assert_eq!(val.len(), 0);
3470 }
3471
3472 #[test]
3473 fn test_null_schema_inference() {
3474 let testdata = arrow::util::test_util::parquet_test_data();
3475 let path = format!("{testdata}/null_list.parquet");
3476 let file = File::open(path).unwrap();
3477
3478 let arrow_field = Field::new(
3479 "emptylist",
3480 ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
3481 true,
3482 );
3483
3484 let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3485 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3486 let schema = builder.schema();
3487 assert_eq!(schema.fields().len(), 1);
3488 assert_eq!(schema.field(0), &arrow_field);
3489 }
3490
3491 #[test]
3492 fn test_skip_metadata() {
3493 let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
3494 let field = Field::new("col", col.data_type().clone(), true);
3495
3496 let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
3497
3498 let metadata = [("key".to_string(), "value".to_string())]
3499 .into_iter()
3500 .collect();
3501
3502 let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
3503
3504 assert_ne!(schema_with_metadata, schema_without_metadata);
3505
3506 let batch =
3507 RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
3508
3509 let file = |version: WriterVersion| {
3510 let props = WriterProperties::builder()
3511 .set_writer_version(version)
3512 .build();
3513
3514 let file = tempfile().unwrap();
3515 let mut writer =
3516 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
3517 .unwrap();
3518 writer.write(&batch).unwrap();
3519 writer.close().unwrap();
3520 file
3521 };
3522
3523 let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3524
3525 let v1_reader = file(WriterVersion::PARQUET_1_0);
3526 let v2_reader = file(WriterVersion::PARQUET_2_0);
3527
3528 let arrow_reader =
3529 ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
3530 assert_eq!(arrow_reader.schema(), schema_with_metadata);
3531
3532 let reader =
3533 ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
3534 .unwrap()
3535 .build()
3536 .unwrap();
3537 assert_eq!(reader.schema(), schema_without_metadata);
3538
3539 let arrow_reader =
3540 ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
3541 assert_eq!(arrow_reader.schema(), schema_with_metadata);
3542
3543 let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
3544 .unwrap()
3545 .build()
3546 .unwrap();
3547 assert_eq!(reader.schema(), schema_without_metadata);
3548 }
3549
3550 fn write_parquet_from_iter<I, F>(value: I) -> File
3551 where
3552 I: IntoIterator<Item = (F, ArrayRef)>,
3553 F: AsRef<str>,
3554 {
3555 let batch = RecordBatch::try_from_iter(value).unwrap();
3556 let file = tempfile().unwrap();
3557 let mut writer =
3558 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
3559 writer.write(&batch).unwrap();
3560 writer.close().unwrap();
3561 file
3562 }
3563
3564 fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
3565 where
3566 I: IntoIterator<Item = (F, ArrayRef)>,
3567 F: AsRef<str>,
3568 {
3569 let file = write_parquet_from_iter(value);
3570 let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
3571 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3572 file.try_clone().unwrap(),
3573 options_with_schema,
3574 );
3575 assert_eq!(builder.err().unwrap().to_string(), expected_error);
3576 }
3577
3578 #[test]
3579 fn test_schema_too_few_columns() {
3580 run_schema_test_with_error(
3581 vec![
3582 ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3583 ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3584 ],
3585 Arc::new(Schema::new(vec![Field::new(
3586 "int64",
3587 ArrowDataType::Int64,
3588 false,
3589 )])),
3590 "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
3591 );
3592 }
3593
3594 #[test]
3595 fn test_schema_too_many_columns() {
3596 run_schema_test_with_error(
3597 vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3598 Arc::new(Schema::new(vec![
3599 Field::new("int64", ArrowDataType::Int64, false),
3600 Field::new("int32", ArrowDataType::Int32, false),
3601 ])),
3602 "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
3603 );
3604 }
3605
3606 #[test]
3607 fn test_schema_mismatched_column_names() {
3608 run_schema_test_with_error(
3609 vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3610 Arc::new(Schema::new(vec![Field::new(
3611 "other",
3612 ArrowDataType::Int64,
3613 false,
3614 )])),
3615 "Arrow: incompatible arrow schema, expected field named int64 got other",
3616 );
3617 }
3618
3619 #[test]
3620 fn test_schema_incompatible_columns() {
3621 run_schema_test_with_error(
3622 vec![
3623 (
3624 "col1_invalid",
3625 Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3626 ),
3627 (
3628 "col2_valid",
3629 Arc::new(Int32Array::from(vec![0])) as ArrayRef,
3630 ),
3631 (
3632 "col3_invalid",
3633 Arc::new(Date64Array::from(vec![0])) as ArrayRef,
3634 ),
3635 ],
3636 Arc::new(Schema::new(vec![
3637 Field::new("col1_invalid", ArrowDataType::Int32, false),
3638 Field::new("col2_valid", ArrowDataType::Int32, false),
3639 Field::new("col3_invalid", ArrowDataType::Int32, false),
3640 ])),
3641 "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
3642 );
3643 }
3644
3645 #[test]
3646 fn test_one_incompatible_nested_column() {
3647 let nested_fields = Fields::from(vec![
3648 Field::new("nested1_valid", ArrowDataType::Utf8, false),
3649 Field::new("nested1_invalid", ArrowDataType::Int64, false),
3650 ]);
3651 let nested = StructArray::try_new(
3652 nested_fields,
3653 vec![
3654 Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
3655 Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3656 ],
3657 None,
3658 )
3659 .expect("struct array");
3660 let supplied_nested_fields = Fields::from(vec![
3661 Field::new("nested1_valid", ArrowDataType::Utf8, false),
3662 Field::new("nested1_invalid", ArrowDataType::Int32, false),
3663 ]);
3664 run_schema_test_with_error(
3665 vec![
3666 ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3667 ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3668 ("nested", Arc::new(nested) as ArrayRef),
3669 ],
3670 Arc::new(Schema::new(vec![
3671 Field::new("col1", ArrowDataType::Int64, false),
3672 Field::new("col2", ArrowDataType::Int32, false),
3673 Field::new(
3674 "nested",
3675 ArrowDataType::Struct(supplied_nested_fields),
3676 false,
3677 ),
3678 ])),
3679 "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
3680 requested Struct([Field { name: \"nested1_valid\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"nested1_invalid\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]) \
3681 but found Struct([Field { name: \"nested1_valid\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"nested1_invalid\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }])",
3682 );
3683 }
3684
3685 fn utf8_parquet() -> Bytes {
3687 let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
3688 let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
3689 let props = None;
3690 let mut parquet_data = vec![];
3692 let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
3693 writer.write(&batch).unwrap();
3694 writer.close().unwrap();
3695 Bytes::from(parquet_data)
3696 }
3697
3698 #[test]
3699 fn test_schema_error_bad_types() {
3700 let parquet_data = utf8_parquet();
3702
3703 let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
3705 "column1",
3706 arrow::datatypes::DataType::Int32,
3707 false,
3708 )]));
3709
3710 let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
3712 let err =
3713 ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
3714 .unwrap_err();
3715 assert_eq!(err.to_string(), "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8")
3716 }
3717
3718 #[test]
3719 fn test_schema_error_bad_nullability() {
3720 let parquet_data = utf8_parquet();
3722
3723 let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
3725 "column1",
3726 arrow::datatypes::DataType::Utf8,
3727 true,
3728 )]));
3729
3730 let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
3732 let err =
3733 ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
3734 .unwrap_err();
3735 assert_eq!(err.to_string(), "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false")
3736 }
3737
3738 #[test]
3739 fn test_read_binary_as_utf8() {
3740 let file = write_parquet_from_iter(vec![
3741 (
3742 "binary_to_utf8",
3743 Arc::new(BinaryArray::from(vec![
3744 b"one".as_ref(),
3745 b"two".as_ref(),
3746 b"three".as_ref(),
3747 ])) as ArrayRef,
3748 ),
3749 (
3750 "large_binary_to_large_utf8",
3751 Arc::new(LargeBinaryArray::from(vec![
3752 b"one".as_ref(),
3753 b"two".as_ref(),
3754 b"three".as_ref(),
3755 ])) as ArrayRef,
3756 ),
3757 (
3758 "binary_view_to_utf8_view",
3759 Arc::new(BinaryViewArray::from(vec![
3760 b"one".as_ref(),
3761 b"two".as_ref(),
3762 b"three".as_ref(),
3763 ])) as ArrayRef,
3764 ),
3765 ]);
3766 let supplied_fields = Fields::from(vec![
3767 Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
3768 Field::new(
3769 "large_binary_to_large_utf8",
3770 ArrowDataType::LargeUtf8,
3771 false,
3772 ),
3773 Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
3774 ]);
3775
3776 let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3777 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3778 file.try_clone().unwrap(),
3779 options,
3780 )
3781 .expect("reader builder with schema")
3782 .build()
3783 .expect("reader with schema");
3784
3785 let batch = arrow_reader.next().unwrap().unwrap();
3786 assert_eq!(batch.num_columns(), 3);
3787 assert_eq!(batch.num_rows(), 3);
3788 assert_eq!(
3789 batch
3790 .column(0)
3791 .as_string::<i32>()
3792 .iter()
3793 .collect::<Vec<_>>(),
3794 vec![Some("one"), Some("two"), Some("three")]
3795 );
3796
3797 assert_eq!(
3798 batch
3799 .column(1)
3800 .as_string::<i64>()
3801 .iter()
3802 .collect::<Vec<_>>(),
3803 vec![Some("one"), Some("two"), Some("three")]
3804 );
3805
3806 assert_eq!(
3807 batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
3808 vec![Some("one"), Some("two"), Some("three")]
3809 );
3810 }
3811
3812 #[test]
3813 #[should_panic(expected = "Invalid UTF8 sequence at")]
3814 fn test_read_non_utf8_binary_as_utf8() {
3815 let file = write_parquet_from_iter(vec![(
3816 "non_utf8_binary",
3817 Arc::new(BinaryArray::from(vec![
3818 b"\xDE\x00\xFF".as_ref(),
3819 b"\xDE\x01\xAA".as_ref(),
3820 b"\xDE\x02\xFF".as_ref(),
3821 ])) as ArrayRef,
3822 )]);
3823 let supplied_fields = Fields::from(vec![Field::new(
3824 "non_utf8_binary",
3825 ArrowDataType::Utf8,
3826 false,
3827 )]);
3828
3829 let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3830 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3831 file.try_clone().unwrap(),
3832 options,
3833 )
3834 .expect("reader builder with schema")
3835 .build()
3836 .expect("reader with schema");
3837 arrow_reader.next().unwrap().unwrap_err();
3838 }
3839
3840 #[test]
3841 fn test_with_schema() {
3842 let nested_fields = Fields::from(vec![
3843 Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
3844 Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
3845 ]);
3846
3847 let nested_arrays: Vec<ArrayRef> = vec![
3848 Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
3849 Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
3850 ];
3851
3852 let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();
3853
3854 let file = write_parquet_from_iter(vec![
3855 (
3856 "int32_to_ts_second",
3857 Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
3858 ),
3859 (
3860 "date32_to_date64",
3861 Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
3862 ),
3863 ("nested", Arc::new(nested) as ArrayRef),
3864 ]);
3865
3866 let supplied_nested_fields = Fields::from(vec![
3867 Field::new(
3868 "utf8_to_dict",
3869 ArrowDataType::Dictionary(
3870 Box::new(ArrowDataType::Int32),
3871 Box::new(ArrowDataType::Utf8),
3872 ),
3873 false,
3874 ),
3875 Field::new(
3876 "int64_to_ts_nano",
3877 ArrowDataType::Timestamp(
3878 arrow::datatypes::TimeUnit::Nanosecond,
3879 Some("+10:00".into()),
3880 ),
3881 false,
3882 ),
3883 ]);
3884
3885 let supplied_schema = Arc::new(Schema::new(vec![
3886 Field::new(
3887 "int32_to_ts_second",
3888 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
3889 false,
3890 ),
3891 Field::new("date32_to_date64", ArrowDataType::Date64, false),
3892 Field::new(
3893 "nested",
3894 ArrowDataType::Struct(supplied_nested_fields),
3895 false,
3896 ),
3897 ]));
3898
3899 let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
3900 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3901 file.try_clone().unwrap(),
3902 options,
3903 )
3904 .expect("reader builder with schema")
3905 .build()
3906 .expect("reader with schema");
3907
3908 assert_eq!(arrow_reader.schema(), supplied_schema);
3909 let batch = arrow_reader.next().unwrap().unwrap();
3910 assert_eq!(batch.num_columns(), 3);
3911 assert_eq!(batch.num_rows(), 4);
3912 assert_eq!(
3913 batch
3914 .column(0)
3915 .as_any()
3916 .downcast_ref::<TimestampSecondArray>()
3917 .expect("downcast to timestamp second")
3918 .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
3919 .map(|v| v.to_string())
3920 .expect("value as datetime"),
3921 "1970-01-01 01:00:00 +01:00"
3922 );
3923 assert_eq!(
3924 batch
3925 .column(1)
3926 .as_any()
3927 .downcast_ref::<Date64Array>()
3928 .expect("downcast to date64")
3929 .value_as_date(0)
3930 .map(|v| v.to_string())
3931 .expect("value as date"),
3932 "1970-01-01"
3933 );
3934
3935 let nested = batch
3936 .column(2)
3937 .as_any()
3938 .downcast_ref::<StructArray>()
3939 .expect("downcast to struct");
3940
3941 let nested_dict = nested
3942 .column(0)
3943 .as_any()
3944 .downcast_ref::<Int32DictionaryArray>()
3945 .expect("downcast to dictionary");
3946
3947 assert_eq!(
3948 nested_dict
3949 .values()
3950 .as_any()
3951 .downcast_ref::<StringArray>()
3952 .expect("downcast to string")
3953 .iter()
3954 .collect::<Vec<_>>(),
3955 vec![Some("a"), Some("b")]
3956 );
3957
3958 assert_eq!(
3959 nested_dict.keys().iter().collect::<Vec<_>>(),
3960 vec![Some(0), Some(0), Some(0), Some(1)]
3961 );
3962
3963 assert_eq!(
3964 nested
3965 .column(1)
3966 .as_any()
3967 .downcast_ref::<TimestampNanosecondArray>()
3968 .expect("downcast to timestamp nanosecond")
3969 .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
3970 .map(|v| v.to_string())
3971 .expect("value as datetime"),
3972 "1970-01-01 10:00:00.000000001 +10:00"
3973 );
3974 }
3975
3976 #[test]
3977 fn test_empty_projection() {
3978 let testdata = arrow::util::test_util::parquet_test_data();
3979 let path = format!("{testdata}/alltypes_plain.parquet");
3980 let file = File::open(path).unwrap();
3981
3982 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3983 let file_metadata = builder.metadata().file_metadata();
3984 let expected_rows = file_metadata.num_rows() as usize;
3985
3986 let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
3987 let batch_reader = builder
3988 .with_projection(mask)
3989 .with_batch_size(2)
3990 .build()
3991 .unwrap();
3992
3993 let mut total_rows = 0;
3994 for maybe_batch in batch_reader {
3995 let batch = maybe_batch.unwrap();
3996 total_rows += batch.num_rows();
3997 assert_eq!(batch.num_columns(), 0);
3998 assert!(batch.num_rows() <= 2);
3999 }
4000
4001 assert_eq!(total_rows, expected_rows);
4002 }
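// Offset and limit are exercised above through the harness; a minimal
// standalone example of their composition: skip `offset` rows, then return
// at most `limit` rows. (Illustrative addition.)
#[test]
fn test_offset_limit_minimal() {
    let col = Int32Array::from_iter_values(0..100);
    let batch = RecordBatch::try_from_iter(vec![("v", Arc::new(col) as ArrayRef)]).unwrap();

    let mut buffer = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let batches = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
        .unwrap()
        .with_offset(10)
        .with_limit(5)
        .build()
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();

    // rows 10..15 of the original batch come back in a single batch
    assert_eq!(batches.len(), 1);
    assert_eq!(batches[0], batch.slice(10, 5));
}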
4003
4004 fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
4005 let schema = Arc::new(Schema::new(vec![Field::new(
4006 "list",
4007 ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
4008 true,
4009 )]));
4010
4011 let mut buf = Vec::with_capacity(1024);
4012
4013 let mut writer = ArrowWriter::try_new(
4014 &mut buf,
4015 schema.clone(),
4016 Some(
4017 WriterProperties::builder()
4018 .set_max_row_group_size(row_group_size)
4019 .build(),
4020 ),
4021 )
4022 .unwrap();
4023 for _ in 0..2 {
4024 let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
4025 for _ in 0..(batch_size) {
4026 list_builder.append(true);
4027 }
4028 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
4029 .unwrap();
4030 writer.write(&batch).unwrap();
4031 }
4032 writer.close().unwrap();
4033
4034 let mut record_reader =
4035 ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
4036 assert_eq!(
4037 batch_size,
4038 record_reader.next().unwrap().unwrap().num_rows()
4039 );
4040 assert_eq!(
4041 batch_size,
4042 record_reader.next().unwrap().unwrap().num_rows()
4043 );
4044 }
4045
4046 #[test]
4047 fn test_row_group_exact_multiple() {
4048 const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
4049 test_row_group_batch(8, 8);
4050 test_row_group_batch(10, 8);
4051 test_row_group_batch(8, 10);
4052 test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
4053 test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
4054 test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
4055 test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
4056 test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
4057 }
4058
/// Computes the expected set of batches a reader with `batch_size` should
/// return for `selection` applied against `column`
4059 fn get_expected_batches(
4062 column: &RecordBatch,
4063 selection: &RowSelection,
4064 batch_size: usize,
4065 ) -> Vec<RecordBatch> {
4066 let mut expected_batches = vec![];
4067
4068 let mut selection: VecDeque<_> = selection.clone().into();
4069 let mut row_offset = 0;
4070 let mut last_start = None;
4071 while row_offset < column.num_rows() && !selection.is_empty() {
4072 let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
4073 while batch_remaining > 0 && !selection.is_empty() {
4074 let (to_read, skip) = match selection.front_mut() {
4075 Some(selection) if selection.row_count > batch_remaining => {
4076 selection.row_count -= batch_remaining;
4077 (batch_remaining, selection.skip)
4078 }
4079 Some(_) => {
4080 let select = selection.pop_front().unwrap();
4081 (select.row_count, select.skip)
4082 }
4083 None => break,
4084 };
4085
4086 batch_remaining -= to_read;
4087
4088 match skip {
4089 true => {
4090 if let Some(last_start) = last_start.take() {
4091 expected_batches.push(column.slice(last_start, row_offset - last_start))
4092 }
4093 row_offset += to_read
4094 }
4095 false => {
4096 last_start.get_or_insert(row_offset);
4097 row_offset += to_read
4098 }
4099 }
4100 }
4101 }
4102
4103 if let Some(last_start) = last_start.take() {
4104 expected_batches.push(column.slice(last_start, row_offset - last_start))
4105 }
4106
// every batch except the last should contain exactly `batch_size` rows
// (saturating_sub avoids an underflow if the selection skipped everything)
4107 for batch in &expected_batches[..expected_batches.len().saturating_sub(1)] {
4109 assert_eq!(batch.num_rows(), batch_size);
4110 }
4111
4112 expected_batches
4113 }
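// Quick sanity check of `get_expected_batches` above: selecting rows 1..3 of
// a 4 row batch with batch_size 2 yields the single sliced batch.
// (Illustrative addition.)
#[test]
fn test_get_expected_batches_simple() {
    let col = Int32Array::from(vec![0, 1, 2, 3]);
    let batch = RecordBatch::try_from_iter(vec![("v", Arc::new(col) as ArrayRef)]).unwrap();
    let selection = RowSelection::from(vec![
        RowSelector::skip(1),
        RowSelector::select(2),
        RowSelector::skip(1),
    ]);
    let expected = get_expected_batches(&batch, &selection, 2);
    assert_eq!(expected.len(), 1);
    assert_eq!(expected[0], batch.slice(1, 2));
}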
4114
/// Builds an alternating skip/select `RowSelection` covering `total_len`
/// rows in steps of `step_len`, returning it with the selected row count
4115 fn create_test_selection(
4116 step_len: usize,
4117 total_len: usize,
4118 skip_first: bool,
4119 ) -> (RowSelection, usize) {
4120 let mut remaining = total_len;
4121 let mut skip = skip_first;
4122 let mut vec = vec![];
4123 let mut selected_count = 0;
4124 while remaining != 0 {
4125 let step = if remaining > step_len {
4126 step_len
4127 } else {
4128 remaining
4129 };
4130 vec.push(RowSelector {
4131 row_count: step,
4132 skip,
4133 });
4134 remaining -= step;
4135 if !skip {
4136 selected_count += step;
4137 }
4138 skip = !skip;
4139 }
4140 (vec.into(), selected_count)
4141 }
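// Sanity check for `create_test_selection` above: the selectors tile the
// whole range and the returned count matches the selected rows.
// (Illustrative addition.)
#[test]
fn test_create_test_selection_tiles_range() {
    let (selection, selected) = create_test_selection(3, 10, false);
    let selectors: VecDeque<RowSelector> = selection.into();
    let total: usize = selectors.iter().map(|s| s.row_count).sum();
    let kept: usize = selectors
        .iter()
        .filter(|s| !s.skip)
        .map(|s| s.row_count)
        .sum();
    assert_eq!(total, 10);
    assert_eq!(kept, selected);
    assert_eq!(kept, 6); // select 3, skip 3, select 3, skip 1
}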
4142
4143 #[test]
4144 fn test_scan_row_with_selection() {
4145 let testdata = arrow::util::test_util::parquet_test_data();
4146 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
4147 let test_file = File::open(&path).unwrap();
4148
4149 let mut serial_reader =
4150 ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
4151 let data = serial_reader.next().unwrap().unwrap();
4152
4153 let do_test = |batch_size: usize, selection_len: usize| {
4154 for skip_first in [false, true] {
4155 let selections = create_test_selection(selection_len, data.num_rows(), skip_first).0;
4156
4157 let expected = get_expected_batches(&data, &selections, batch_size);
4158 let skip_reader = create_skip_reader(&test_file, batch_size, selections);
4159 assert_eq!(
4160 skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
4161 expected,
4162 "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
4163 );
4164 }
4165 };
4166
4167 do_test(1000, 1000);
4170
4171 do_test(20, 20);
4173
4174 do_test(20, 5);
4176
4177 do_test(20, 5);
4180
4181 fn create_skip_reader(
4182 test_file: &File,
4183 batch_size: usize,
4184 selections: RowSelection,
4185 ) -> ParquetRecordBatchReader {
4186 let options = ArrowReaderOptions::new().with_page_index(true);
4187 let file = test_file.try_clone().unwrap();
4188 ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
4189 .unwrap()
4190 .with_batch_size(batch_size)
4191 .with_row_selection(selections)
4192 .build()
4193 .unwrap()
4194 }
4195 }
4196
4197 #[test]
4198 fn test_batch_size_overallocate() {
4199 let testdata = arrow::util::test_util::parquet_test_data();
4200 let path = format!("{testdata}/alltypes_plain.parquet");
4202 let test_file = File::open(path).unwrap();
4203
4204 let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4205 let num_rows = builder.metadata.file_metadata().num_rows();
4206 let reader = builder
4207 .with_batch_size(1024)
4208 .with_projection(ProjectionMask::all())
4209 .build()
4210 .unwrap();
4211 assert_ne!(1024, num_rows);
4212 assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
4213 }
4214
4215 #[test]
4216 fn test_read_with_page_index_enabled() {
4217 let testdata = arrow::util::test_util::parquet_test_data();
4218
4219 {
4220 let path = format!("{testdata}/alltypes_tiny_pages.parquet");
4222 let test_file = File::open(path).unwrap();
4223 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4224 test_file,
4225 ArrowReaderOptions::new().with_page_index(true),
4226 )
4227 .unwrap();
4228 assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
4229 let reader = builder.build().unwrap();
4230 let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4231 assert_eq!(batches.len(), 8);
4232 }
4233
4234 {
4235 let path = format!("{testdata}/alltypes_plain.parquet");
4237 let test_file = File::open(path).unwrap();
4238 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4239 test_file,
4240 ArrowReaderOptions::new().with_page_index(true),
4241 )
4242 .unwrap();
4243 assert!(builder.metadata().offset_index().is_none());
4246 let reader = builder.build().unwrap();
4247 let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4248 assert_eq!(batches.len(), 1);
4249 }
4250 }
4251
4252 #[test]
4253 fn test_raw_repetition() {
4254 const MESSAGE_TYPE: &str = "
4255 message Log {
4256 OPTIONAL INT32 eventType;
4257 REPEATED INT32 category;
4258 REPEATED group filter {
4259 OPTIONAL INT32 error;
4260 }
4261 }
4262 ";
4263 let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
4264 let props = Default::default();
4265
4266 let mut buf = Vec::with_capacity(1024);
4267 let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
4268 let mut row_group_writer = writer.next_row_group().unwrap();
4269
4270 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4272 col_writer
4273 .typed::<Int32Type>()
4274 .write_batch(&[1], Some(&[1]), None)
4275 .unwrap();
4276 col_writer.close().unwrap();
4277 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
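// repetition level 0 starts a new record while 1 appends to the current
// record's REPEATED field, so rep levels [0, 1] place both category
// values in the same (single) row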
4279 col_writer
4280 .typed::<Int32Type>()
4281 .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
4282 .unwrap();
4283 col_writer.close().unwrap();
4284 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4286 col_writer
4287 .typed::<Int32Type>()
4288 .write_batch(&[1], Some(&[1]), Some(&[0]))
4289 .unwrap();
4290 col_writer.close().unwrap();
4291
4292 let rg_md = row_group_writer.close().unwrap();
4293 assert_eq!(rg_md.num_rows(), 1);
4294 writer.close().unwrap();
4295
4296 let bytes = Bytes::from(buf);
4297
4298 let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
4299 let full = no_mask.next().unwrap().unwrap();
4300
4301 assert_eq!(full.num_columns(), 3);
4302
4303 for idx in 0..3 {
4304 let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
4305 let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
4306 let mut reader = b.with_projection(mask).build().unwrap();
4307 let projected = reader.next().unwrap().unwrap();
4308
4309 assert_eq!(projected.num_columns(), 1);
4310 assert_eq!(full.column(idx), projected.column(0));
4311 }
4312 }
4313
4314 #[test]
4315 fn test_read_lz4_raw() {
4316 let testdata = arrow::util::test_util::parquet_test_data();
4317 let path = format!("{testdata}/lz4_raw_compressed.parquet");
4318 let file = File::open(path).unwrap();
4319
4320 let batches = ParquetRecordBatchReader::try_new(file, 1024)
4321 .unwrap()
4322 .collect::<Result<Vec<_>, _>>()
4323 .unwrap();
4324 assert_eq!(batches.len(), 1);
4325 let batch = &batches[0];
4326
4327 assert_eq!(batch.num_columns(), 3);
4328 assert_eq!(batch.num_rows(), 4);
4329
4330 let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4332 assert_eq!(
4333 a.values(),
4334 &[1593604800, 1593604800, 1593604801, 1593604801]
4335 );
4336
4337 let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4338 let a: Vec<_> = a.iter().flatten().collect();
4339 assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
4340
4341 let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4342 assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
4343 }
4344
4345 #[test]
4355 fn test_read_lz4_hadoop_fallback() {
4356 for file in [
4357 "hadoop_lz4_compressed.parquet",
4358 "non_hadoop_lz4_compressed.parquet",
4359 ] {
4360 let testdata = arrow::util::test_util::parquet_test_data();
4361 let path = format!("{testdata}/{file}");
4362 let file = File::open(path).unwrap();
4363 let expected_rows = 4;
4364
4365 let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4366 .unwrap()
4367 .collect::<Result<Vec<_>, _>>()
4368 .unwrap();
4369 assert_eq!(batches.len(), 1);
4370 let batch = &batches[0];
4371
4372 assert_eq!(batch.num_columns(), 3);
4373 assert_eq!(batch.num_rows(), expected_rows);
4374
4375 let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4376 assert_eq!(
4377 a.values(),
4378 &[1593604800, 1593604800, 1593604801, 1593604801]
4379 );
4380
4381 let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4382 let b: Vec<_> = b.iter().flatten().collect();
4383 assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
4384
4385 let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4386 assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
4387 }
4388 }
4389
4390 #[test]
4391 fn test_read_lz4_hadoop_large() {
4392 let testdata = arrow::util::test_util::parquet_test_data();
4393 let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
4394 let file = File::open(path).unwrap();
4395 let expected_rows = 10000;
4396
4397 let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4398 .unwrap()
4399 .collect::<Result<Vec<_>, _>>()
4400 .unwrap();
4401 assert_eq!(batches.len(), 1);
4402 let batch = &batches[0];
4403
4404 assert_eq!(batch.num_columns(), 1);
4405 assert_eq!(batch.num_rows(), expected_rows);
4406
4407 let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
4408 let a: Vec<_> = a.iter().flatten().collect();
4409 assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
4410 assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
4411 assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
4412 assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
4413 }
4414
4415 #[test]
4416 #[cfg(feature = "snap")]
4417 fn test_read_nested_lists() {
4418 let testdata = arrow::util::test_util::parquet_test_data();
4419 let path = format!("{testdata}/nested_lists.snappy.parquet");
4420 let file = File::open(path).unwrap();
4421
4422 let f = file.try_clone().unwrap();
4423 let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
4424 let expected = reader.next().unwrap().unwrap();
4425 assert_eq!(expected.num_rows(), 3);
4426
4427 let selection = RowSelection::from(vec![
4428 RowSelector::skip(1),
4429 RowSelector::select(1),
4430 RowSelector::skip(1),
4431 ]);
4432 let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4433 .unwrap()
4434 .with_row_selection(selection)
4435 .build()
4436 .unwrap();
4437
4438 let actual = reader.next().unwrap().unwrap();
4439 assert_eq!(actual.num_rows(), 1);
4440 assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
4441 }
4442
    #[test]
    fn test_arbitrary_decimal() {
        let values = [1, 2, 3, 4, 5, 6, 7, 8];
        let decimals_19_0 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(19, 0)
            .unwrap();
        let decimals_12_0 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(12, 0)
            .unwrap();
        let decimals_17_10 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(17, 10)
            .unwrap();

        let written = RecordBatch::try_from_iter([
            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 8), &read[0]);
    }

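    // Writes a three-row list column with one row per data page (via
    // set_data_page_row_count_limit) so that skipping two rows must cross page
    // boundaries, then checks the remaining row decodes correctly.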
    #[test]
    fn test_list_skip() {
        let mut list = ListBuilder::new(Int32Builder::new());
        list.append_value([Some(1), Some(2)]);
        list.append_value([Some(3)]);
        list.append_value([Some(4)]);
        let list = list.finish();
        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();

        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(1)
            .set_write_batch_size(2)
            .build();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
            .unwrap()
            .with_row_selection(selection.into())
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out, batch.slice(2, 1));
    }

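    // Decimal32 with precision 9 is expected to be written as the INT32
    // physical type; asserted below before checking the round-trip.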
    fn test_decimal32_roundtrip() {
        let d = |values: Vec<i32>, p: u8| {
            let iter = values.into_iter();
            PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

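    // Decimal64 round-trip: precision <= 9 should map to INT32 and
    // precision <= 18 to INT64, per the physical-type assertions below.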
    fn test_decimal64_roundtrip() {
        let d = |values: Vec<i64>, p: u8| {
            let iter = values.into_iter();
            PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

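    // Generic round-trip over any DecimalType: precisions 9, 10, 18 and 19
    // exercise the INT32, INT64 and FIXED_LEN_BYTE_ARRAY physical encodings.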
    fn test_decimal_roundtrip<T: DecimalType>() {
        let d = |values: Vec<usize>, p: u8| {
            let iter = values.into_iter().map(T::Native::usize_as);
            PrimitiveArray::<T>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
            ("d4", Arc::new(d4) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);
        let t4 = builder.parquet_schema().columns()[3].physical_type();
        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

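    // Runs the decimal round-trip helpers above for every decimal width.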
    #[test]
    fn test_decimal() {
        test_decimal32_roundtrip();
        test_decimal64_roundtrip();
        test_decimal_roundtrip::<Decimal128Type>();
        test_decimal_roundtrip::<Decimal256Type>();
    }

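    // Writes 2048 single-element list rows, reads them back with two
    // skip(100)/select(924) ranges, and verifies both the offsets (every list
    // still has exactly one value) and the surviving values.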
    #[test]
    fn test_list_selection() {
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list_field(ArrowDataType::Utf8, true),
            false,
        )]));
        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        for i in 0..2 {
            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
            for j in 0..1024 {
                list_a_builder.values().append_value(format!("{i} {j}"));
                list_a_builder.append(true);
            }
            let batch =
                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
                    .unwrap();
            writer.write(&batch).unwrap();
        }
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);
        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
            .unwrap()
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ]))
            .build()
            .unwrap();

        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        let batch = concat_batches(&schema, &batches).unwrap();

        assert_eq!(batch.num_rows(), 924 * 2);
        let list = batch.column(0).as_list::<i32>();

        for w in list.value_offsets().windows(2) {
            assert_eq!(w[0] + 1, w[1])
        }
        let mut values = list.values().as_string::<i32>().iter();

        for i in 0..2 {
            for j in 100..1024 {
                let expected = format!("{i} {j}");
                assert_eq!(values.next().unwrap().unwrap(), &expected);
            }
        }
    }

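    // Fuzz test for list skipping: builds 2048 rows of randomly-null nested
    // lists, replays several skip/select patterns at batch sizes 100, 1024 and
    // 2048, and checks each selected slice against the original batch.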
    #[test]
    fn test_list_selection_fuzz() {
        let mut rng = rng();
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list(
                Field::LIST_FIELD_DEFAULT_NAME,
                Field::new_list_field(ArrowDataType::Int32, true),
                true,
            ),
            true,
        )]));
        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));

        for _ in 0..2048 {
            if rng.random_bool(0.2) {
                list_a_builder.append(false);
                continue;
            }

            let list_a_len = rng.random_range(0..10);
            let list_b_builder = list_a_builder.values();

            for _ in 0..list_a_len {
                if rng.random_bool(0.2) {
                    list_b_builder.append(false);
                    continue;
                }

                let list_b_len = rng.random_range(0..10);
                let int_builder = list_b_builder.values();
                for _ in 0..list_b_len {
                    match rng.random_bool(0.2) {
                        true => int_builder.append_null(),
                        false => int_builder.append_value(rng.random()),
                    }
                }
                list_b_builder.append(true)
            }
            list_a_builder.append(true);
        }

        let array = Arc::new(list_a_builder.finish());
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        writer.write(&batch).unwrap();
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);

        let cases = [
            vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ],
            vec![
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
            ],
            vec![
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
            ],
            vec![
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
            ],
        ];

        for batch_size in [100, 1024, 2048] {
            for selection in &cases {
                let selection = RowSelection::from(selection.clone());
                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
                    .unwrap()
                    .with_row_selection(selection.clone())
                    .with_batch_size(batch_size)
                    .build()
                    .unwrap();

                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
                assert_eq!(actual.num_rows(), selection.row_count());

                let mut batch_offset = 0;
                let mut actual_offset = 0;
                for selector in selection.iter() {
                    if selector.skip {
                        batch_offset += selector.row_count;
                        continue;
                    }

                    assert_eq!(
                        batch.slice(batch_offset, selector.row_count),
                        actual.slice(actual_offset, selector.row_count)
                    );

                    batch_offset += selector.row_count;
                    actual_offset += selector.row_count;
                }
            }
        }
    }

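    // old_list_structure.parquet appears to use the legacy two-level list
    // encoding; it should still decode as list<list<int32>> containing
    // [[1, 2], [3, 4]], matching the expected array built below.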
    #[test]
    fn test_read_old_nested_list() {
        use arrow::datatypes::DataType;
        use arrow::datatypes::ToByteSlice;

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/old_list_structure.parquet");
        let test_file = File::open(path).unwrap();

        // Expected inner values: [[1, 2], [3, 4]]
        let a_values = Int32Array::from(vec![1, 2, 3, 4]);

        // Value offsets splitting the four values into two lists
        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());

        // Build the expected inner ListArray from the values and offsets
        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
            "array",
            DataType::Int32,
            false,
        ))))
        .len(2)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
        let mut reader = builder.build().unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out.num_columns(), 1);
        let c0 = out.column(0);
        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
        let r0 = c0arr.value(0);
        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
        assert_eq!(r0arr, &a);
    }

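    // map_no_value.parquet stores a map whose entries carry no value field;
    // the test checks that columns 1 and 2 decode to element-wise equal lists.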
    #[test]
    fn test_map_no_value() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/map_no_value.parquet");
        let file = File::open(path).unwrap();

        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 3);
        assert_eq!(out.num_columns(), 3);
        let c0 = out.column(1).as_list::<i32>();
        let c1 = out.column(2).as_list::<i32>();
        assert_eq!(c0.len(), c1.len());
        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
    }

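    // Rewrites the test data with bloom filters enabled so the writer records
    // bloom_filter_length in the column metadata, then reads the filter back
    // through that length-aware path.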
    #[test]
    fn test_get_row_group_column_bloom_filter_with_length() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
        let file = File::open(path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let schema = builder.schema().clone();
        let reader = builder.build().unwrap();

        let mut parquet_data = Vec::new();
        let props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build();
        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
        for batch in reader {
            let batch = batch.unwrap();
            writer.write(&batch).unwrap();
        }
        writer.close().unwrap();

        test_get_row_group_column_bloom_filter(parquet_data.into(), true);
    }

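    // The pristine test file has no bloom_filter_length recorded, exercising
    // the path that reads the filter header to discover its size.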
    #[test]
    fn test_get_row_group_column_bloom_filter_without_length() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());
        test_get_row_group_column_bloom_filter(data, false);
    }

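    // Shared assertions: the file has one row group, the first column's
    // bloom_filter_length presence matches `with_length`, and the decoded SBBF
    // reports "Hello" as possibly present and a missing key as absent.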
    fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
        let mut builder = ParquetRecordBatchReaderBuilder::try_new(data.clone()).unwrap();

        let metadata = builder.metadata();
        assert_eq!(metadata.num_row_groups(), 1);
        let row_group = metadata.row_group(0);
        let column = row_group.column(0);
        assert_eq!(column.bloom_filter_length().is_some(), with_length);

        let sbbf = builder
            .get_row_group_column_bloom_filter(0, 0)
            .unwrap()
            .unwrap();
        assert!(sbbf.check(&"Hello"));
        assert!(!sbbf.check(&"Hello_Not_Exists"));
    }
}