use arrow_array::cast::AsArray;
use arrow_array::Array;
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

pub use crate::arrow::array_reader::RowGroups;
use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use crate::bloom_filter::{
    chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
};
use crate::column::page::{PageIterator, PageReader};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
use crate::schema::types::SchemaDescriptor;

use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
pub(crate) use read_plan::{ReadPlan, ReadPlanBuilder};

mod filter;
pub mod metrics;
mod read_plan;
mod selection;
pub mod statistics;

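/// Builder for constructing Arrow [`RecordBatch`] readers from Parquet data
///
/// This holds the configuration shared by the synchronous
/// [`ParquetRecordBatchReaderBuilder`] and its async counterpart, generic
/// over the input source `T`.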
pub struct ArrowReaderBuilder<T> {
    pub(crate) input: T,

    pub(crate) metadata: Arc<ParquetMetaData>,

    pub(crate) schema: SchemaRef,

    pub(crate) fields: Option<Arc<ParquetField>>,

    pub(crate) batch_size: usize,

    pub(crate) row_groups: Option<Vec<usize>>,

    pub(crate) projection: ProjectionMask,

    pub(crate) filter: Option<RowFilter>,

    pub(crate) selection: Option<RowSelection>,

    pub(crate) limit: Option<usize>,

    pub(crate) offset: Option<usize>,

    pub(crate) metrics: ArrowReaderMetrics,

    pub(crate) max_predicate_cache_size: usize,
}

impl<T: Debug> Debug for ArrowReaderBuilder<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowReaderBuilder<T>")
            .field("input", &self.input)
            .field("metadata", &self.metadata)
            .field("schema", &self.schema)
            .field("fields", &self.fields)
            .field("batch_size", &self.batch_size)
            .field("row_groups", &self.row_groups)
            .field("projection", &self.projection)
            .field("filter", &self.filter)
            .field("selection", &self.selection)
            .field("limit", &self.limit)
            .field("offset", &self.offset)
            .field("metrics", &self.metrics)
            .finish()
    }
}

impl<T> ArrowReaderBuilder<T> {
    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self {
            input,
            metadata: metadata.metadata,
            schema: metadata.schema,
            fields: metadata.fields,
            batch_size: 1024,
            row_groups: None,
            projection: ProjectionMask::all(),
            filter: None,
            selection: None,
            limit: None,
            offset: None,
            metrics: ArrowReaderMetrics::Disabled,
            max_predicate_cache_size: 100 * 1024 * 1024,
        }
    }

    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    /// Returns the Arrow [`SchemaRef`] of the underlying data
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Set the size of [`RecordBatch`] to produce. Defaults to 1024.
    pub fn with_batch_size(self, batch_size: usize) -> Self {
        // Try to avoid allocating a batch buffer larger than the file
        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
        Self { batch_size, ..self }
    }

    /// Only read data from the provided row group indexes
    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
        Self {
            row_groups: Some(row_groups),
            ..self
        }
    }

    /// Only read data from the columns selected by the provided [`ProjectionMask`]
    pub fn with_projection(self, mask: ProjectionMask) -> Self {
        Self {
            projection: mask,
            ..self
        }
    }

    /// Only read data from the rows selected by the provided [`RowSelection`]
    pub fn with_row_selection(self, selection: RowSelection) -> Self {
        Self {
            selection: Some(selection),
            ..self
        }
    }

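    /// Provide a [`RowFilter`] to skip decoding rows that do not match its
    /// predicates
    ///
    /// The filter is applied to the rows remaining after any [`RowSelection`].
    /// A minimal sketch using [`ArrowPredicateFn`] (assuming the file's first
    /// leaf column is an `Int32` column):
    ///
    /// ```no_run
    /// # use arrow_array::Int32Array;
    /// # use bytes::Bytes;
    /// # use parquet::arrow::arrow_reader::{
    /// #     ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter,
    /// # };
    /// # use parquet::arrow::ProjectionMask;
    /// # fn example(data: Bytes) -> parquet::errors::Result<()> {
    /// let builder = ParquetRecordBatchReaderBuilder::try_new(data)?;
    /// // the predicate only needs the first leaf column
    /// let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    /// // keep rows where the column is >= 0
    /// let predicate = ArrowPredicateFn::new(mask, |batch| {
    ///     arrow::compute::kernels::cmp::gt_eq(batch.column(0), &Int32Array::new_scalar(0))
    /// });
    /// let reader = builder
    ///     .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
    ///     .build()?;
    /// for batch in reader {
    ///     println!("{} rows matched", batch?.num_rows());
    /// }
    /// # Ok(())
    /// # }
    /// ```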
    pub fn with_row_filter(self, filter: RowFilter) -> Self {
        Self {
            filter: Some(filter),
            ..self
        }
    }

    /// Return at most `limit` rows, applied after any [`RowSelection`] and
    /// [`RowFilter`]
    pub fn with_limit(self, limit: usize) -> Self {
        Self {
            limit: Some(limit),
            ..self
        }
    }

    /// Skip the first `offset` rows, applied after any [`RowSelection`] and
    /// [`RowFilter`] but before any limit
    pub fn with_offset(self, offset: usize) -> Self {
        Self {
            offset: Some(offset),
            ..self
        }
    }

    /// Record low-level metrics in the provided [`ArrowReaderMetrics`]
    pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
        Self { metrics, ..self }
    }

    /// Set the maximum size, in bytes, of the cache used when evaluating
    /// predicates to avoid decoding columns twice (defaults to 100 MiB)
    pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
        Self {
            max_predicate_cache_size,
            ..self
        }
    }
}

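/// Settings that control how Parquet metadata is interpreted when creating
/// an [`ArrowReaderBuilder`], such as whether to honor embedded Arrow schema
/// metadata or to read the page index.
///
/// A minimal usage sketch:
///
/// ```no_run
/// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
/// // read the page index, enabling finer-grained row skipping
/// let options = ArrowReaderOptions::new().with_page_index(true);
/// ```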
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    skip_arrow_metadata: bool,
    supplied_schema: Option<SchemaRef>,
    pub(crate) page_index: bool,
    #[cfg(feature = "encryption")]
    pub(crate) file_decryption_properties: Option<FileDecryptionProperties>,
}

impl ArrowReaderOptions {
    /// Create a new [`ArrowReaderOptions`] with the default settings
    pub fn new() -> Self {
        Self::default()
    }

    /// Skip decoding the embedded Arrow metadata (defaults to `false`)
    ///
    /// Parquet files generated by some writers embed the Arrow schema in the
    /// file's key-value metadata; setting this to `true` ignores it.
    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

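    /// Provide a schema to use when reading the file, rather than inferring
    /// one from the Parquet schema
    ///
    /// Setting this also implies `skip_arrow_metadata = true`. The supplied
    /// schema must have the same number of columns as the file and compatible
    /// types; mismatches produce an error when the metadata is loaded.
    ///
    /// A minimal sketch, mirroring `test_int96_from_spark_file_with_provided_schema`
    /// below: read an INT96 column as microsecond rather than the default
    /// nanosecond timestamps.
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use arrow_schema::{DataType, Field, Schema, TimeUnit};
    /// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
    /// let supplied_schema = Arc::new(Schema::new(vec![Field::new(
    ///     "a",
    ///     DataType::Timestamp(TimeUnit::Microsecond, None),
    ///     true,
    /// )]));
    /// let options = ArrowReaderOptions::new().with_schema(supplied_schema);
    /// ```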
    pub fn with_schema(self, schema: SchemaRef) -> Self {
        Self {
            supplied_schema: Some(schema),
            skip_arrow_metadata: true,
            ..self
        }
    }

    /// Enable reading the page index, if present (defaults to `false`)
    ///
    /// The page index allows ranges of rows to be skipped without decoding
    /// pages when combined with a [`RowSelection`] or [`RowFilter`]
    pub fn with_page_index(self, page_index: bool) -> Self {
        Self { page_index, ..self }
    }

    /// Provide the file decryption properties to use when reading encrypted
    /// parquet files
    #[cfg(feature = "encryption")]
    pub fn with_file_decryption_properties(
        self,
        file_decryption_properties: FileDecryptionProperties,
    ) -> Self {
        Self {
            file_decryption_properties: Some(file_decryption_properties),
            ..self
        }
    }

    /// Returns `true` if the page index will be read, as set by
    /// [`Self::with_page_index`]
    pub fn page_index(&self) -> bool {
        self.page_index
    }

    /// Returns the file decryption properties, if any, as set by
    /// [`Self::with_file_decryption_properties`]
    #[cfg(feature = "encryption")]
    pub fn file_decryption_properties(&self) -> Option<&FileDecryptionProperties> {
        self.file_decryption_properties.as_ref()
    }
}

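/// The combination of [`ParquetMetaData`] and the decoded Arrow schema,
/// loaded once and shareable across multiple readers of the same file.
///
/// A minimal sketch of the intended reuse pattern (assuming `data` is a
/// `bytes::Bytes` buffer containing an entire Parquet file):
///
/// ```no_run
/// # use bytes::Bytes;
/// # use parquet::arrow::arrow_reader::{
/// #     ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
/// # };
/// # fn example(data: Bytes) -> parquet::errors::Result<()> {
/// // Parse the footer once...
/// let metadata = ArrowReaderMetadata::load(&data, ArrowReaderOptions::new())?;
/// // ...then build any number of readers without re-parsing it
/// let _reader = ParquetRecordBatchReaderBuilder::new_with_metadata(data.clone(), metadata.clone())
///     .build()?;
/// # Ok(())
/// # }
/// ```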
#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    pub(crate) metadata: Arc<ParquetMetaData>,
    pub(crate) schema: SchemaRef,

    pub(crate) fields: Option<Arc<ParquetField>>,
}

impl ArrowReaderMetadata {
    /// Loads [`ArrowReaderMetadata`] from the provided [`ChunkReader`],
    /// reading the page index if requested by `options`
    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ParquetMetaDataReader::new().with_page_indexes(options.page_index);
        #[cfg(feature = "encryption")]
        let metadata =
            metadata.with_decryption_properties(options.file_decryption_properties.as_ref());
        let metadata = metadata.parse_and_finish(reader)?;
        Self::try_new(Arc::new(metadata), options)
    }

    /// Create [`ArrowReaderMetadata`] from the provided [`ParquetMetaData`]
    ///
    /// Note: this will not attempt to load the page index if it is requested
    /// by `options` but missing from the provided metadata
    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
        match options.supplied_schema {
            Some(supplied_schema) => Self::with_supplied_schema(metadata, supplied_schema.clone()),
            None => {
                let kv_metadata = match options.skip_arrow_metadata {
                    true => None,
                    false => metadata.file_metadata().key_value_metadata(),
                };

                let (schema, fields) = parquet_to_arrow_schema_and_fields(
                    metadata.file_metadata().schema_descr(),
                    ProjectionMask::all(),
                    kv_metadata,
                )?;

                Ok(Self {
                    metadata,
                    schema: Arc::new(schema),
                    fields: fields.map(Arc::new),
                })
            }
        }
    }

    /// Validate the supplied schema against the parquet schema, erroring on
    /// any column count, data type, nullability, or metadata mismatch
    fn with_supplied_schema(
        metadata: Arc<ParquetMetaData>,
        supplied_schema: SchemaRef,
    ) -> Result<Self> {
        let parquet_schema = metadata.file_metadata().schema_descr();
        let field_levels = parquet_to_arrow_field_levels(
            parquet_schema,
            ProjectionMask::all(),
            Some(supplied_schema.fields()),
        )?;
        let fields = field_levels.fields;
        let inferred_len = fields.len();
        let supplied_len = supplied_schema.fields().len();
        if inferred_len != supplied_len {
            return Err(arrow_err!(format!(
                "Incompatible supplied Arrow schema: expected {} columns received {}",
                inferred_len, supplied_len
            )));
        }

        let mut errors = Vec::new();

        let field_iter = supplied_schema.fields().iter().zip(fields.iter());

        for (field1, field2) in field_iter {
            if field1.data_type() != field2.data_type() {
                errors.push(format!(
                    "data type mismatch for field {}: requested {:?} but found {:?}",
                    field1.name(),
                    field1.data_type(),
                    field2.data_type()
                ));
            }
            if field1.is_nullable() != field2.is_nullable() {
                errors.push(format!(
                    "nullability mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.is_nullable(),
                    field2.is_nullable()
                ));
            }
            if field1.metadata() != field2.metadata() {
                errors.push(format!(
                    "metadata mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.metadata(),
                    field2.metadata()
                ));
            }
        }

        if !errors.is_empty() {
            let message = errors.join(", ");
            return Err(ParquetError::ArrowError(format!(
                "Incompatible supplied Arrow schema: {message}",
            )));
        }

        Ok(Self {
            metadata,
            schema: supplied_schema,
            fields: field_levels.levels.map(Arc::new),
        })
    }

    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    /// Returns the Arrow [`SchemaRef`] of the underlying data
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}

#[doc(hidden)]
/// A newtype over `T: ChunkReader` used to distinguish the synchronous reader
/// input from its async counterpart
pub struct SyncReader<T: ChunkReader>(T);

impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("SyncReader").field(&self.0).finish()
    }
}

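/// A synchronous builder used to construct [`ParquetRecordBatchReader`] for a file
///
/// For an async API, see `ParquetRecordBatchStreamBuilder` in
/// `crate::arrow::async_reader`.
///
/// A minimal usage sketch:
///
/// ```no_run
/// # use bytes::Bytes;
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
/// # fn example(data: Bytes) -> parquet::errors::Result<()> {
/// let reader = ParquetRecordBatchReaderBuilder::try_new(data)?
///     .with_batch_size(8192)
///     .build()?;
/// for batch in reader {
///     println!("read {} rows", batch?.num_rows());
/// }
/// # Ok(())
/// # }
/// ```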
pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;

impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
    /// Create a new [`ParquetRecordBatchReaderBuilder`], eagerly reading and
    /// parsing the footer metadata
    pub fn try_new(reader: T) -> Result<Self> {
        Self::try_new_with_options(reader, Default::default())
    }

    /// Create a new [`ParquetRecordBatchReaderBuilder`] with the provided
    /// [`ArrowReaderOptions`]
    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load(&reader, options)?;
        Ok(Self::new_with_metadata(reader, metadata))
    }

    /// Create a [`ParquetRecordBatchReaderBuilder`] from previously loaded
    /// [`ArrowReaderMetadata`], allowing the metadata to be parsed once and
    /// reused across multiple readers (see [`ArrowReaderMetadata::load`])
    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(SyncReader(input), metadata)
    }

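    /// Read the bloom filter for the given row group and column, if present
    ///
    /// Returns `Ok(None)` if the column chunk does not store a bloom filter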
    pub fn get_row_group_column_bloom_filter(
        &mut self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> Result<Option<Sbbf>> {
        let metadata = self.metadata.row_group(row_group_idx);
        let column_metadata = metadata.column(column_idx);

        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => self.input.0.get_bytes(offset, length as usize),
            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
        }?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        match header.algorithm {
            BloomFilterAlgorithm::BLOCK(_) => {
                // this match exists to future-proof the singleton algorithm enum
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED(_) => {
                // this match exists to future-proof the singleton compression enum
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH(_) => {
                // this match exists to future-proof the singleton hash enum
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            Some(_) => buffer.slice(
                (TryInto::<usize>::try_into(bitset_offset).unwrap()
                    - TryInto::<usize>::try_into(offset).unwrap())..,
            ),
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                self.input.0.get_bytes(bitset_offset, bitset_length)?
            }
        };
        Ok(Some(Sbbf::new(&bitset)))
    }

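    /// Build a [`ParquetRecordBatchReader`]
    ///
    /// Note: this will eagerly evaluate any `RowFilter` predicates before
    /// returning, in order to compute the final read plan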
    pub fn build(self) -> Result<ParquetRecordBatchReader> {
        let Self {
            input,
            metadata,
            schema: _,
            fields,
            batch_size,
            row_groups,
            projection,
            mut filter,
            selection,
            limit,
            offset,
            metrics,
            max_predicate_cache_size: _,
        } = self;

        // Try to avoid allocating a batch buffer larger than the file
        let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize);

        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());

        let reader = ReaderRowGroups {
            reader: Arc::new(input.0),
            metadata,
            row_groups,
        };

        let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);

        // Update the selection based on any filters
        if let Some(filter) = filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
                // break early if all rows have already been ruled out
                if !plan_builder.selects_any() {
                    break;
                }

                let mut cache_projection = predicate.projection().clone();
                cache_projection.intersect(&projection);

                let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
                    .build_array_reader(fields.as_deref(), predicate.projection())?;

                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
            }
        }

        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
            .build_array_reader(fields.as_deref(), &projection)?;

        let read_plan = plan_builder
            .limited(reader.num_rows())
            .with_offset(offset)
            .with_limit(limit)
            .build_limited()
            .build();

        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
    }
}

struct ReaderRowGroups<T: ChunkReader> {
    reader: Arc<T>,

    metadata: Arc<ParquetMetaData>,
    row_groups: Vec<usize>,
}

impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
    fn num_rows(&self) -> usize {
        let meta = self.metadata.row_groups();
        self.row_groups
            .iter()
            .map(|x| meta[*x].num_rows() as usize)
            .sum()
    }

    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        Ok(Box::new(ReaderPageIterator {
            column_idx: i,
            reader: self.reader.clone(),
            metadata: self.metadata.clone(),
            row_groups: self.row_groups.clone().into_iter(),
        }))
    }
}

struct ReaderPageIterator<T: ChunkReader> {
    reader: Arc<T>,
    column_idx: usize,
    row_groups: std::vec::IntoIter<usize>,
    metadata: Arc<ParquetMetaData>,
}

impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
    /// Return the next [`SerializedPageReader`] for the row group `rg_idx`
    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
        let rg = self.metadata.row_group(rg_idx);
        let column_chunk_metadata = rg.column(self.column_idx);
        let offset_index = self.metadata.offset_index();
        // The offset index for a row group may be empty; filter those out to
        // avoid panicking when indexing `i[rg_idx][self.column_idx]`
        let page_locations = offset_index
            .filter(|i| !i[rg_idx].is_empty())
            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
        let total_rows = rg.num_rows() as usize;
        let reader = self.reader.clone();

        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
            .add_crypto_context(
                rg_idx,
                self.column_idx,
                self.metadata.as_ref(),
                column_chunk_metadata,
            )
    }
}

impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        let rg_idx = self.row_groups.next()?;
        let page_reader = self
            .next_page_reader(rg_idx)
            .map(|page_reader| Box::new(page_reader) as _);
        Some(page_reader)
    }
}

impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}

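/// An `Iterator<Item = Result<RecordBatch, ArrowError>>` that yields
/// [`RecordBatch`]es read from a parquet data source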
pub struct ParquetRecordBatchReader {
    array_reader: Box<dyn ArrayReader>,
    schema: SchemaRef,
    read_plan: ReadPlan,
}

impl Iterator for ParquetRecordBatchReader {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_inner()
            .map_err(|arrow_err| arrow_err.into())
            .transpose()
    }
}

impl ParquetRecordBatchReader {
    /// Read up to `batch_size` rows according to the current read plan,
    /// returning `None` when the reader is exhausted
    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
        let mut read_records = 0;
        let batch_size = self.batch_size();
        match self.read_plan.selection_mut() {
            Some(selection) => {
                while read_records < batch_size && !selection.is_empty() {
                    let front = selection.pop_front().unwrap();
                    if front.skip {
                        let skipped = self.array_reader.skip_records(front.row_count)?;

                        if skipped != front.row_count {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                front.row_count,
                                skipped
                            ));
                        }
                        continue;
                    }

                    // Skip empty selectors rather than treating them as end of input
                    if front.row_count == 0 {
                        continue;
                    }

                    // Try to fill the remainder of the batch from this selector,
                    // pushing any unread rows back onto the selection queue
                    let need_read = batch_size - read_records;
                    let to_read = match front.row_count.checked_sub(need_read) {
                        Some(remaining) if remaining != 0 => {
                            selection.push_front(RowSelector::select(remaining));
                            need_read
                        }
                        _ => front.row_count,
                    };
                    match self.array_reader.read_records(to_read)? {
                        0 => break,
                        rec => read_records += rec,
                    };
                }
            }
            None => {
                self.array_reader.read_records(batch_size)?;
            }
        };

        let array = self.array_reader.consume_batch()?;
        let struct_array = array.as_struct_opt().ok_or_else(|| {
            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
        })?;

        Ok(if struct_array.len() > 0 {
            Some(RecordBatch::from(struct_array))
        } else {
            None
        })
    }
}

impl RecordBatchReader for ParquetRecordBatchReader {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl ParquetRecordBatchReader {
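    /// Create a new [`ParquetRecordBatchReader`] from the provided reader,
    /// eagerly reading the footer
    ///
    /// See [`ParquetRecordBatchReaderBuilder`] for more options and control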
    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
        ParquetRecordBatchReaderBuilder::try_new(reader)?
            .with_batch_size(batch_size)
            .build()
    }

    /// Create a new [`ParquetRecordBatchReader`] that reads the provided
    /// [`RowGroups`] directly using precomputed [`FieldLevels`]; this is a
    /// lower-level API for callers that manage row group IO themselves
    pub fn try_new_with_row_groups(
        levels: &FieldLevels,
        row_groups: &dyn RowGroups,
        batch_size: usize,
        selection: Option<RowSelection>,
    ) -> Result<Self> {
        let metrics = ArrowReaderMetrics::disabled();
        let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;

        let read_plan = ReadPlanBuilder::new(batch_size)
            .with_selection(selection)
            .build();

        Ok(Self {
            array_reader,
            schema: Arc::new(Schema::new(levels.fields.clone())),
            read_plan,
        })
    }

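    /// Create a new [`ParquetRecordBatchReader`] that reads from the provided
    /// array reader according to `read_plan`
    ///
    /// Panics if the array reader's data type is not a struct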
    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
        let schema = match array_reader.get_data_type() {
            ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
            _ => unreachable!("Struct array reader's data type is not struct!"),
        };

        Self {
            array_reader,
            schema: Arc::new(schema),
            read_plan,
        }
    }

    #[inline(always)]
    pub(crate) fn batch_size(&self) -> usize {
        self.read_plan.batch_size()
    }
}

#[cfg(test)]
mod tests {
    use std::cmp::min;
    use std::collections::{HashMap, VecDeque};
    use std::fmt::Formatter;
    use std::fs::File;
    use std::io::Seek;
    use std::path::PathBuf;
    use std::sync::Arc;

    use arrow_array::builder::*;
    use arrow_array::cast::AsArray;
    use arrow_array::types::{
        Date32Type, Date64Type, Decimal128Type, Decimal256Type, Decimal32Type, Decimal64Type,
        DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
        Time64MicrosecondType,
    };
    use arrow_array::*;
    use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
    use arrow_data::{ArrayData, ArrayDataBuilder};
    use arrow_schema::{
        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
    };
    use arrow_select::concat::concat_batches;
    use bytes::Bytes;
    use half::f16;
    use num::PrimInt;
    use rand::{rng, Rng, RngCore};
    use tempfile::tempfile;

    use crate::arrow::arrow_reader::{
        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
    };
    use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
    use crate::arrow::{ArrowWriter, ProjectionMask};
    use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
    use crate::data_type::{
        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
        FloatType, Int32Type, Int64Type, Int96, Int96Type,
    };
    use crate::errors::Result;
    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
    use crate::file::writer::SerializedFileWriter;
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::{Type, TypePtr};
    use crate::util::test_common::rand_gen::RandGen;

    #[test]
    fn test_arrow_reader_all_columns() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());
        let reader = builder.build().unwrap();

        assert_eq!(original_schema.fields(), reader.schema().fields());
    }

    #[test]
    fn test_arrow_reader_single_column() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
        let reader = builder.with_projection(mask).build().unwrap();

        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_arrow_reader_single_column_by_name() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
        let reader = builder.with_projection(mask).build().unwrap();

        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_null_column_reader_test() {
        let mut file = tempfile::tempfile().unwrap();

        let schema = "
            message message {
                OPTIONAL INT32 int32;
            }
        ";
        let schema = Arc::new(parse_message_type(schema).unwrap());

        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
        generate_single_column_file_with_data::<Int32Type>(
            &[vec![], vec![]],
            Some(&def_levels),
            file.try_clone().unwrap(),
            schema,
            Some(Field::new("int32", ArrowDataType::Null, true)),
            &Default::default(),
        )
        .unwrap();

        file.rewind().unwrap();

        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();

        assert_eq!(batches.len(), 4);
        for batch in &batches[0..3] {
            assert_eq!(batch.num_rows(), 2);
            assert_eq!(batch.num_columns(), 1);
            assert_eq!(batch.column(0).null_count(), 2);
        }

        assert_eq!(batches[3].num_rows(), 1);
        assert_eq!(batches[3].num_columns(), 1);
        assert_eq!(batches[3].column(0).null_count(), 1);
    }

    #[test]
    fn test_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<BoolType, _, BoolType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
        );
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<FloatType, _, FloatType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
        );
    }

    #[test]
    fn test_unsigned_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::UINT_32,
            Some(ArrowDataType::UInt32),
            |vals| {
                Arc::new(UInt32Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u32)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::UINT_64,
            Some(ArrowDataType::UInt64),
            |vals| {
                Arc::new(UInt64Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u64)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
    }

    #[test]
    fn test_unsigned_roundtrip() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("uint32", ArrowDataType::UInt32, true),
            Field::new("uint64", ArrowDataType::UInt64, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt32Array::from_iter_values([
                    0,
                    i32::MAX as u32,
                    u32::MAX,
                ])),
                Arc::new(UInt64Array::from_iter_values([
                    0,
                    i64::MAX as u64,
                    u64::MAX,
                ])),
            ],
        )
        .unwrap();

        writer.write(&original).unwrap();
        writer.close().unwrap();

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
        let ret = reader.next().unwrap().unwrap();
        assert_eq!(ret, original);

        ret.column(0)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();

        ret.column(1)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
    }

    #[test]
    fn test_float16_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("float16", ArrowDataType::Float16, false),
            Field::new("float16-nullable", ArrowDataType::Float16, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Float16Array::from_iter_values([
                    f16::EPSILON,
                    f16::MIN,
                    f16::MAX,
                    f16::NAN,
                    f16::INFINITY,
                    f16::NEG_INFINITY,
                    f16::ONE,
                    f16::NEG_ONE,
                    f16::ZERO,
                    f16::NEG_ZERO,
                    f16::E,
                    f16::PI,
                    f16::FRAC_1_PI,
                ])),
                Arc::new(Float16Array::from(vec![
                    None,
                    None,
                    None,
                    Some(f16::NAN),
                    Some(f16::INFINITY),
                    Some(f16::NEG_INFINITY),
                    None,
                    None,
                    None,
                    None,
                    None,
                    None,
                    Some(f16::FRAC_1_PI),
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Float16Type>();
        ret.column(1).as_primitive::<Float16Type>();

        Ok(())
    }

    #[test]
    fn test_time_utc_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "time_millis",
                ArrowDataType::Time32(TimeUnit::Millisecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
            Field::new(
                "time_micros",
                ArrowDataType::Time64(TimeUnit::Microsecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Time32MillisecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399_000),
                    Some(86_400_000),
                    Some(86_401_000),
                    None,
                ])),
                Arc::new(Time64MicrosecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399 * 1_000_000),
                    Some(86_400 * 1_000_000),
                    Some(86_401 * 1_000_000),
                    None,
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Time32MillisecondType>();
        ret.column(1).as_primitive::<Time64MicrosecondType>();

        Ok(())
    }

    #[test]
    fn test_date32_roundtrip() -> Result<()> {
        use arrow_array::Date32Array;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "date32",
            ArrowDataType::Date32,
            false,
        )]));

        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![Arc::new(Date32Array::from(vec![
                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
            ]))],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Date32Type>();

        Ok(())
    }

    #[test]
    fn test_date64_roundtrip() -> Result<()> {
        use arrow_array::Date64Array;

        let schema = Arc::new(Schema::new(vec![
            Field::new("small-date64", ArrowDataType::Date64, false),
            Field::new("big-date64", ArrowDataType::Date64, false),
            Field::new("invalid-date64", ArrowDataType::Date64, false),
        ]));

        let mut default_buf = Vec::with_capacity(1024);
        let mut coerce_buf = Vec::with_capacity(1024);

        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();

        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
        let mut coerce_writer =
            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;

        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;

        let original = RecordBatch::try_new(
            schema,
            vec![
                // small-date64: whole days that also fit in a Date32
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000 * NUM_MILLISECONDS_IN_DAY,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                // big-date64: whole days outside the Date32 range
                Arc::new(Date64Array::from(vec![
                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                // invalid-date64: values that are not a whole number of days
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1,
                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                ])),
            ],
        )?;

        default_writer.write(&original)?;
        coerce_writer.write(&original)?;

        default_writer.close()?;
        coerce_writer.close()?;

        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;

        let default_ret = default_reader.next().unwrap()?;
        let coerce_ret = coerce_reader.next().unwrap()?;

        // without coercion, no values are lost
        assert_eq!(default_ret, original);

        // with coercion, only the values representable as whole days survive
        assert_eq!(coerce_ret.column(0), original.column(0));
        assert_ne!(coerce_ret.column(1), original.column(1));
        assert_ne!(coerce_ret.column(2), original.column(2));

        default_ret.column(0).as_primitive::<Date64Type>();
        coerce_ret.column(0).as_primitive::<Date64Type>();

        Ok(())
    }

    struct RandFixedLenGen {}

    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
        fn gen(len: i32) -> FixedLenByteArray {
            let mut v = vec![0u8; len as usize];
            rng().fill_bytes(&mut v);
            ByteArray::from(v).into()
        }
    }

    #[test]
    fn test_fixed_length_binary_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            20,
            ConvertedType::NONE,
            None,
            |vals| {
                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
                for val in vals {
                    match val {
                        Some(b) => builder.append_value(b).unwrap(),
                        None => builder.append_null(),
                    }
                }
                Arc::new(builder.finish())
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_interval_day_time_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            12,
            ConvertedType::INTERVAL,
            None,
            |vals| {
                Arc::new(
                    vals.iter()
                        .map(|x| {
                            x.as_ref().map(|b| IntervalDayTime {
                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
                                milliseconds: i32::from_le_bytes(
                                    b.as_ref()[8..12].try_into().unwrap(),
                                ),
                            })
                        })
                        .collect::<IntervalDayTimeArray>(),
                )
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_int96_single_column_reader_test() {
        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];

        type TypeHintAndConversionFunction =
            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);

        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
            // without a type hint, INT96 defaults to nanosecond timestamps
            (None, |vals: &[Option<Int96>]| {
                Arc::new(TimestampNanosecondArray::from_iter(
                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
                )) as ArrayRef
            }),
            // other TimeUnits as type hints
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampSecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMillisecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_millis())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMicrosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_micros())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampNanosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
                    )) as ArrayRef
                },
            ),
            // a timezone-aware type hint
            (
                Some(ArrowDataType::Timestamp(
                    TimeUnit::Second,
                    Some(Arc::from("-05:00")),
                )),
                |vals: &[Option<Int96>]| {
                    Arc::new(
                        TimestampSecondArray::from_iter(
                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
                        )
                        .with_timezone("-05:00"),
                    ) as ArrayRef
                },
            ),
        ];

        resolutions.iter().for_each(|(arrow_type, converter)| {
            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
                2,
                ConvertedType::NONE,
                arrow_type.clone(),
                converter,
                encodings,
            );
        })
    }

    #[test]
    fn test_int96_from_spark_file_with_provided_schema() {
        // int96_from_spark.parquet stores timestamps as INT96; supplying a
        // schema reads them back at microsecond rather than the default
        // nanosecond resolution
        use arrow_schema::DataType::Timestamp;
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            Timestamp(TimeUnit::Microsecond, None),
            true,
        )]));
        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());

        let mut record_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                .unwrap()
                .build()
                .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));

        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456),
            Some(1704070800000000),
            Some(253402225200000000),
            Some(1735599600000000),
            None,
            Some(9089380393200000000),
        ]));

        // cast the timestamps to i64 so the raw values can be compared
        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }

    #[test]
    fn test_int96_from_spark_file_without_provided_schema() {
        // without a supplied schema, INT96 is read as nanosecond timestamps
        use arrow_schema::DataType::Timestamp;
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));

        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456000),
            Some(1704070800000000000),
            // the third and sixth values overflow i64 when widened to
            // nanoseconds, so the wrapped values are expected
            Some(-4852191831933722624),
            Some(1735599600000000000),
            None,
            Some(-4864435138808946688),
        ]));

        // cast the timestamps to i64 so the raw values can be compared
        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }

    struct RandUtf8Gen {}

    impl RandGen<ByteArrayType> for RandUtf8Gen {
        fn gen(len: i32) -> ByteArray {
            Int32Type::gen(len).to_string().as_str().into()
        }
    }

    #[test]
    fn test_utf8_single_column_reader_test() {
        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
            })))
        }

        let encodings = &[
            Encoding::PLAIN,
            Encoding::RLE_DICTIONARY,
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            Encoding::DELTA_BYTE_ARRAY,
        ];

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::NONE,
            None,
            |vals| {
                Arc::new(BinaryArray::from_iter(
                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
                ))
            },
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            None,
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::Utf8),
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::LargeUtf8),
            string_converter::<i64>,
            encodings,
        );

        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
        for key in &small_key_types {
            for encoding in encodings {
                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
                opts.encoding = *encoding;

                let data_type =
                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
                    opts,
                    2,
                    ConvertedType::UTF8,
                    Some(data_type.clone()),
                    move |vals| {
                        let vals = string_converter::<i32>(vals);
                        arrow::compute::cast(&vals, &data_type).unwrap()
                    },
                );
            }
        }

        let key_types = [
            ArrowDataType::Int16,
            ArrowDataType::UInt16,
            ArrowDataType::Int32,
            ArrowDataType::UInt32,
            ArrowDataType::Int64,
            ArrowDataType::UInt64,
        ];

        for key in &key_types {
            let data_type =
                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let vals = string_converter::<i32>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );
        }
    }

    #[test]
    fn test_decimal_nullable_struct() {
        let decimals = Decimal256Array::from_iter_values(
            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
        );

        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "decimals",
            decimals.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_int32_nullable_struct() {
        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "int32",
            int32.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![int32.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_decimal_list() {
        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);

        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
            decimals.data_type().clone(),
            false,
        ))))
        .len(7)
        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 1), &read[2]);
    }

    #[test]
    fn test_read_decimal_file() {
        use arrow_array::Decimal128Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let file_variants = vec![
            ("byte_array", 4),
            ("fixed_length", 25),
            ("int32", 4),
            ("int64", 10),
        ];
        for (prefix, target_precision) in file_variants {
            let path = format!("{testdata}/{prefix}_decimal.parquet");
            let file = File::open(path).unwrap();
            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

            let batch = record_reader.next().unwrap().unwrap();
            assert_eq!(batch.num_rows(), 24);
            let col = batch
                .column(0)
                .as_any()
                .downcast_ref::<Decimal128Array>()
                .unwrap();

            let expected = 1..25;

            assert_eq!(col.precision(), target_precision);
            assert_eq!(col.scale(), 2);

            for (i, v) in expected.enumerate() {
                assert_eq!(col.value(i), v * 100_i128);
            }
        }
    }

    #[test]
    fn test_read_float16_nonzeros_file() {
        use arrow_array::Float16Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 8);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float16Array>()
            .unwrap();

        let f16_two = f16::ONE + f16::ONE;

        assert_eq!(col.null_count(), 1);
        assert!(col.is_null(0));
        assert_eq!(col.value(1), f16::ONE);
        assert_eq!(col.value(2), -f16_two);
        assert!(col.value(3).is_nan());
        assert_eq!(col.value(4), f16::ZERO);
        assert!(col.value(4).is_sign_positive());
        assert_eq!(col.value(5), f16::NEG_ONE);
        assert_eq!(col.value(6), f16::NEG_ZERO);
        assert!(col.value(6).is_sign_negative());
        assert_eq!(col.value(7), f16_two);
    }

    #[test]
    fn test_read_float16_zeros_file() {
        use arrow_array::Float16Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 3);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float16Array>()
            .unwrap();

        assert_eq!(col.null_count(), 1);
        assert!(col.is_null(0));
        assert_eq!(col.value(1), f16::ZERO);
        assert!(col.value(1).is_sign_positive());
        assert!(col.value(2).is_nan());
    }

    #[test]
    fn test_read_float32_float64_byte_stream_split() {
        let path = format!(
            "{}/byte_stream_split.zstd.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();
        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

        let mut row_count = 0;
        for batch in record_reader {
            let batch = batch.unwrap();
            row_count += batch.num_rows();
            let f32_col = batch.column(0).as_primitive::<Float32Type>();
            let f64_col = batch.column(1).as_primitive::<Float64Type>();

            for &x in f32_col.values() {
                assert!(x > -10.0);
                assert!(x < 10.0);
            }
            for &x in f64_col.values() {
                assert!(x > -10.0);
                assert!(x < 10.0);
            }
        }
        assert_eq!(row_count, 300);
    }

    #[test]
    fn test_read_extended_byte_stream_split() {
        let path = format!(
            "{}/byte_stream_split_extended.gzip.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();
        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

        let mut row_count = 0;
        for batch in record_reader {
            let batch = batch.unwrap();
            row_count += batch.num_rows();

            // the file contains pairs of columns holding the same data, one
            // PLAIN encoded and one BYTE_STREAM_SPLIT encoded; each pair must
            // decode identically
            let f16_col = batch.column(0).as_primitive::<Float16Type>();
            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
            assert_eq!(f16_col.len(), f16_bss.len());
            f16_col
                .iter()
                .zip(f16_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let f32_col = batch.column(2).as_primitive::<Float32Type>();
            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
            assert_eq!(f32_col.len(), f32_bss.len());
            f32_col
                .iter()
                .zip(f32_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let f64_col = batch.column(4).as_primitive::<Float64Type>();
            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
            assert_eq!(f64_col.len(), f64_bss.len());
            f64_col
                .iter()
                .zip(f64_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
            assert_eq!(i32_col.len(), i32_bss.len());
            i32_col
                .iter()
                .zip(i32_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
            assert_eq!(i64_col.len(), i64_bss.len());
            i64_col
                .iter()
                .zip(i64_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let flba_col = batch.column(10).as_fixed_size_binary();
            let flba_bss = batch.column(11).as_fixed_size_binary();
            assert_eq!(flba_col.len(), flba_bss.len());
            flba_col
                .iter()
                .zip(flba_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
            assert_eq!(dec_col.len(), dec_bss.len());
            dec_col
                .iter()
                .zip(dec_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
        }
        assert_eq!(row_count, 200);
    }

    #[test]
    fn test_read_incorrect_map_schema_file() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/incorrect_map_schema.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 1);

        let expected_schema = Schema::new(Fields::from(vec![Field::new(
            "my_map",
            ArrowDataType::Map(
                Arc::new(Field::new(
                    "key_value",
                    ArrowDataType::Struct(Fields::from(vec![
                        Field::new("key", ArrowDataType::Utf8, false),
                        Field::new("value", ArrowDataType::Utf8, true),
                    ])),
                    false,
                )),
                false,
            ),
            true,
        )]));
        assert_eq!(batch.schema().as_ref(), &expected_schema);

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.column(0).null_count(), 0);
        assert_eq!(
            batch.column(0).as_map().keys().as_ref(),
            &StringArray::from(vec!["parent", "name"])
        );
        assert_eq!(
            batch.column(0).as_map().values().as_ref(),
            &StringArray::from(vec!["another", "report"])
        );
    }

    #[test]
    fn test_read_dict_fixed_size_binary() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            ArrowDataType::Dictionary(
                Box::new(ArrowDataType::UInt8),
                Box::new(ArrowDataType::FixedSizeBinary(8)),
            ),
            true,
        )]));
        let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
        let values = FixedSizeBinaryArray::try_from_iter(
            vec![
                (0u8..8u8).collect::<Vec<u8>>(),
                (24u8..32u8).collect::<Vec<u8>>(),
            ]
            .into_iter(),
        )
        .unwrap();
        let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
        let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(read.len(), 1);
        assert_eq!(&batch, &read[0])
    }

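    /// Options controlling the shape of the parquet file generated by
    /// `run_single_column_reader_tests` and how it is read back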
    #[derive(Clone)]
    struct TestOptions {
        /// Number of row groups to write
        num_row_groups: usize,
        /// Number of rows in each row group
        num_rows: usize,
        /// Size of batches to read back
        record_batch_size: usize,
        /// Percentage of nulls in the column, or `None` if required
        null_percent: Option<usize>,
        /// Write batch size
        write_batch_size: usize,
        /// Maximum size of a data page in bytes
        max_data_page_size: usize,
        /// Maximum size of a dictionary page in bytes
        max_dict_page_size: usize,
        /// Writer version
        writer_version: WriterVersion,
        /// Statistics level to write
        enabled_statistics: EnabledStatistics,
        /// Encoding to use
        encoding: Encoding,
        /// Row selections, along with the expected selected row count
        row_selections: Option<(RowSelection, usize)>,
        /// Row filter mask
        row_filter: Option<Vec<bool>>,
        /// Limit on the number of rows returned
        limit: Option<usize>,
        /// Number of rows to skip before returning any
        offset: Option<usize>,
    }

    impl std::fmt::Debug for TestOptions {
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("TestOptions")
                .field("num_row_groups", &self.num_row_groups)
                .field("num_rows", &self.num_rows)
                .field("record_batch_size", &self.record_batch_size)
                .field("null_percent", &self.null_percent)
                .field("write_batch_size", &self.write_batch_size)
                .field("max_data_page_size", &self.max_data_page_size)
                .field("max_dict_page_size", &self.max_dict_page_size)
                .field("writer_version", &self.writer_version)
                .field("enabled_statistics", &self.enabled_statistics)
                .field("encoding", &self.encoding)
                .field("row_selections", &self.row_selections.is_some())
                .field("row_filter", &self.row_filter.is_some())
                .field("limit", &self.limit)
                .field("offset", &self.offset)
                .finish()
        }
    }

    impl Default for TestOptions {
        fn default() -> Self {
            Self {
                num_row_groups: 2,
                num_rows: 100,
                record_batch_size: 15,
                null_percent: None,
                write_batch_size: 64,
                max_data_page_size: 1024 * 1024,
                max_dict_page_size: 1024 * 1024,
                writer_version: WriterVersion::PARQUET_1_0,
                enabled_statistics: EnabledStatistics::Page,
                encoding: Encoding::PLAIN,
                row_selections: None,
                row_filter: None,
                limit: None,
                offset: None,
            }
        }
    }

    impl TestOptions {
        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
            Self {
                num_row_groups,
                num_rows,
                record_batch_size,
                ..Default::default()
            }
        }

        fn with_null_percent(self, null_percent: usize) -> Self {
            Self {
                null_percent: Some(null_percent),
                ..self
            }
        }

        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
            Self {
                max_data_page_size,
                ..self
            }
        }

        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
            Self {
                max_dict_page_size,
                ..self
            }
        }

        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
            Self {
                enabled_statistics,
                ..self
            }
        }

        fn with_row_selections(self) -> Self {
            assert!(self.row_filter.is_none(), "Must set row selection first");

            let mut rng = rng();
            let step = rng.random_range(self.record_batch_size..self.num_rows);
            let row_selections = create_test_selection(
                step,
                self.num_row_groups * self.num_rows,
                rng.random::<bool>(),
            );
            Self {
                row_selections: Some(row_selections),
                ..self
            }
        }

        fn with_row_filter(self) -> Self {
            let row_count = match &self.row_selections {
                Some((_, count)) => *count,
                None => self.num_row_groups * self.num_rows,
            };

            let mut rng = rng();
            Self {
                row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
                ..self
            }
        }

        fn with_limit(self, limit: usize) -> Self {
            Self {
                limit: Some(limit),
                ..self
            }
        }

        fn with_offset(self, offset: usize) -> Self {
            Self {
                offset: Some(offset),
                ..self
            }
        }

        fn writer_props(&self) -> WriterProperties {
            let builder = WriterProperties::builder()
                .set_data_page_size_limit(self.max_data_page_size)
                .set_write_batch_size(self.write_batch_size)
                .set_writer_version(self.writer_version)
                .set_statistics_enabled(self.enabled_statistics);

            let builder = match self.encoding {
                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
                    .set_dictionary_enabled(true)
                    .set_dictionary_page_size_limit(self.max_dict_page_size),
                _ => builder
                    .set_dictionary_enabled(false)
                    .set_encoding(self.encoding),
            };

            builder.build()
        }
    }

2529 fn run_single_column_reader_tests<T, F, G>(
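2530 // `rand_max` bounds the values produced by the random generator `G`;
2531 // small bounds yield repetitive, dictionary-friendly data, large
2532 // bounds yield mostly unique values.
2533 //
2534 // `converter` turns the generated parquet values into the Arrow
2535 // array the reader is expected to produce.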
2536 rand_max: i32,
2537 converted_type: ConvertedType,
2538 arrow_type: Option<ArrowDataType>,
2539 converter: F,
2540 encodings: &[Encoding],
2541 ) where
2542 T: DataType,
2543 G: RandGen<T>,
2544 F: Fn(&[Option<T::T>]) -> ArrayRef,
2545 {
2546 let all_options = vec![
2547 TestOptions::new(2, 100, 15),
2550 TestOptions::new(3, 25, 5),
2555 TestOptions::new(4, 100, 25),
2559 TestOptions::new(3, 256, 73).with_max_data_page_size(128),
2561 TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
2563 TestOptions::new(2, 256, 127).with_null_percent(0),
2565 TestOptions::new(2, 256, 93).with_null_percent(25),
2567 TestOptions::new(4, 100, 25).with_limit(0),
2569 TestOptions::new(4, 100, 25).with_limit(50),
2571 TestOptions::new(4, 100, 25).with_limit(10),
2573 TestOptions::new(4, 100, 25).with_limit(101),
2575 TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
2577 TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
2579 TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
2581 TestOptions::new(2, 256, 91)
2583 .with_null_percent(25)
2584 .with_enabled_statistics(EnabledStatistics::Chunk),
2585 TestOptions::new(2, 256, 91)
2587 .with_null_percent(25)
2588 .with_enabled_statistics(EnabledStatistics::None),
2589 TestOptions::new(2, 128, 91)
2591 .with_null_percent(100)
2592 .with_enabled_statistics(EnabledStatistics::None),
2593 TestOptions::new(2, 100, 15).with_row_selections(),
2598 TestOptions::new(3, 25, 5).with_row_selections(),
2603 TestOptions::new(4, 100, 25).with_row_selections(),
2607 TestOptions::new(3, 256, 73)
2609 .with_max_data_page_size(128)
2610 .with_row_selections(),
2611 TestOptions::new(3, 256, 57)
2613 .with_max_dict_page_size(128)
2614 .with_row_selections(),
2615 TestOptions::new(2, 256, 127)
2617 .with_null_percent(0)
2618 .with_row_selections(),
2619 TestOptions::new(2, 256, 93)
2621 .with_null_percent(25)
2622 .with_row_selections(),
2623 TestOptions::new(2, 256, 93)
2625 .with_null_percent(25)
2626 .with_row_selections()
2627 .with_limit(10),
2628 TestOptions::new(2, 256, 93)
2630 .with_null_percent(25)
2631 .with_row_selections()
2632 .with_offset(20)
2633 .with_limit(10),
2634 TestOptions::new(4, 100, 25).with_row_filter(),
2638 TestOptions::new(4, 100, 25)
2640 .with_row_selections()
2641 .with_row_filter(),
2642 TestOptions::new(2, 256, 93)
2644 .with_null_percent(25)
2645 .with_max_data_page_size(10)
2646 .with_row_filter(),
2647 TestOptions::new(2, 256, 93)
2649 .with_null_percent(25)
2650 .with_max_data_page_size(10)
2651 .with_row_selections()
2652 .with_row_filter(),
2653 TestOptions::new(2, 256, 93)
2655 .with_enabled_statistics(EnabledStatistics::None)
2656 .with_max_data_page_size(10)
2657 .with_row_selections(),
2658 ];
2659
2660 all_options.into_iter().for_each(|opts| {
2661 for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2662 for encoding in encodings {
2663 let opts = TestOptions {
2664 writer_version,
2665 encoding: *encoding,
2666 ..opts.clone()
2667 };
2668
2669 single_column_reader_test::<T, _, G>(
2670 opts,
2671 rand_max,
2672 converted_type,
2673 arrow_type.clone(),
2674 &converter,
2675 )
2676 }
2677 }
2678 });
2679 }
2680
2681 fn single_column_reader_test<T, F, G>(
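2682 // Writes a single-column file shaped by `opts`, reads it back with
2683 // `ParquetRecordBatchReader`, and asserts the decoded batches match
2684 // the generated data.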
2685 opts: TestOptions,
2686 rand_max: i32,
2687 converted_type: ConvertedType,
2688 arrow_type: Option<ArrowDataType>,
2689 converter: F,
2690 ) where
2691 T: DataType,
2692 G: RandGen<T>,
2693 F: Fn(&[Option<T::T>]) -> ArrayRef,
2694 {
2695 println!(
2697 "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
2698 T::get_physical_type(), converted_type, arrow_type, opts
2699 );
2700
2701 let (repetition, def_levels) = match opts.null_percent.as_ref() {
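2702 // when nulls are requested, generate a random definition-level mask per row group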
2703 Some(null_percent) => {
2704 let mut rng = rng();
2705
2706 let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
2707 .map(|_| {
2708 std::iter::from_fn(|| {
2709 Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
2710 })
2711 .take(opts.num_rows)
2712 .collect()
2713 })
2714 .collect();
2715 (Repetition::OPTIONAL, Some(def_levels))
2716 }
2717 None => (Repetition::REQUIRED, None),
2718 };
2719
2720 let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
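2721 // generate one vector of non-null values per row group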
2722 .map(|idx| {
2723 let null_count = match def_levels.as_ref() {
2724 Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
2725 None => 0,
2726 };
2727 G::gen_vec(rand_max, opts.num_rows - null_count)
2728 })
2729 .collect();
2730
2731 let len = match T::get_physical_type() {
2732 crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
2733 crate::basic::Type::INT96 => 12,
2734 _ => -1,
2735 };
2736
2737 let fields = vec![Arc::new(
2738 Type::primitive_type_builder("leaf", T::get_physical_type())
2739 .with_repetition(repetition)
2740 .with_converted_type(converted_type)
2741 .with_length(len)
2742 .build()
2743 .unwrap(),
2744 )];
2745
2746 let schema = Arc::new(
2747 Type::group_type_builder("test_schema")
2748 .with_fields(fields)
2749 .build()
2750 .unwrap(),
2751 );
2752
2753 let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
2754
2755 let mut file = tempfile::tempfile().unwrap();
2756
2757 generate_single_column_file_with_data::<T>(
2758 &values,
2759 def_levels.as_ref(),
2760 file.try_clone().unwrap(),
2761 schema,
2762 arrow_field,
2763 &opts,
2764 )
2765 .unwrap();
2766
2767 file.rewind().unwrap();
2768
2769 let options = ArrowReaderOptions::new()
2770 .with_page_index(opts.enabled_statistics == EnabledStatistics::Page);
2771
2772 let mut builder =
2773 ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
2774
2775 let expected_data = match opts.row_selections {
2776 Some((selections, row_count)) => {
2777 let mut remaining_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2778
2779 let mut selected_data: Vec<Option<T::T>> = vec![];
2780 let deque: VecDeque<RowSelector> = selections.clone().into();
2781 for select in deque {
2782 if select.skip {
2783 remaining_data.drain(0..select.row_count);
2784 } else {
2785 selected_data.extend(remaining_data.drain(0..select.row_count));
2786 }
2787 }
2788 builder = builder.with_row_selection(selections);
2789
2790 assert_eq!(selected_data.len(), row_count);
2791 selected_data
2792 }
2793 None => {
2794 let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
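2795 // with no selection every written row should be returned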
2796 assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
2797 expected_data
2798 }
2799 };
2800
2801 let mut expected_data = match opts.row_filter {
2802 Some(filter) => {
2803 let expected_data = expected_data
2804 .into_iter()
2805 .zip(filter.iter())
2806 .filter_map(|(d, f)| f.then_some(d))
2807 .collect();
2808
2809 let mut filter_offset = 0;
2810 let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
2811 ProjectionMask::all(),
2812 move |b| {
2813 let array = BooleanArray::from_iter(
2814 filter
2815 .iter()
2816 .skip(filter_offset)
2817 .take(b.num_rows())
2818 .map(|x| Some(*x)),
2819 );
2820 filter_offset += b.num_rows();
2821 Ok(array)
2822 },
2823 ))]);
2824
2825 builder = builder.with_row_filter(filter);
2826 expected_data
2827 }
2828 None => expected_data,
2829 };
2830
2831 if let Some(offset) = opts.offset {
2832 builder = builder.with_offset(offset);
2833 expected_data = expected_data.into_iter().skip(offset).collect();
2834 }
2835
2836 if let Some(limit) = opts.limit {
2837 builder = builder.with_limit(limit);
2838 expected_data = expected_data.into_iter().take(limit).collect();
2839 }
2840
2841 let mut record_reader = builder
2842 .with_batch_size(opts.record_batch_size)
2843 .build()
2844 .unwrap();
2845
2846 let mut total_read = 0;
2847 loop {
2848 let maybe_batch = record_reader.next();
2849 if total_read < expected_data.len() {
2850 let end = min(total_read + opts.record_batch_size, expected_data.len());
2851 let batch = maybe_batch.unwrap().unwrap();
2852 assert_eq!(end - total_read, batch.num_rows());
2853
2854 let a = converter(&expected_data[total_read..end]);
2855 let b = Arc::clone(batch.column(0));
2856
2857 assert_eq!(a.data_type(), b.data_type());
2858 assert_eq!(a.to_data(), b.to_data());
2859 assert_eq!(
2860 a.as_any().type_id(),
2861 b.as_any().type_id(),
2862 "incorrect type ids"
2863 );
2864
2865 total_read = end;
2866 } else {
2867 assert!(maybe_batch.is_none());
2868 break;
2869 }
2870 }
2871 }
2872
2873 fn gen_expected_data<T: DataType>(
2874 def_levels: Option<&Vec<Vec<i16>>>,
2875 values: &[Vec<T::T>],
2876 ) -> Vec<Option<T::T>> {
2877 let data: Vec<Option<T::T>> = match def_levels {
2878 Some(levels) => {
2879 let mut values_iter = values.iter().flatten();
2880 levels
2881 .iter()
2882 .flatten()
2883 .map(|d| match d {
2884 1 => Some(values_iter.next().cloned().unwrap()),
2885 0 => None,
2886 _ => unreachable!(),
2887 })
2888 .collect()
2889 }
2890 None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
2891 };
2892 data
2893 }
2894
2895 fn generate_single_column_file_with_data<T: DataType>(
2896 values: &[Vec<T::T>],
2897 def_levels: Option<&Vec<Vec<i16>>>,
2898 file: File,
2899 schema: TypePtr,
2900 field: Option<Field>,
2901 opts: &TestOptions,
2902 ) -> Result<crate::format::FileMetaData> {
2903 let mut writer_props = opts.writer_props();
2904 if let Some(field) = field {
2905 let arrow_schema = Schema::new(vec![field]);
2906 add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
2907 }
2908
2909 let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
2910
2911 for (idx, v) in values.iter().enumerate() {
2912 let def_levels = def_levels.map(|d| d[idx].as_slice());
2913 let mut row_group_writer = writer.next_row_group()?;
2914 {
2915 let mut column_writer = row_group_writer
2916 .next_column()?
2917 .expect("Column writer is none!");
2918
2919 column_writer
2920 .typed::<T>()
2921 .write_batch(v, def_levels, None)?;
2922
2923 column_writer.close()?;
2924 }
2925 row_group_writer.close()?;
2926 }
2927
2928 writer.close()
2929 }
2930
2931 fn get_test_file(file_name: &str) -> File {
2932 let mut path = PathBuf::new();
2933 path.push(arrow::util::test_util::arrow_test_data());
2934 path.push(file_name);
2935
2936 File::open(path.as_path()).expect("File not found!")
2937 }
2938
2939 #[test]
2940 fn test_read_structs() {
2941 let testdata = arrow::util::test_util::parquet_test_data();
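2942 // nested_structs.rust.parquet contains nested struct columns; first
2943 // verify the whole file reads cleanly, then read a projection of
2944 // individual struct leaves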
2945 let path = format!("{testdata}/nested_structs.rust.parquet");
2946 let file = File::open(&path).unwrap();
2947 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2948
2949 for batch in record_batch_reader {
2950 batch.unwrap();
2951 }
2952
2953 let file = File::open(&path).unwrap();
2954 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2955
2956 let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
2957 let projected_reader = builder
2958 .with_projection(mask)
2959 .with_batch_size(60)
2960 .build()
2961 .unwrap();
2962
2963 let expected_schema = Schema::new(vec![
2964 Field::new(
2965 "roll_num",
2966 ArrowDataType::Struct(Fields::from(vec![Field::new(
2967 "count",
2968 ArrowDataType::UInt64,
2969 false,
2970 )])),
2971 false,
2972 ),
2973 Field::new(
2974 "PC_CUR",
2975 ArrowDataType::Struct(Fields::from(vec![
2976 Field::new("mean", ArrowDataType::Int64, false),
2977 Field::new("sum", ArrowDataType::Int64, false),
2978 ])),
2979 false,
2980 ),
2981 ]);
2982
2983 assert_eq!(&expected_schema, projected_reader.schema().as_ref());
2985
2986 for batch in projected_reader {
2987 let batch = batch.unwrap();
2988 assert_eq!(batch.schema().as_ref(), &expected_schema);
2989 }
2990 }
2991
2992 #[test]
2993 fn test_read_structs_by_name() {
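2994 // same as test_read_structs, but projecting leaves by dotted path name instead of by leaf index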
2995 let testdata = arrow::util::test_util::parquet_test_data();
2996 let path = format!("{testdata}/nested_structs.rust.parquet");
2997 let file = File::open(&path).unwrap();
2998 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2999
3000 for batch in record_batch_reader {
3001 batch.unwrap();
3002 }
3003
3004 let file = File::open(&path).unwrap();
3005 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3006
3007 let mask = ProjectionMask::columns(
3008 builder.parquet_schema(),
3009 ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
3010 );
3011 let projected_reader = builder
3012 .with_projection(mask)
3013 .with_batch_size(60)
3014 .build()
3015 .unwrap();
3016
3017 let expected_schema = Schema::new(vec![
3018 Field::new(
3019 "roll_num",
3020 ArrowDataType::Struct(Fields::from(vec![Field::new(
3021 "count",
3022 ArrowDataType::UInt64,
3023 false,
3024 )])),
3025 false,
3026 ),
3027 Field::new(
3028 "PC_CUR",
3029 ArrowDataType::Struct(Fields::from(vec![
3030 Field::new("mean", ArrowDataType::Int64, false),
3031 Field::new("sum", ArrowDataType::Int64, false),
3032 ])),
3033 false,
3034 ),
3035 ]);
3036
3037 assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3038
3039 for batch in projected_reader {
3040 let batch = batch.unwrap();
3041 assert_eq!(batch.schema().as_ref(), &expected_schema);
3042 }
3043 }
3044
3045 #[test]
3046 fn test_read_maps() {
3047 let testdata = arrow::util::test_util::parquet_test_data();
3048 let path = format!("{testdata}/nested_maps.snappy.parquet");
3049 let file = File::open(path).unwrap();
3050 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3051
3052 for batch in record_batch_reader {
3053 batch.unwrap();
3054 }
3055 }
3056
3057 #[test]
3058 fn test_nested_nullability() {
3059 let message_type = "message nested {
3060 OPTIONAL Group group {
3061 REQUIRED INT32 leaf;
3062 }
3063 }";
3064
3065 let file = tempfile::tempfile().unwrap();
3066 let schema = Arc::new(parse_message_type(message_type).unwrap());
3067
3068 {
3069 let mut writer =
3071 SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
3072 .unwrap();
3073
3074 {
3075 let mut row_group_writer = writer.next_row_group().unwrap();
3076 let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
3077
3078 column_writer
3079 .typed::<Int32Type>()
3080 .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
3081 .unwrap();
3082
3083 column_writer.close().unwrap();
3084 row_group_writer.close().unwrap();
3085 }
3086
3087 writer.close().unwrap();
3088 }
3089
3090 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3091 let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
3092
3093 let reader = builder.with_projection(mask).build().unwrap();
3094
3095 let expected_schema = Schema::new(Fields::from(vec![Field::new(
3096 "group",
3097 ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
3098 true,
3099 )]));
3100
3101 let batch = reader.into_iter().next().unwrap().unwrap();
3102 assert_eq!(batch.schema().as_ref(), &expected_schema);
3103 assert_eq!(batch.num_rows(), 4);
3104 assert_eq!(batch.column(0).null_count(), 2);
3105 }
3106
3107 #[test]
3108 fn test_invalid_utf8() {
3109 let data = vec![
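3110 // a hand-crafted parquet file whose single string column contains invalid UTF-8 bytes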
3111 80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
3112 18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
3113 3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
3114 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
3115 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
3116 21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
3117 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
3118 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
3119 0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
3120 118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
3121 116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
3122 82, 49,
3123 ];
3124
3125 let file = Bytes::from(data);
3126 let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();
3127
3128 let error = record_batch_reader.next().unwrap().unwrap_err();
3129
3130 assert!(
3131 error.to_string().contains("invalid utf-8 sequence"),
3132 "{}",
3133 error
3134 );
3135 }
3136
3137 #[test]
3138 fn test_invalid_utf8_string_array() {
3139 test_invalid_utf8_string_array_inner::<i32>();
3140 }
3141
3142 #[test]
3143 fn test_invalid_utf8_large_string_array() {
3144 test_invalid_utf8_string_array_inner::<i64>();
3145 }
3146
3147 fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
3148 let cases = [
3149 invalid_utf8_first_char::<O>(),
3150 invalid_utf8_first_char_long_strings::<O>(),
3151 invalid_utf8_later_char::<O>(),
3152 invalid_utf8_later_char_long_strings::<O>(),
3153 invalid_utf8_later_char_really_long_strings::<O>(),
3154 invalid_utf8_later_char_really_long_strings2::<O>(),
3155 ];
3156 for array in &cases {
3157 for encoding in STRING_ENCODINGS {
3158 let array = unsafe {
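3159 // SAFETY: intentionally bypass UTF-8 validation so the written file
3160 // contains invalid string data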
3161 GenericStringArray::<O>::new_unchecked(
3162 array.offsets().clone(),
3163 array.values().clone(),
3164 array.nulls().cloned(),
3165 )
3166 };
3167 let data_type = array.data_type().clone();
3168 let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3169 let err = read_from_parquet(data).unwrap_err();
3170 let expected_err =
3171 "Parquet argument error: Parquet error: encountered non UTF-8 data";
3172 assert!(
3173 err.to_string().contains(expected_err),
3174 "data type: {data_type:?}, expected: {expected_err}, got: {err}"
3175 );
3176 }
3177 }
3178 }
3179
3180 #[test]
3181 fn test_invalid_utf8_string_view_array() {
3182 let cases = [
3183 invalid_utf8_first_char::<i32>(),
3184 invalid_utf8_first_char_long_strings::<i32>(),
3185 invalid_utf8_later_char::<i32>(),
3186 invalid_utf8_later_char_long_strings::<i32>(),
3187 invalid_utf8_later_char_really_long_strings::<i32>(),
3188 invalid_utf8_later_char_really_long_strings2::<i32>(),
3189 ];
3190
3191 for encoding in STRING_ENCODINGS {
3192 for array in &cases {
3193 let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
3194 let array = array.as_binary_view();
3195
3196 let array = unsafe {
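3197 // SAFETY: intentionally reinterpret the binary views as strings
3198 // without validation so the written file contains invalid UTF-8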
3199 StringViewArray::new_unchecked(
3200 array.views().clone(),
3201 array.data_buffers().to_vec(),
3202 array.nulls().cloned(),
3203 )
3204 };
3205
3206 let data_type = array.data_type().clone();
3207 let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3208 let err = read_from_parquet(data).unwrap_err();
3209 let expected_err =
3210 "Parquet argument error: Parquet error: encountered non UTF-8 data";
3211 assert!(
3212 err.to_string().contains(expected_err),
3213 "data type: {data_type:?}, expected: {expected_err}, got: {err}"
3214 );
3215 }
3216 }
3217 }
3218
3219 const STRING_ENCODINGS: &[Option<Encoding>] = &[
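3220 // `None` leaves the writer's default (dictionary) encoding in place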
3221 None,
3222 Some(Encoding::PLAIN),
3223 Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
3224 Some(Encoding::DELTA_BYTE_ARRAY),
3225 ];
3226
3227 const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
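3228 // 0xa0 is not a valid UTF-8 leading byte, so decoding fails on the
3229 // very first character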
3230
3231 const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];
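3232 // here the invalid 0xa0 0xa1 sequence appears only after three valid
3233 // ASCII bytes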
3234
3235 fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
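3236 // binary array whose last value starts with an invalid UTF-8 byte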
3237 let valid: &[u8] = b" ";
3238 let invalid = INVALID_UTF8_FIRST_CHAR;
3239 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3240 }
3241
3242 fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
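3243 // As above, but the invalid value is longer than 12 bytes, so a
3244 // StringViewArray must store it out-of-line in a data buffer instead
3245 // of inlining it in the view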
3246 let valid: &[u8] = b" ";
3247 let mut invalid = vec![];
3248 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3249 invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3250 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3251 }
3252
3253 fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
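3254 // binary array whose last value contains an invalid UTF-8 sequence
3255 // after a run of valid bytes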
3256 let valid: &[u8] = b" ";
3257 let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
3258 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3259 }
3260
3261 fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
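3262 // As above, but the invalid value is longer than 12 bytes so it is
3263 // not inlined; the invalid sequence sits at the end of a long valid
3264 // prefix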
3265 let valid: &[u8] = b" ";
3266 let mut invalid = vec![];
3267 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3268 invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3269 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3270 }
3271
3272 fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
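3273 // As above, but the invalid value is several hundred bytes long,
3274 // exercising validation of values that span a large range of the
3275 // underlying buffer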
3276 let valid: &[u8] = b" ";
3277 let mut invalid = vec![];
3278 for _ in 0..10 {
3279 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3281 }
3282 invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3283 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3284 }
3285
3286 fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
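3287 // As above, but the really long value is entirely valid and the
3288 // invalid sequence lives in a separate short value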
3289 let valid: &[u8] = b" ";
3290 let mut valid_long = vec![];
3291 for _ in 0..10 {
3292 valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3294 }
3295 let invalid = INVALID_UTF8_LATER_CHAR;
3296 GenericBinaryArray::<O>::from_iter(vec![
3297 None,
3298 Some(valid),
3299 Some(invalid),
3300 None,
3301 Some(&valid_long),
3302 Some(valid),
3303 ])
3304 }
3305
3306 fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
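3307 // Writes `array` as a single column "c" to an in-memory parquet file.
3308 //
3309 // If `encoding` is `Some`, dictionary encoding is disabled and the
3310 // given encoding is used; otherwise the writer defaults apply.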
3311 let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
3312 let mut data = vec![];
3313 let schema = batch.schema();
3314 let props = encoding.map(|encoding| {
3315 WriterProperties::builder()
3316 .set_dictionary_enabled(false)
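3317 // dictionary encoding must be disabled for the requested encoding to take effect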
3318 .set_encoding(encoding)
3319 .build()
3320 });
3321
3322 {
3323 let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
3324 writer.write(&batch).unwrap();
3325 writer.flush().unwrap();
3326 writer.close().unwrap();
3327 };
3328 data
3329 }
3330
3331 fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
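3332 // reads every record batch back, propagating any decode error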
3333 let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
3334 .unwrap()
3335 .build()
3336 .unwrap();
3337
3338 reader.collect()
3339 }
3340
3341 #[test]
3342 fn test_dictionary_preservation() {
3343 let fields = vec![Arc::new(
3344 Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
3345 .with_repetition(Repetition::OPTIONAL)
3346 .with_converted_type(ConvertedType::UTF8)
3347 .build()
3348 .unwrap(),
3349 )];
3350
3351 let schema = Arc::new(
3352 Type::group_type_builder("test_schema")
3353 .with_fields(fields)
3354 .build()
3355 .unwrap(),
3356 );
3357
3358 let dict_type = ArrowDataType::Dictionary(
3359 Box::new(ArrowDataType::Int32),
3360 Box::new(ArrowDataType::Utf8),
3361 );
3362
3363 let arrow_field = Field::new("leaf", dict_type, true);
3364
3365 let mut file = tempfile::tempfile().unwrap();
3366
3367 let values = vec![
3368 vec![
3369 ByteArray::from("hello"),
3370 ByteArray::from("a"),
3371 ByteArray::from("b"),
3372 ByteArray::from("d"),
3373 ],
3374 vec![
3375 ByteArray::from("c"),
3376 ByteArray::from("a"),
3377 ByteArray::from("b"),
3378 ],
3379 ];
3380
3381 let def_levels = vec![
3382 vec![1, 0, 0, 1, 0, 0, 1, 1],
3383 vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
3384 ];
3385
3386 let opts = TestOptions {
3387 encoding: Encoding::RLE_DICTIONARY,
3388 ..Default::default()
3389 };
3390
3391 generate_single_column_file_with_data::<ByteArrayType>(
3392 &values,
3393 Some(&def_levels),
3394 file.try_clone().unwrap(),
3395 schema,
3396 Some(arrow_field),
3397 &opts,
3398 )
3399 .unwrap();
3400
3401 file.rewind().unwrap();
3402
3403 let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();
3404
3405 let batches = record_reader
3406 .collect::<Result<Vec<RecordBatch>, _>>()
3407 .unwrap();
3408
3409 assert_eq!(batches.len(), 6);
3410 assert!(batches.iter().all(|x| x.num_columns() == 1));
3411
3412 let row_counts = batches
3413 .iter()
3414 .map(|x| (x.num_rows(), x.column(0).null_count()))
3415 .collect::<Vec<_>>();
3416
3417 assert_eq!(
3418 row_counts,
3419 vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
3420 );
3421
3422 let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();
3423
3424 assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
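3425 // batch 2 spans the row group boundary, so its dictionary is newly computed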
3426 assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
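3427 // batch 3 uses the second row group's own dictionary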
3428 assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
3429 assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
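3430 // batches 3, 4 and 5 all come from the second row group and share its dictionary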
3431 assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
3432 }
3433
3434 #[test]
3435 fn test_read_null_list() {
3436 let testdata = arrow::util::test_util::parquet_test_data();
3437 let path = format!("{testdata}/null_list.parquet");
3438 let file = File::open(path).unwrap();
3439 let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3440
3441 let batch = record_batch_reader.next().unwrap().unwrap();
3442 assert_eq!(batch.num_rows(), 1);
3443 assert_eq!(batch.num_columns(), 1);
3444 assert_eq!(batch.column(0).len(), 1);
3445
3446 let list = batch
3447 .column(0)
3448 .as_any()
3449 .downcast_ref::<ListArray>()
3450 .unwrap();
3451 assert_eq!(list.len(), 1);
3452 assert!(list.is_valid(0));
3453
3454 let val = list.value(0);
3455 assert_eq!(val.len(), 0);
3456 }
3457
3458 #[test]
3459 fn test_null_schema_inference() {
3460 let testdata = arrow::util::test_util::parquet_test_data();
3461 let path = format!("{testdata}/null_list.parquet");
3462 let file = File::open(path).unwrap();
3463
3464 let arrow_field = Field::new(
3465 "emptylist",
3466 ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
3467 true,
3468 );
3469
3470 let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3471 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3472 let schema = builder.schema();
3473 assert_eq!(schema.fields().len(), 1);
3474 assert_eq!(schema.field(0), &arrow_field);
3475 }
3476
3477 #[test]
3478 fn test_skip_metadata() {
3479 let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
3480 let field = Field::new("col", col.data_type().clone(), true);
3481
3482 let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
3483
3484 let metadata = [("key".to_string(), "value".to_string())]
3485 .into_iter()
3486 .collect();
3487
3488 let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
3489
3490 assert_ne!(schema_with_metadata, schema_without_metadata);
3491
3492 let batch =
3493 RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
3494
3495 let file = |version: WriterVersion| {
3496 let props = WriterProperties::builder()
3497 .set_writer_version(version)
3498 .build();
3499
3500 let file = tempfile().unwrap();
3501 let mut writer =
3502 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
3503 .unwrap();
3504 writer.write(&batch).unwrap();
3505 writer.close().unwrap();
3506 file
3507 };
3508
3509 let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3510
3511 let v1_reader = file(WriterVersion::PARQUET_1_0);
3512 let v2_reader = file(WriterVersion::PARQUET_2_0);
3513
3514 let arrow_reader =
3515 ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
3516 assert_eq!(arrow_reader.schema(), schema_with_metadata);
3517
3518 let reader =
3519 ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
3520 .unwrap()
3521 .build()
3522 .unwrap();
3523 assert_eq!(reader.schema(), schema_without_metadata);
3524
3525 let arrow_reader =
3526 ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
3527 assert_eq!(arrow_reader.schema(), schema_with_metadata);
3528
3529 let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
3530 .unwrap()
3531 .build()
3532 .unwrap();
3533 assert_eq!(reader.schema(), schema_without_metadata);
3534 }
3535
3536 fn write_parquet_from_iter<I, F>(value: I) -> File
3537 where
3538 I: IntoIterator<Item = (F, ArrayRef)>,
3539 F: AsRef<str>,
3540 {
3541 let batch = RecordBatch::try_from_iter(value).unwrap();
3542 let file = tempfile().unwrap();
3543 let mut writer =
3544 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
3545 writer.write(&batch).unwrap();
3546 writer.close().unwrap();
3547 file
3548 }
3549
3550 fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
3551 where
3552 I: IntoIterator<Item = (F, ArrayRef)>,
3553 F: AsRef<str>,
3554 {
3555 let file = write_parquet_from_iter(value);
3556 let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
3557 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3558 file.try_clone().unwrap(),
3559 options_with_schema,
3560 );
3561 assert_eq!(builder.err().unwrap().to_string(), expected_error);
3562 }
3563
3564 #[test]
3565 fn test_schema_too_few_columns() {
3566 run_schema_test_with_error(
3567 vec![
3568 ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3569 ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3570 ],
3571 Arc::new(Schema::new(vec![Field::new(
3572 "int64",
3573 ArrowDataType::Int64,
3574 false,
3575 )])),
3576 "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
3577 );
3578 }
3579
3580 #[test]
3581 fn test_schema_too_many_columns() {
3582 run_schema_test_with_error(
3583 vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3584 Arc::new(Schema::new(vec![
3585 Field::new("int64", ArrowDataType::Int64, false),
3586 Field::new("int32", ArrowDataType::Int32, false),
3587 ])),
3588 "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
3589 );
3590 }
3591
3592 #[test]
3593 fn test_schema_mismatched_column_names() {
3594 run_schema_test_with_error(
3595 vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3596 Arc::new(Schema::new(vec![Field::new(
3597 "other",
3598 ArrowDataType::Int64,
3599 false,
3600 )])),
3601 "Arrow: incompatible arrow schema, expected field named int64 got other",
3602 );
3603 }
3604
3605 #[test]
3606 fn test_schema_incompatible_columns() {
3607 run_schema_test_with_error(
3608 vec![
3609 (
3610 "col1_invalid",
3611 Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3612 ),
3613 (
3614 "col2_valid",
3615 Arc::new(Int32Array::from(vec![0])) as ArrayRef,
3616 ),
3617 (
3618 "col3_invalid",
3619 Arc::new(Date64Array::from(vec![0])) as ArrayRef,
3620 ),
3621 ],
3622 Arc::new(Schema::new(vec![
3623 Field::new("col1_invalid", ArrowDataType::Int32, false),
3624 Field::new("col2_valid", ArrowDataType::Int32, false),
3625 Field::new("col3_invalid", ArrowDataType::Int32, false),
3626 ])),
3627 "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
3628 );
3629 }
3630
3631 #[test]
3632 fn test_one_incompatible_nested_column() {
3633 let nested_fields = Fields::from(vec![
3634 Field::new("nested1_valid", ArrowDataType::Utf8, false),
3635 Field::new("nested1_invalid", ArrowDataType::Int64, false),
3636 ]);
3637 let nested = StructArray::try_new(
3638 nested_fields,
3639 vec![
3640 Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
3641 Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3642 ],
3643 None,
3644 )
3645 .expect("struct array");
3646 let supplied_nested_fields = Fields::from(vec![
3647 Field::new("nested1_valid", ArrowDataType::Utf8, false),
3648 Field::new("nested1_invalid", ArrowDataType::Int32, false),
3649 ]);
3650 run_schema_test_with_error(
3651 vec![
3652 ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3653 ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3654 ("nested", Arc::new(nested) as ArrayRef),
3655 ],
3656 Arc::new(Schema::new(vec![
3657 Field::new("col1", ArrowDataType::Int64, false),
3658 Field::new("col2", ArrowDataType::Int32, false),
3659 Field::new(
3660 "nested",
3661 ArrowDataType::Struct(supplied_nested_fields),
3662 false,
3663 ),
3664 ])),
3665 "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
3666 requested Struct([Field { name: \"nested1_valid\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"nested1_invalid\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]) \
3667 but found Struct([Field { name: \"nested1_valid\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"nested1_invalid\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }])",
3668 );
3669 }
3670
3671 fn utf8_parquet() -> Bytes {
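3672 // a small parquet file with a single non-nullable Utf8 column "column1"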
3673 let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
3674 let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
3675 let props = None;
3676 let mut parquet_data = vec![];
3678 let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
3679 writer.write(&batch).unwrap();
3680 writer.close().unwrap();
3681 Bytes::from(parquet_data)
3682 }
3683
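// A minimal sketch of how the offset and limit options used throughout
// these tests compose; this example is an illustration added here, not
// part of the original test matrix, and it assumes the three-row
// `utf8_parquet` helper above ("foo", "bar", "baz").
#[test]
fn test_offset_limit_sketch() {
    // skip the first row, then read at most one row: only "bar" remains
    let reader = ParquetRecordBatchReaderBuilder::try_new(utf8_parquet())
        .unwrap()
        .with_offset(1)
        .with_limit(1)
        .build()
        .unwrap();
    let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
    assert_eq!(batches.len(), 1);
    assert_eq!(batches[0].num_rows(), 1);
    assert_eq!(batches[0].column(0).as_string::<i32>().value(0), "bar");
}
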
3684 #[test]
3685 fn test_schema_error_bad_types() {
3686 let parquet_data = utf8_parquet();
3688
3689 let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
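3690 // the file stores column1 as non-nullable Utf8, not Int32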
3691 "column1",
3692 arrow::datatypes::DataType::Int32,
3693 false,
3694 )]));
3695
3696 let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
3698 let err =
3699 ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
3700 .unwrap_err();
3701 assert_eq!(err.to_string(), "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8")
3702 }
3703
3704 #[test]
3705 fn test_schema_error_bad_nullability() {
3706 let parquet_data = utf8_parquet();
3708
3709 let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
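3710 // nullability mismatch: the file's column1 is required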
3711 "column1",
3712 arrow::datatypes::DataType::Utf8,
3713 true,
3714 )]));
3715
3716 let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
3718 let err =
3719 ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
3720 .unwrap_err();
3721 assert_eq!(err.to_string(), "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false")
3722 }
3723
3724 #[test]
3725 fn test_read_binary_as_utf8() {
3726 let file = write_parquet_from_iter(vec![
3727 (
3728 "binary_to_utf8",
3729 Arc::new(BinaryArray::from(vec![
3730 b"one".as_ref(),
3731 b"two".as_ref(),
3732 b"three".as_ref(),
3733 ])) as ArrayRef,
3734 ),
3735 (
3736 "large_binary_to_large_utf8",
3737 Arc::new(LargeBinaryArray::from(vec![
3738 b"one".as_ref(),
3739 b"two".as_ref(),
3740 b"three".as_ref(),
3741 ])) as ArrayRef,
3742 ),
3743 (
3744 "binary_view_to_utf8_view",
3745 Arc::new(BinaryViewArray::from(vec![
3746 b"one".as_ref(),
3747 b"two".as_ref(),
3748 b"three".as_ref(),
3749 ])) as ArrayRef,
3750 ),
3751 ]);
3752 let supplied_fields = Fields::from(vec![
3753 Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
3754 Field::new(
3755 "large_binary_to_large_utf8",
3756 ArrowDataType::LargeUtf8,
3757 false,
3758 ),
3759 Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
3760 ]);
3761
3762 let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3763 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3764 file.try_clone().unwrap(),
3765 options,
3766 )
3767 .expect("reader builder with schema")
3768 .build()
3769 .expect("reader with schema");
3770
3771 let batch = arrow_reader.next().unwrap().unwrap();
3772 assert_eq!(batch.num_columns(), 3);
3773 assert_eq!(batch.num_rows(), 3);
3774 assert_eq!(
3775 batch
3776 .column(0)
3777 .as_string::<i32>()
3778 .iter()
3779 .collect::<Vec<_>>(),
3780 vec![Some("one"), Some("two"), Some("three")]
3781 );
3782
3783 assert_eq!(
3784 batch
3785 .column(1)
3786 .as_string::<i64>()
3787 .iter()
3788 .collect::<Vec<_>>(),
3789 vec![Some("one"), Some("two"), Some("three")]
3790 );
3791
3792 assert_eq!(
3793 batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
3794 vec![Some("one"), Some("two"), Some("three")]
3795 );
3796 }
3797
3798 #[test]
3799 #[should_panic(expected = "Invalid UTF8 sequence at")]
3800 fn test_read_non_utf8_binary_as_utf8() {
3801 let file = write_parquet_from_iter(vec![(
3802 "non_utf8_binary",
3803 Arc::new(BinaryArray::from(vec![
3804 b"\xDE\x00\xFF".as_ref(),
3805 b"\xDE\x01\xAA".as_ref(),
3806 b"\xDE\x02\xFF".as_ref(),
3807 ])) as ArrayRef,
3808 )]);
3809 let supplied_fields = Fields::from(vec![Field::new(
3810 "non_utf8_binary",
3811 ArrowDataType::Utf8,
3812 false,
3813 )]);
3814
3815 let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3816 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3817 file.try_clone().unwrap(),
3818 options,
3819 )
3820 .expect("reader builder with schema")
3821 .build()
3822 .expect("reader with schema");
3823 arrow_reader.next().unwrap().unwrap_err();
3824 }
3825
3826 #[test]
3827 fn test_with_schema() {
3828 let nested_fields = Fields::from(vec![
3829 Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
3830 Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
3831 ]);
3832
3833 let nested_arrays: Vec<ArrayRef> = vec![
3834 Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
3835 Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
3836 ];
3837
3838 let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();
3839
3840 let file = write_parquet_from_iter(vec![
3841 (
3842 "int32_to_ts_second",
3843 Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
3844 ),
3845 (
3846 "date32_to_date64",
3847 Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
3848 ),
3849 ("nested", Arc::new(nested) as ArrayRef),
3850 ]);
3851
3852 let supplied_nested_fields = Fields::from(vec![
3853 Field::new(
3854 "utf8_to_dict",
3855 ArrowDataType::Dictionary(
3856 Box::new(ArrowDataType::Int32),
3857 Box::new(ArrowDataType::Utf8),
3858 ),
3859 false,
3860 ),
3861 Field::new(
3862 "int64_to_ts_nano",
3863 ArrowDataType::Timestamp(
3864 arrow::datatypes::TimeUnit::Nanosecond,
3865 Some("+10:00".into()),
3866 ),
3867 false,
3868 ),
3869 ]);
3870
3871 let supplied_schema = Arc::new(Schema::new(vec![
3872 Field::new(
3873 "int32_to_ts_second",
3874 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
3875 false,
3876 ),
3877 Field::new("date32_to_date64", ArrowDataType::Date64, false),
3878 Field::new(
3879 "nested",
3880 ArrowDataType::Struct(supplied_nested_fields),
3881 false,
3882 ),
3883 ]));
3884
3885 let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
3886 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3887 file.try_clone().unwrap(),
3888 options,
3889 )
3890 .expect("reader builder with schema")
3891 .build()
3892 .expect("reader with schema");
3893
3894 assert_eq!(arrow_reader.schema(), supplied_schema);
3895 let batch = arrow_reader.next().unwrap().unwrap();
3896 assert_eq!(batch.num_columns(), 3);
3897 assert_eq!(batch.num_rows(), 4);
3898 assert_eq!(
3899 batch
3900 .column(0)
3901 .as_any()
3902 .downcast_ref::<TimestampSecondArray>()
3903 .expect("downcast to timestamp second")
3904 .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
3905 .map(|v| v.to_string())
3906 .expect("value as datetime"),
3907 "1970-01-01 01:00:00 +01:00"
3908 );
3909 assert_eq!(
3910 batch
3911 .column(1)
3912 .as_any()
3913 .downcast_ref::<Date64Array>()
3914 .expect("downcast to date64")
3915 .value_as_date(0)
3916 .map(|v| v.to_string())
3917 .expect("value as date"),
3918 "1970-01-01"
3919 );
3920
3921 let nested = batch
3922 .column(2)
3923 .as_any()
3924 .downcast_ref::<StructArray>()
3925 .expect("downcast to struct");
3926
3927 let nested_dict = nested
3928 .column(0)
3929 .as_any()
3930 .downcast_ref::<Int32DictionaryArray>()
3931 .expect("downcast to dictionary");
3932
3933 assert_eq!(
3934 nested_dict
3935 .values()
3936 .as_any()
3937 .downcast_ref::<StringArray>()
3938 .expect("downcast to string")
3939 .iter()
3940 .collect::<Vec<_>>(),
3941 vec![Some("a"), Some("b")]
3942 );
3943
3944 assert_eq!(
3945 nested_dict.keys().iter().collect::<Vec<_>>(),
3946 vec![Some(0), Some(0), Some(0), Some(1)]
3947 );
3948
3949 assert_eq!(
3950 nested
3951 .column(1)
3952 .as_any()
3953 .downcast_ref::<TimestampNanosecondArray>()
3954 .expect("downcast to timestamp nanosecond")
3955 .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
3956 .map(|v| v.to_string())
3957 .expect("value as datetime"),
3958 "1970-01-01 10:00:00.000000001 +10:00"
3959 );
3960 }
3961
3962 #[test]
3963 fn test_empty_projection() {
3964 let testdata = arrow::util::test_util::parquet_test_data();
3965 let path = format!("{testdata}/alltypes_plain.parquet");
3966 let file = File::open(path).unwrap();
3967
3968 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3969 let file_metadata = builder.metadata().file_metadata();
3970 let expected_rows = file_metadata.num_rows() as usize;
3971
3972 let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
3973 let batch_reader = builder
3974 .with_projection(mask)
3975 .with_batch_size(2)
3976 .build()
3977 .unwrap();
3978
3979 let mut total_rows = 0;
3980 for maybe_batch in batch_reader {
3981 let batch = maybe_batch.unwrap();
3982 total_rows += batch.num_rows();
3983 assert_eq!(batch.num_columns(), 0);
3984 assert!(batch.num_rows() <= 2);
3985 }
3986
3987 assert_eq!(total_rows, expected_rows);
3988 }
3989
3990 fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
3991 let schema = Arc::new(Schema::new(vec![Field::new(
3992 "list",
3993 ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
3994 true,
3995 )]));
3996
3997 let mut buf = Vec::with_capacity(1024);
3998
3999 let mut writer = ArrowWriter::try_new(
4000 &mut buf,
4001 schema.clone(),
4002 Some(
4003 WriterProperties::builder()
4004 .set_max_row_group_size(row_group_size)
4005 .build(),
4006 ),
4007 )
4008 .unwrap();
4009 for _ in 0..2 {
4010 let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
4011 for _ in 0..(batch_size) {
4012 list_builder.append(true);
4013 }
4014 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
4015 .unwrap();
4016 writer.write(&batch).unwrap();
4017 }
4018 writer.close().unwrap();
4019
4020 let mut record_reader =
4021 ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
4022 assert_eq!(
4023 batch_size,
4024 record_reader.next().unwrap().unwrap().num_rows()
4025 );
4026 assert_eq!(
4027 batch_size,
4028 record_reader.next().unwrap().unwrap().num_rows()
4029 );
4030 }
4031
4032 #[test]
4033 fn test_row_group_exact_multiple() {
4034 const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
4035 test_row_group_batch(8, 8);
4036 test_row_group_batch(10, 8);
4037 test_row_group_batch(8, 10);
4038 test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
4039 test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
4040 test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
4041 test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
4042 test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
4043 }
4044
4045 fn get_expected_batches(
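4046 // Slices `column` according to `selection`, re-chunking the selected
4047 // rows into batches of at most `batch_size` rows.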
4048 column: &RecordBatch,
4049 selection: &RowSelection,
4050 batch_size: usize,
4051 ) -> Vec<RecordBatch> {
4052 let mut expected_batches = vec![];
4053
4054 let mut selection: VecDeque<_> = selection.clone().into();
4055 let mut row_offset = 0;
4056 let mut last_start = None;
4057 while row_offset < column.num_rows() && !selection.is_empty() {
4058 let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
4059 while batch_remaining > 0 && !selection.is_empty() {
4060 let (to_read, skip) = match selection.front_mut() {
4061 Some(selection) if selection.row_count > batch_remaining => {
4062 selection.row_count -= batch_remaining;
4063 (batch_remaining, selection.skip)
4064 }
4065 Some(_) => {
4066 let select = selection.pop_front().unwrap();
4067 (select.row_count, select.skip)
4068 }
4069 None => break,
4070 };
4071
4072 batch_remaining -= to_read;
4073
4074 match skip {
4075 true => {
4076 if let Some(last_start) = last_start.take() {
4077 expected_batches.push(column.slice(last_start, row_offset - last_start))
4078 }
4079 row_offset += to_read
4080 }
4081 false => {
4082 last_start.get_or_insert(row_offset);
4083 row_offset += to_read
4084 }
4085 }
4086 }
4087 }
4088
4089 if let Some(last_start) = last_start.take() {
4090 expected_batches.push(column.slice(last_start, row_offset - last_start))
4091 }
4092
4093 for batch in &expected_batches[..expected_batches.len() - 1] {
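4094 // every batch except the last must contain exactly batch_size rows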
4095 assert_eq!(batch.num_rows(), batch_size);
4096 }
4097
4098 expected_batches
4099 }
4100
4101 fn create_test_selection(
4102 step_len: usize,
4103 total_len: usize,
4104 skip_first: bool,
4105 ) -> (RowSelection, usize) {
4106 let mut remaining = total_len;
4107 let mut skip = skip_first;
4108 let mut vec = vec![];
4109 let mut selected_count = 0;
4110 while remaining != 0 {
4111 let step = if remaining > step_len {
4112 step_len
4113 } else {
4114 remaining
4115 };
4116 vec.push(RowSelector {
4117 row_count: step,
4118 skip,
4119 });
4120 remaining -= step;
4121 if !skip {
4122 selected_count += step;
4123 }
4124 skip = !skip;
4125 }
4126 (vec.into(), selected_count)
4127 }
4128
4129 #[test]
4130 fn test_scan_row_with_selection() {
4131 let testdata = arrow::util::test_util::parquet_test_data();
4132 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
4133 let test_file = File::open(&path).unwrap();
4134
4135 let mut serial_reader =
4136 ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
4137 let data = serial_reader.next().unwrap().unwrap();
4138
4139 let do_test = |batch_size: usize, selection_len: usize| {
4140 for skip_first in [false, true] {
4141 let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;
4142
4143 let expected = get_expected_batches(&data, &selections, batch_size);
4144 let skip_reader = create_skip_reader(&test_file, batch_size, selections);
4145 assert_eq!(
4146 skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
4147 expected,
4148 "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
4149 );
4150 }
4151 };
4152
4153 do_test(1000, 1000);
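4154 // with 1000 row selection steps whole pages can be skipped via the
4155 // page index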
4156
4157 do_test(20, 20);
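4158 // fine-grained selection: the step matches the 20 row batch size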
4159
4160 do_test(20, 5);
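4161 // selection length smaller than the batch size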
4162
4163 do_test(20, 5);
4166
4167 fn create_skip_reader(
4168 test_file: &File,
4169 batch_size: usize,
4170 selections: RowSelection,
4171 ) -> ParquetRecordBatchReader {
4172 let options = ArrowReaderOptions::new().with_page_index(true);
4173 let file = test_file.try_clone().unwrap();
4174 ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
4175 .unwrap()
4176 .with_batch_size(batch_size)
4177 .with_row_selection(selections)
4178 .build()
4179 .unwrap()
4180 }
4181 }
4182
4183 #[test]
4184 fn test_batch_size_overallocate() {
4185 let testdata = arrow::util::test_util::parquet_test_data();
4186 let path = format!("{testdata}/alltypes_plain.parquet");
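4187 // alltypes_plain.parquet has only 8 rows, far fewer than the requested batch size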
4188 let test_file = File::open(path).unwrap();
4189
4190 let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4191 let num_rows = builder.metadata.file_metadata().num_rows();
4192 let reader = builder
4193 .with_batch_size(1024)
4194 .with_projection(ProjectionMask::all())
4195 .build()
4196 .unwrap();
4197 assert_ne!(1024, num_rows);
4198 assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
4199 }
4200
4201 #[test]
4202 fn test_read_with_page_index_enabled() {
4203 let testdata = arrow::util::test_util::parquet_test_data();
4204
4205 {
4206 let path = format!("{testdata}/alltypes_tiny_pages.parquet");
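4207 // alltypes_tiny_pages.parquet was written with a page index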
4208 let test_file = File::open(path).unwrap();
4209 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4210 test_file,
4211 ArrowReaderOptions::new().with_page_index(true),
4212 )
4213 .unwrap();
4214 assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
4215 let reader = builder.build().unwrap();
4216 let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4217 assert_eq!(batches.len(), 8);
4218 }
4219
4220 {
4221 let path = format!("{testdata}/alltypes_plain.parquet");
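4222 // alltypes_plain.parquet was written without a page index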
4223 let test_file = File::open(path).unwrap();
4224 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4225 test_file,
4226 ArrowReaderOptions::new().with_page_index(true),
4227 )
4228 .unwrap();
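4229 // although the page index was requested, this file does not contain
4230 // one, so the builder simply reports no offset index instead of
4231 // returning an error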
4229 assert!(builder.metadata().offset_index().is_none());
4232 let reader = builder.build().unwrap();
4233 let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4234 assert_eq!(batches.len(), 1);
4235 }
4236 }
4237
4238 #[test]
4239 fn test_raw_repetition() {
4240 const MESSAGE_TYPE: &str = "
4241 message Log {
4242 OPTIONAL INT32 eventType;
4243 REPEATED INT32 category;
4244 REPEATED group filter {
4245 OPTIONAL INT32 error;
4246 }
4247 }
4248 ";
4249 let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
4250 let props = Default::default();
4251
4252 let mut buf = Vec::with_capacity(1024);
4253 let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
4254 let mut row_group_writer = writer.next_row_group().unwrap();
4255
4256 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4258 col_writer
4259 .typed::<Int32Type>()
4260 .write_batch(&[1], Some(&[1]), None)
4261 .unwrap();
4262 col_writer.close().unwrap();
4263 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4265 col_writer
4266 .typed::<Int32Type>()
4267 .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
4268 .unwrap();
4269 col_writer.close().unwrap();
4270 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4272 col_writer
4273 .typed::<Int32Type>()
4274 .write_batch(&[1], Some(&[1]), Some(&[0]))
4275 .unwrap();
4276 col_writer.close().unwrap();
4277
4278 let rg_md = row_group_writer.close().unwrap();
4279 assert_eq!(rg_md.num_rows(), 1);
4280 writer.close().unwrap();
4281
4282 let bytes = Bytes::from(buf);
4283
4284 let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
4285 let full = no_mask.next().unwrap().unwrap();
4286
4287 assert_eq!(full.num_columns(), 3);
4288
4289 for idx in 0..3 {
4290 let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
4291 let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
4292 let mut reader = b.with_projection(mask).build().unwrap();
4293 let projected = reader.next().unwrap().unwrap();
4294
4295 assert_eq!(projected.num_columns(), 1);
4296 assert_eq!(full.column(idx), projected.column(0));
4297 }
4298 }
4299
4300 #[test]
4301 fn test_read_lz4_raw() {
4302 let testdata = arrow::util::test_util::parquet_test_data();
4303 let path = format!("{testdata}/lz4_raw_compressed.parquet");
4304 let file = File::open(path).unwrap();
4305
4306 let batches = ParquetRecordBatchReader::try_new(file, 1024)
4307 .unwrap()
4308 .collect::<Result<Vec<_>, _>>()
4309 .unwrap();
4310 assert_eq!(batches.len(), 1);
4311 let batch = &batches[0];
4312
4313 assert_eq!(batch.num_columns(), 3);
4314 assert_eq!(batch.num_rows(), 4);
4315
4316 let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4318 assert_eq!(
4319 a.values(),
4320 &[1593604800, 1593604800, 1593604801, 1593604801]
4321 );
4322
4323 let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4324 let a: Vec<_> = a.iter().flatten().collect();
4325 assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
4326
4327 let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4328 assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
4329 }
4330
4331 #[test]
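4332 // This test ensures backward compatibility for files that declare the
4333 // LZ4 compression codec but were written with different framings:
4334 //
4335 // 1. hadoop_lz4_compressed.parquet uses the Hadoop LZ4 framing
4336 //    (a length-prefixed block format).
4337 //
4338 // 2. non_hadoop_lz4_compressed.parquet uses raw LZ4 block
4339 //    compression; the reader falls back to LZ4_RAW decoding for
4340 //    compatibility with files written by older parquet-cpp versions.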
4341 fn test_read_lz4_hadoop_fallback() {
4342 for file in [
4343 "hadoop_lz4_compressed.parquet",
4344 "non_hadoop_lz4_compressed.parquet",
4345 ] {
4346 let testdata = arrow::util::test_util::parquet_test_data();
4347 let path = format!("{testdata}/{file}");
4348 let file = File::open(path).unwrap();
4349 let expected_rows = 4;
4350
4351 let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4352 .unwrap()
4353 .collect::<Result<Vec<_>, _>>()
4354 .unwrap();
4355 assert_eq!(batches.len(), 1);
4356 let batch = &batches[0];
4357
4358 assert_eq!(batch.num_columns(), 3);
4359 assert_eq!(batch.num_rows(), expected_rows);
4360
4361 let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4362 assert_eq!(
4363 a.values(),
4364 &[1593604800, 1593604800, 1593604801, 1593604801]
4365 );
4366
4367 let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4368 let b: Vec<_> = b.iter().flatten().collect();
4369 assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
4370
4371 let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4372 assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
4373 }
4374 }
4375
4376 #[test]
4377 fn test_read_lz4_hadoop_large() {
4378 let testdata = arrow::util::test_util::parquet_test_data();
4379 let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
4380 let file = File::open(path).unwrap();
4381 let expected_rows = 10000;
4382
4383 let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4384 .unwrap()
4385 .collect::<Result<Vec<_>, _>>()
4386 .unwrap();
4387 assert_eq!(batches.len(), 1);
4388 let batch = &batches[0];
4389
4390 assert_eq!(batch.num_columns(), 1);
4391 assert_eq!(batch.num_rows(), expected_rows);
4392
4393 let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
4394 let a: Vec<_> = a.iter().flatten().collect();
4395 assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
4396 assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
4397 assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
4398 assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
4399 }
4400
4401 #[test]
4402 #[cfg(feature = "snap")]
4403 fn test_read_nested_lists() {
4404 let testdata = arrow::util::test_util::parquet_test_data();
4405 let path = format!("{testdata}/nested_lists.snappy.parquet");
4406 let file = File::open(path).unwrap();
4407
4408 let f = file.try_clone().unwrap();
4409 let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
4410 let expected = reader.next().unwrap().unwrap();
4411 assert_eq!(expected.num_rows(), 3);
4412
4413 let selection = RowSelection::from(vec![
4414 RowSelector::skip(1),
4415 RowSelector::select(1),
4416 RowSelector::skip(1),
4417 ]);
4418 let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4419 .unwrap()
4420 .with_row_selection(selection)
4421 .build()
4422 .unwrap();
4423
4424 let actual = reader.next().unwrap().unwrap();
4425 assert_eq!(actual.num_rows(), 1);
4426 assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
4427 }
4428
4429 #[test]
    fn test_arbitrary_decimal() {
        let values = [1, 2, 3, 4, 5, 6, 7, 8];
        let decimals_19_0 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(19, 0)
            .unwrap();
        let decimals_12_0 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(12, 0)
            .unwrap();
        let decimals_17_10 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(17, 10)
            .unwrap();

        let written = RecordBatch::try_from_iter([
            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 8), &read[0]);
    }

    #[test]
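    // Skipping list rows must also skip the underlying values, including when
    // the skip crosses data page boundaries.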
    fn test_list_skip() {
        let mut list = ListBuilder::new(Int32Builder::new());
        list.append_value([Some(1), Some(2)]);
        list.append_value([Some(3)]);
        list.append_value([Some(4)]);
        let list = list.finish();
        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();

        // Write each row to its own data page so the skip below spans pages
        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(1)
            .set_write_batch_size(2)
            .build();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
            .unwrap()
            .with_row_selection(selection.into())
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out, batch.slice(2, 1));
    }

    fn test_decimal32_roundtrip() {
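        // Decimal32 with precision <= 9 is written as physical INT32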
        let d = |values: Vec<i32>, p: u8| {
            let iter = values.into_iter();
            PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    fn test_decimal64_roundtrip() {
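        // Precision <= 9 is written as INT32, precision <= 18 as INT64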
        let d = |values: Vec<i64>, p: u8| {
            let iter = values.into_iter();
            PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    fn test_decimal_roundtrip<T: DecimalType>() {
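        // Precision <= 9 -> INT32, <= 18 -> INT64, > 18 -> FIXED_LEN_BYTE_ARRAY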
        let d = |values: Vec<usize>, p: u8| {
            let iter = values.into_iter().map(T::Native::usize_as);
            PrimitiveArray::<T>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
            ("d4", Arc::new(d4) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);
        let t4 = builder.parquet_schema().columns()[3].physical_type();
        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    #[test]
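    // Drives the width-specific round-trip helpers above for every decimal type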
    fn test_decimal() {
        test_decimal32_roundtrip();
        test_decimal64_roundtrip();
        test_decimal_roundtrip::<Decimal128Type>();
        test_decimal_roundtrip::<Decimal256Type>();
    }

    #[test]
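    // Applies a RowSelection to single-element lists and checks that both the
    // offsets and the selected values line up.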
    fn test_list_selection() {
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list_field(ArrowDataType::Utf8, true),
            false,
        )]));
        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        for i in 0..2 {
            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
            for j in 0..1024 {
                list_a_builder.values().append_value(format!("{i} {j}"));
                list_a_builder.append(true);
            }
            let batch =
                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
                    .unwrap();
            writer.write(&batch).unwrap();
        }
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);
        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
            .unwrap()
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ]))
            .build()
            .unwrap();

        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        let batch = concat_batches(&schema, &batches).unwrap();

        assert_eq!(batch.num_rows(), 924 * 2);
        let list = batch.column(0).as_list::<i32>();

        // Every list has exactly one element, so offsets must be consecutive
        for w in list.value_offsets().windows(2) {
            assert_eq!(w[0] + 1, w[1]);
        }
        let mut values = list.values().as_string::<i32>().iter();

        for i in 0..2 {
            for j in 100..1024 {
                let expected = format!("{i} {j}");
                assert_eq!(values.next().unwrap().unwrap(), &expected);
            }
        }
    }

    #[test]
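    // Fuzz test: random nested list data read back under several skip/select
    // patterns and batch sizes, compared against slices of the written batch.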
    fn test_list_selection_fuzz() {
        let mut rng = rng();
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list(
                Field::LIST_FIELD_DEFAULT_NAME,
                Field::new_list_field(ArrowDataType::Int32, true),
                true,
            ),
            true,
        )]));
        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));

        for _ in 0..2048 {
            if rng.random_bool(0.2) {
                list_a_builder.append(false);
                continue;
            }

            let list_a_len = rng.random_range(0..10);
            let list_b_builder = list_a_builder.values();

            for _ in 0..list_a_len {
                if rng.random_bool(0.2) {
                    list_b_builder.append(false);
                    continue;
                }

                let list_b_len = rng.random_range(0..10);
                let int_builder = list_b_builder.values();
                for _ in 0..list_b_len {
                    match rng.random_bool(0.2) {
                        true => int_builder.append_null(),
                        false => int_builder.append_value(rng.random()),
                    }
                }
                list_b_builder.append(true);
            }
            list_a_builder.append(true);
        }

        let array = Arc::new(list_a_builder.finish());
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        writer.write(&batch).unwrap();
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);

        let cases = [
            vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ],
            vec![
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
            ],
            vec![
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
            ],
            vec![
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
            ],
        ];

        for batch_size in [100, 1024, 2048] {
            for selection in &cases {
                let selection = RowSelection::from(selection.clone());
                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
                    .unwrap()
                    .with_row_selection(selection.clone())
                    .with_batch_size(batch_size)
                    .build()
                    .unwrap();

                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
                assert_eq!(actual.num_rows(), selection.row_count());

                let mut batch_offset = 0;
                let mut actual_offset = 0;
                for selector in selection.iter() {
                    if selector.skip {
                        batch_offset += selector.row_count;
                        continue;
                    }

                    assert_eq!(
                        batch.slice(batch_offset, selector.row_count),
                        actual.slice(actual_offset, selector.row_count)
                    );

                    batch_offset += selector.row_count;
                    actual_offset += selector.row_count;
                }
            }
        }
    }

    #[test]
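    // old_list_structure.parquet stores a nested list using the legacy encoding
    // in which the repeated group is named "array"; it must still decode as a
    // list of int32 lists.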
    fn test_read_old_nested_list() {
        use arrow::datatypes::DataType;
        use arrow::datatypes::ToByteSlice;

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/old_list_structure.parquet");
        let test_file = File::open(path).unwrap();

        // Values for the expected inner lists: [1, 2] and [3, 4]
        let a_values = Int32Array::from(vec![1, 2, 3, 4]);

        // Offsets splitting the values into those two lists
        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());

        // Expected inner ListArray; the legacy repeated field is named "array"
        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
            "array",
            DataType::Int32,
            false,
        ))))
        .len(2)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
        let mut reader = builder.build().unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out.num_columns(), 1);
        // The single column is a list of lists; its first element must match `a`
        let c0 = out.column(0);
        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
        let r0 = c0arr.value(0);
        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
        assert_eq!(r0arr, &a);
    }

    #[test]
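    // map_no_value.parquet contains a MAP column whose key_value group has no
    // value field; its entries must decode to the same lists as the plain list
    // column stored alongside it (columns 1 and 2 below).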
    fn test_map_no_value() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/map_no_value.parquet");
        let file = File::open(path).unwrap();

        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 3);
        assert_eq!(out.num_columns(), 3);
        let c0 = out.column(1).as_list::<i32>();
        let c1 = out.column(2).as_list::<i32>();
        assert_eq!(c0.len(), c1.len());
        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
    }

    #[test]
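    // Rewrites a test file with bloom filters enabled, so the new footer
    // records bloom_filter_length, then checks the filter can be read back.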
    fn test_get_row_group_column_bloom_filter_with_length() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
        let file = File::open(path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let schema = builder.schema().clone();
        let reader = builder.build().unwrap();

        let mut parquet_data = Vec::new();
        let props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build();
        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
        for batch in reader {
            let batch = batch.unwrap();
            writer.write(&batch).unwrap();
        }
        writer.close().unwrap();

        test_get_row_group_column_bloom_filter(parquet_data.into(), true);
    }

    #[test]
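    // The original test file lacks bloom_filter_length in its metadata; the
    // filter must still be readable without it.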
    fn test_get_row_group_column_bloom_filter_without_length() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());
        test_get_row_group_column_bloom_filter(data, false);
    }

    fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
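        // Shared helper: asserts whether the footer carries bloom_filter_length
        // and probes the filter for a value that is, and one that is not, present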
        let mut builder = ParquetRecordBatchReaderBuilder::try_new(data.clone()).unwrap();

        let metadata = builder.metadata();
        assert_eq!(metadata.num_row_groups(), 1);
        let row_group = metadata.row_group(0);
        let column = row_group.column(0);
        assert_eq!(column.bloom_filter_length().is_some(), with_length);

        let sbbf = builder
            .get_row_group_column_bloom_filter(0, 0)
            .unwrap()
            .unwrap();
        assert!(sbbf.check(&"Hello"));
        assert!(!sbbf.check(&"Hello_Not_Exists"));
    }
}