use arrow_array::cast::AsArray;
use arrow_array::{Array, RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, FieldRef, Schema, SchemaRef};
use arrow_select::filter::filter_record_batch;
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

pub use crate::arrow::array_reader::RowGroups;
use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
use crate::arrow::schema::{
    ParquetField, parquet_to_arrow_schema_and_fields, virtual_type::is_virtual_column,
};
use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels_with_virtual};
use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
use crate::bloom_filter::{
    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
};
use crate::column::page::{PageIterator, PageReader};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{
    PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
    ParquetStatisticsPolicy, RowGroupMetaData,
};
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;

use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
pub use read_plan::{ReadPlan, ReadPlanBuilder};

mod filter;
pub mod metrics;
mod read_plan;
pub(crate) mod selection;
pub mod statistics;

/// Generic builder for reading Parquet data into Arrow [`RecordBatch`]es.
///
/// `T` is the underlying input; see [`ParquetRecordBatchReaderBuilder`] for the
/// synchronous flavor built on [`ChunkReader`].
pub struct ArrowReaderBuilder<T> {
    /// The input source
    pub(crate) input: T,

    /// The parsed Parquet file metadata
    pub(crate) metadata: Arc<ParquetMetaData>,

    /// The Arrow schema of the decoded output
    pub(crate) schema: SchemaRef,

    /// The parquet-to-arrow field mapping, if any columns are read
    pub(crate) fields: Option<Arc<ParquetField>>,

    /// Maximum number of rows per emitted [`RecordBatch`]
    pub(crate) batch_size: usize,

    /// Row group indices to read, or `None` for all
    pub(crate) row_groups: Option<Vec<usize>>,

    /// Columns to decode
    pub(crate) projection: ProjectionMask,

    /// Optional predicate-based row filter
    pub(crate) filter: Option<RowFilter>,

    /// Optional pre-computed row selection
    pub(crate) selection: Option<RowSelection>,

    /// Policy controlling how row selections are materialized
    pub(crate) row_selection_policy: RowSelectionPolicy,

    /// Optional limit on the number of rows returned
    pub(crate) limit: Option<usize>,

    /// Optional number of rows to skip before returning any
    pub(crate) offset: Option<usize>,

    /// Metrics collected while reading
    pub(crate) metrics: ArrowReaderMetrics,

    /// Maximum size, in bytes, of the predicate result cache
    pub(crate) max_predicate_cache_size: usize,
}

impl<T: Debug> Debug for ArrowReaderBuilder<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowReaderBuilder<T>")
            .field("input", &self.input)
            .field("metadata", &self.metadata)
            .field("schema", &self.schema)
            .field("fields", &self.fields)
            .field("batch_size", &self.batch_size)
            .field("row_groups", &self.row_groups)
            .field("projection", &self.projection)
            .field("filter", &self.filter)
            .field("selection", &self.selection)
            .field("row_selection_policy", &self.row_selection_policy)
            .field("limit", &self.limit)
            .field("offset", &self.offset)
            .field("metrics", &self.metrics)
            .finish()
    }
}

impl<T> ArrowReaderBuilder<T> {
    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self {
            input,
            metadata: metadata.metadata,
            schema: metadata.schema,
            fields: metadata.fields,
            batch_size: 1024,
            row_groups: None,
            projection: ProjectionMask::all(),
            filter: None,
            selection: None,
            row_selection_policy: RowSelectionPolicy::default(),
            limit: None,
            offset: None,
            metrics: ArrowReaderMetrics::Disabled,
            max_predicate_cache_size: 100 * 1024 * 1024,
        }
    }

    /// Returns a reference to the [`ParquetMetaData`] for this file
    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    /// Returns the Parquet [`SchemaDescriptor`] for this file
    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    /// Returns the Arrow [`SchemaRef`] of the decoded output
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Set the maximum number of rows in each returned [`RecordBatch`]
    pub fn with_batch_size(self, batch_size: usize) -> Self {
        // Clamp the batch size to the number of rows in the file, as there is
        // no point reserving capacity beyond that
        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
        Self { batch_size, ..self }
    }

    /// Select which row groups to read, by index
    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
        Self {
            row_groups: Some(row_groups),
            ..self
        }
    }

    /// Select which columns to decode
    pub fn with_projection(self, mask: ProjectionMask) -> Self {
        Self {
            projection: mask,
            ..self
        }
    }

    /// Set the [`RowSelectionPolicy`] used to materialize row selections
    pub fn with_row_selection_policy(self, policy: RowSelectionPolicy) -> Self {
        Self {
            row_selection_policy: policy,
            ..self
        }
    }

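    /// Provide a [`RowSelection`] to decode only a subset of rows.
    ///
    /// A minimal sketch (the row counts are illustrative): skip the first 100
    /// rows, then decode the next 50:
    ///
    /// ```ignore
    /// use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
    ///
    /// let selection = RowSelection::from(vec![
    ///     RowSelector::skip(100),  // rows 0..100 are never decoded
    ///     RowSelector::select(50), // rows 100..150 are returned
    /// ]);
    /// // builder.with_row_selection(selection)
    /// ```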
    pub fn with_row_selection(self, selection: RowSelection) -> Self {
        Self {
            selection: Some(selection),
            ..self
        }
    }

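    /// Provide a [`RowFilter`] to skip decoding rows that fail its predicates.
    ///
    /// An illustrative sketch, assuming a file whose first leaf column is a
    /// string column (the column index and probe value are hypothetical):
    ///
    /// ```ignore
    /// use arrow_array::BooleanArray;
    /// use arrow_array::cast::AsArray;
    /// use parquet::arrow::ProjectionMask;
    /// use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
    ///
    /// // The predicate only decodes the columns in its ProjectionMask
    /// let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    /// let predicate = ArrowPredicateFn::new(mask, |batch| {
    ///     let col = batch.column(0).as_string::<i32>();
    ///     Ok(BooleanArray::from_iter(
    ///         col.iter().map(|v| Some(v == Some("foo"))),
    ///     ))
    /// });
    /// // builder.with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
    /// ```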
    pub fn with_row_filter(self, filter: RowFilter) -> Self {
        Self {
            filter: Some(filter),
            ..self
        }
    }

    /// Limit the number of rows returned, applied after any selection and filter
    pub fn with_limit(self, limit: usize) -> Self {
        Self {
            limit: Some(limit),
            ..self
        }
    }

    /// Skip the first `offset` rows, applied after any selection and filter
    pub fn with_offset(self, offset: usize) -> Self {
        Self {
            offset: Some(offset),
            ..self
        }
    }

    /// Provide an [`ArrowReaderMetrics`] instance to collect metrics while reading
    pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
        Self { metrics, ..self }
    }

    /// Set the maximum size, in bytes, of the cache used to reuse predicate results
    pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
        Self {
            max_predicate_cache_size,
            ..self
        }
    }
}

/// Options that control how Parquet metadata is read and how the Arrow schema
/// is derived when constructing an Arrow reader.
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    /// Should the embedded Arrow schema in the file metadata be ignored
    skip_arrow_metadata: bool,
    /// An explicit schema to use instead of the one derived from the file
    supplied_schema: Option<SchemaRef>,
    /// Policy for reading the page index
    pub(crate) page_index_policy: PageIndexPolicy,
    /// Options controlling what parts of the footer metadata are decoded
    metadata_options: ParquetMetaDataOptions,
    /// Properties for decrypting encrypted files
    #[cfg(feature = "encryption")]
    pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,

    /// Virtual columns (e.g. row numbers) to expose in the output schema
    virtual_columns: Vec<FieldRef>,
}

impl ArrowReaderOptions {
    /// Create a new [`ArrowReaderOptions`] with default values
    pub fn new() -> Self {
        Self::default()
    }

    /// If `true`, ignore any Arrow schema embedded in the Parquet file metadata
    /// and derive the schema from the Parquet schema alone
    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

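    /// Supply an explicit Arrow schema to use instead of the one derived from
    /// the file. It must be compatible with the file's Parquet schema, and
    /// setting it also implies `skip_arrow_metadata`.
    ///
    /// A short sketch (the field name is illustrative): read an INT96 column
    /// as microsecond rather than the default nanosecond timestamps:
    ///
    /// ```ignore
    /// use std::sync::Arc;
    /// use arrow_schema::{DataType, Field, Schema, TimeUnit};
    /// use parquet::arrow::arrow_reader::ArrowReaderOptions;
    ///
    /// let schema = Arc::new(Schema::new(vec![Field::new(
    ///     "a",
    ///     DataType::Timestamp(TimeUnit::Microsecond, None),
    ///     true,
    /// )]));
    /// let options = ArrowReaderOptions::new().with_schema(schema);
    /// ```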
    pub fn with_schema(self, schema: SchemaRef) -> Self {
        Self {
            supplied_schema: Some(schema),
            skip_arrow_metadata: true,
            ..self
        }
    }

    /// Enable or disable reading the page index
    pub fn with_page_index(self, page_index: bool) -> Self {
        let page_index_policy = PageIndexPolicy::from(page_index);

        Self {
            page_index_policy,
            ..self
        }
    }

    /// Set the [`PageIndexPolicy`] used when reading the page index
    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
        Self {
            page_index_policy: policy,
            ..self
        }
    }

    /// Provide a known Parquet [`SchemaDescriptor`], avoiding re-parsing the
    /// schema from the file footer
    pub fn with_parquet_schema(mut self, schema: Arc<SchemaDescriptor>) -> Self {
        self.metadata_options.set_schema(schema);
        self
    }

    /// If `true`, decode page encoding statistics as a compact mask rather
    /// than the full per-page list
    pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
        self.metadata_options.set_encoding_stats_as_mask(val);
        self
    }

    /// Set the [`ParquetStatisticsPolicy`] controlling which columns retain
    /// page encoding statistics
    pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
        self.metadata_options.set_encoding_stats_policy(policy);
        self
    }

    /// Provide decryption properties for reading encrypted Parquet files
    #[cfg(feature = "encryption")]
    pub fn with_file_decryption_properties(
        self,
        file_decryption_properties: Arc<FileDecryptionProperties>,
    ) -> Self {
        Self {
            file_decryption_properties: Some(file_decryption_properties),
            ..self
        }
    }

    /// Request virtual columns (such as row numbers) to be included in the
    /// output schema.
    ///
    /// Returns an error if any field is not a recognized virtual column.
    pub fn with_virtual_columns(self, virtual_columns: Vec<FieldRef>) -> Result<Self> {
        for field in &virtual_columns {
            if !is_virtual_column(field) {
                return Err(ParquetError::General(format!(
                    "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
                    field.name()
                )));
            }
        }
        Ok(Self {
            virtual_columns,
            ..self
        })
    }

    /// Returns `true` if the page index will be read, if present
    pub fn page_index(&self) -> bool {
        self.page_index_policy != PageIndexPolicy::Skip
    }

    /// Returns the [`ParquetMetaDataOptions`] used when decoding the footer
    pub fn metadata_options(&self) -> &ParquetMetaDataOptions {
        &self.metadata_options
    }

    /// Returns the decryption properties, if any
    #[cfg(feature = "encryption")]
    pub fn file_decryption_properties(&self) -> Option<&Arc<FileDecryptionProperties>> {
        self.file_decryption_properties.as_ref()
    }
}

/// The combination of Parquet metadata and Arrow schema required to construct
/// an Arrow reader. Loading this once allows it to be reused across multiple
/// readers without re-parsing the footer.
#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    /// The Parquet file metadata
    pub(crate) metadata: Arc<ParquetMetaData>,
    /// The Arrow schema of the decoded output
    pub(crate) schema: SchemaRef,
    /// The parquet-to-arrow field mapping
    pub(crate) fields: Option<Arc<ParquetField>>,
}

impl ArrowReaderMetadata {
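    /// Load [`ArrowReaderMetadata`] from a [`ChunkReader`], parsing the footer
    /// (and optionally the page index) once.
    ///
    /// A minimal sketch of loading once and sharing across readers, assuming
    /// `data: bytes::Bytes` holds an entire Parquet file:
    ///
    /// ```ignore
    /// use parquet::arrow::arrow_reader::{
    ///     ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
    /// };
    ///
    /// let metadata = ArrowReaderMetadata::load(&data, ArrowReaderOptions::new())?;
    /// // Both clones are cheap: `Bytes` is reference counted and the parsed
    /// // footer is behind an `Arc`
    /// let reader =
    ///     ParquetRecordBatchReaderBuilder::new_with_metadata(data.clone(), metadata.clone())
    ///         .build()?;
    /// ```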
    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ParquetMetaDataReader::new()
            .with_page_index_policy(options.page_index_policy)
            .with_metadata_options(Some(options.metadata_options.clone()));
        #[cfg(feature = "encryption")]
        let metadata = metadata.with_decryption_properties(
            options.file_decryption_properties.as_ref().map(Arc::clone),
        );
        let metadata = metadata.parse_and_finish(reader)?;
        Self::try_new(Arc::new(metadata), options)
    }

    /// Construct [`ArrowReaderMetadata`] from already-parsed [`ParquetMetaData`]
    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
        match options.supplied_schema {
            Some(supplied_schema) => {
                Self::with_supplied_schema(metadata, supplied_schema, &options.virtual_columns)
            }
            None => {
                let kv_metadata = match options.skip_arrow_metadata {
                    true => None,
                    false => metadata.file_metadata().key_value_metadata(),
                };

                let (schema, fields) = parquet_to_arrow_schema_and_fields(
                    metadata.file_metadata().schema_descr(),
                    ProjectionMask::all(),
                    kv_metadata,
                    &options.virtual_columns,
                )?;

                Ok(Self {
                    metadata,
                    schema: Arc::new(schema),
                    fields: fields.map(Arc::new),
                })
            }
        }
    }

    fn with_supplied_schema(
        metadata: Arc<ParquetMetaData>,
        supplied_schema: SchemaRef,
        virtual_columns: &[FieldRef],
    ) -> Result<Self> {
        let parquet_schema = metadata.file_metadata().schema_descr();
        let field_levels = parquet_to_arrow_field_levels_with_virtual(
            parquet_schema,
            ProjectionMask::all(),
            Some(supplied_schema.fields()),
            virtual_columns,
        )?;
        let fields = field_levels.fields;
        let inferred_len = fields.len();
        let supplied_len = supplied_schema.fields().len() + virtual_columns.len();
        // The supplied schema must have the same number of columns as the
        // schema inferred from the file, accounting for any virtual columns
        if inferred_len != supplied_len {
            return Err(arrow_err!(format!(
                "Incompatible supplied Arrow schema: expected {} columns, received {}",
                inferred_len, supplied_len
            )));
        }

        let mut errors = Vec::new();

        let field_iter = supplied_schema.fields().iter().zip(fields.iter());

        for (field1, field2) in field_iter {
            if field1.data_type() != field2.data_type() {
                errors.push(format!(
                    "data type mismatch for field {}: requested {} but found {}",
                    field1.name(),
                    field1.data_type(),
                    field2.data_type()
                ));
            }
            if field1.is_nullable() != field2.is_nullable() {
                errors.push(format!(
                    "nullability mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.is_nullable(),
                    field2.is_nullable()
                ));
            }
            if field1.metadata() != field2.metadata() {
                errors.push(format!(
                    "metadata mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.metadata(),
                    field2.metadata()
                ));
            }
        }

        if !errors.is_empty() {
            let message = errors.join(", ");
            return Err(ParquetError::ArrowError(format!(
                "Incompatible supplied Arrow schema: {message}",
            )));
        }

        Ok(Self {
            metadata,
            schema: supplied_schema,
            fields: field_levels.levels.map(Arc::new),
        })
    }

    /// Returns a reference to the [`ParquetMetaData`] for this file
    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    /// Returns the Parquet [`SchemaDescriptor`] for this file
    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    /// Returns the Arrow [`SchemaRef`] of the decoded output
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}

#[doc(hidden)]
pub struct SyncReader<T: ChunkReader>(T);

impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("SyncReader").field(&self.0).finish()
    }
}

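/// Builds a [`ParquetRecordBatchReader`] from a synchronous [`ChunkReader`],
/// such as a [`File`](std::fs::File) or [`Bytes`](bytes::Bytes).
///
/// A minimal sketch of typical usage, assuming `data: bytes::Bytes` holds an
/// entire Parquet file:
///
/// ```ignore
/// use parquet::arrow::ProjectionMask;
/// use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
///
/// let builder = ParquetRecordBatchReaderBuilder::try_new(data)?;
/// // Only decode the first leaf column, 8192 rows at a time
/// let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
/// let reader = builder
///     .with_projection(mask)
///     .with_batch_size(8192)
///     .build()?;
/// for batch in reader {
///     println!("read {} rows", batch?.num_rows());
/// }
/// ```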
pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;

impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
    /// Create a new builder, reading the metadata from `reader`
    pub fn try_new(reader: T) -> Result<Self> {
        Self::try_new_with_options(reader, Default::default())
    }

    /// Create a new builder with the given [`ArrowReaderOptions`]
    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load(&reader, options)?;
        Ok(Self::new_with_metadata(reader, metadata))
    }

    /// Create a new builder from already-loaded [`ArrowReaderMetadata`],
    /// avoiding a second parse of the footer
    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(SyncReader(input), metadata)
    }

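    /// Read the bloom filter for the given column chunk, if any, returning
    /// `Ok(None)` when the column has no bloom filter.
    ///
    /// A sketch of probing it (the indices and probe value are illustrative):
    ///
    /// ```ignore
    /// if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0)? {
    ///     // May report false positives, but never false negatives
    ///     let maybe_present = sbbf.check(&"some_value");
    /// }
    /// ```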
    pub fn get_row_group_column_bloom_filter(
        &self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> Result<Option<Sbbf>> {
        let metadata = self.metadata.row_group(row_group_idx);
        let column_metadata = metadata.column(column_idx);

        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        // If the filter length is recorded, fetch the whole filter in one read;
        // otherwise read enough for the header and fetch the bitset separately
        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => self.input.0.get_bytes(offset, length as usize),
            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
        }?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        match header.algorithm {
            BloomFilterAlgorithm::BLOCK => {
                // split-block is the only algorithm currently defined
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED => {
                // uncompressed is the only compression currently defined
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH => {
                // xxHash is the only hash currently defined
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            Some(_) => buffer.slice(
                (TryInto::<usize>::try_into(bitset_offset).unwrap()
                    - TryInto::<usize>::try_into(offset).unwrap())..,
            ),
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                self.input.0.get_bytes(bitset_offset, bitset_length)?
            }
        };
        Ok(Some(Sbbf::new(&bitset)))
    }

    /// Consume the builder, returning a [`ParquetRecordBatchReader`]
    pub fn build(self) -> Result<ParquetRecordBatchReader> {
        let Self {
            input,
            metadata,
            schema: _,
            fields,
            batch_size,
            row_groups,
            projection,
            mut filter,
            selection,
            row_selection_policy,
            limit,
            offset,
            metrics,
            max_predicate_cache_size: _,
        } = self;

        // Try to avoid allocating batches larger than the file's row count
        let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize);

        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());

        let reader = ReaderRowGroups {
            reader: Arc::new(input.0),
            metadata,
            row_groups,
        };

        let mut plan_builder = ReadPlanBuilder::new(batch_size)
            .with_selection(selection)
            .with_row_selection_policy(row_selection_policy);

        // Evaluate any predicates in order, narrowing the selection after each
        if let Some(filter) = filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
                if !plan_builder.selects_any() {
                    break;
                }

                let mut cache_projection = predicate.projection().clone();
                cache_projection.intersect(&projection);

                let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
                    .with_parquet_metadata(&reader.metadata)
                    .build_array_reader(fields.as_deref(), predicate.projection())?;

                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
            }
        }

        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
            .with_parquet_metadata(&reader.metadata)
            .build_array_reader(fields.as_deref(), &projection)?;

        let read_plan = plan_builder
            .limited(reader.num_rows())
            .with_offset(offset)
            .with_limit(limit)
            .build_limited()
            .build();

        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
    }
}

struct ReaderRowGroups<T: ChunkReader> {
    reader: Arc<T>,

    metadata: Arc<ParquetMetaData>,
    /// Indices of the row groups to read, in order
    row_groups: Vec<usize>,
}

impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
    fn num_rows(&self) -> usize {
        let meta = self.metadata.row_groups();
        self.row_groups
            .iter()
            .map(|x| meta[*x].num_rows() as usize)
            .sum()
    }

    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        Ok(Box::new(ReaderPageIterator {
            column_idx: i,
            reader: self.reader.clone(),
            metadata: self.metadata.clone(),
            row_groups: self.row_groups.clone().into_iter(),
        }))
    }

    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
        Box::new(
            self.row_groups
                .iter()
                .map(move |i| self.metadata.row_group(*i)),
        )
    }

    fn metadata(&self) -> &ParquetMetaData {
        self.metadata.as_ref()
    }
}

struct ReaderPageIterator<T: ChunkReader> {
    reader: Arc<T>,
    column_idx: usize,
    row_groups: std::vec::IntoIter<usize>,
    metadata: Arc<ParquetMetaData>,
}

impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
    /// Return the next [`SerializedPageReader`] for the given row group
    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
        let rg = self.metadata.row_group(rg_idx);
        let column_chunk_metadata = rg.column(self.column_idx);
        let offset_index = self.metadata.offset_index();
        // Only provide page locations if the offset index is present and
        // populated for this row group
        let page_locations = offset_index
            .filter(|i| !i[rg_idx].is_empty())
            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
        let total_rows = rg.num_rows() as usize;
        let reader = self.reader.clone();

        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
            .add_crypto_context(
                rg_idx,
                self.column_idx,
                self.metadata.as_ref(),
                column_chunk_metadata,
            )
    }
}

impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        let rg_idx = self.row_groups.next()?;
        let page_reader = self
            .next_page_reader(rg_idx)
            .map(|page_reader| Box::new(page_reader) as _);
        Some(page_reader)
    }
}

impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}

/// Reads Parquet data as an [`Iterator`] of [`RecordBatch`]es, implementing
/// [`RecordBatchReader`]
pub struct ParquetRecordBatchReader {
    array_reader: Box<dyn ArrayReader>,
    schema: SchemaRef,
    read_plan: ReadPlan,
}

impl Debug for ParquetRecordBatchReader {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ParquetRecordBatchReader")
            .field("array_reader", &"...")
            .field("schema", &self.schema)
            .field("read_plan", &self.read_plan)
            .finish()
    }
}

impl Iterator for ParquetRecordBatchReader {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_inner()
            .map_err(|arrow_err| arrow_err.into())
            .transpose()
    }
}

impl ParquetRecordBatchReader {
    /// Read the next batch, returning `Ok(None)` once the reader is exhausted
    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
        let mut read_records = 0;
        let batch_size = self.batch_size();
        if batch_size == 0 {
            return Ok(None);
        }
        match self.read_plan.row_selection_cursor_mut() {
            RowSelectionCursor::Mask(mask_cursor) => {
                // Decode the rows covered by the next chunk of the selection
                // mask, then filter the decoded batch down to the selected rows
                while !mask_cursor.is_empty() {
                    let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else {
                        return Ok(None);
                    };

                    if mask_chunk.initial_skip > 0 {
                        let skipped = self.array_reader.skip_records(mask_chunk.initial_skip)?;
                        if skipped != mask_chunk.initial_skip {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                mask_chunk.initial_skip,
                                skipped
                            ));
                        }
                    }

                    if mask_chunk.chunk_rows == 0 {
                        if mask_cursor.is_empty() && mask_chunk.selected_rows == 0 {
                            return Ok(None);
                        }
                        continue;
                    }

                    let mask = mask_cursor.mask_values_for(&mask_chunk)?;

                    let read = self.array_reader.read_records(mask_chunk.chunk_rows)?;
                    if read == 0 {
                        return Err(general_err!(
                            "reached end of column while expecting {} rows",
                            mask_chunk.chunk_rows
                        ));
                    }
                    if read != mask_chunk.chunk_rows {
                        return Err(general_err!(
                            "insufficient rows read from array reader - expected {}, got {}",
                            mask_chunk.chunk_rows,
                            read
                        ));
                    }

                    let array = self.array_reader.consume_batch()?;
                    let struct_array = array.as_struct_opt().ok_or_else(|| {
                        ArrowError::ParquetError(
                            "Struct array reader should return struct array".to_string(),
                        )
                    })?;

                    let filtered_batch =
                        filter_record_batch(&RecordBatch::from(struct_array), &mask)?;

                    if filtered_batch.num_rows() != mask_chunk.selected_rows {
                        return Err(general_err!(
                            "filtered rows mismatch selection - expected {}, got {}",
                            mask_chunk.selected_rows,
                            filtered_batch.num_rows()
                        ));
                    }

                    if filtered_batch.num_rows() == 0 {
                        continue;
                    }

                    return Ok(Some(filtered_batch));
                }
            }
            RowSelectionCursor::Selectors(selectors_cursor) => {
                // Consume selectors until enough rows are read to fill a batch
                while read_records < batch_size && !selectors_cursor.is_empty() {
                    let front = selectors_cursor.next_selector();
                    if front.skip {
                        let skipped = self.array_reader.skip_records(front.row_count)?;

                        if skipped != front.row_count {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                front.row_count,
                                skipped
                            ));
                        }
                        continue;
                    }

                    if front.row_count == 0 {
                        continue;
                    }

                    // Only read as many rows as needed to fill the batch,
                    // returning any excess selection to the cursor
                    let need_read = batch_size - read_records;
                    let to_read = match front.row_count.checked_sub(need_read) {
                        Some(remaining) if remaining != 0 => {
                            selectors_cursor.return_selector(RowSelector::select(remaining));
                            need_read
                        }
                        _ => front.row_count,
                    };
                    match self.array_reader.read_records(to_read)? {
                        0 => break,
                        rec => read_records += rec,
                    };
                }
            }
            RowSelectionCursor::All => {
                self.array_reader.read_records(batch_size)?;
            }
        };

        let array = self.array_reader.consume_batch()?;
        let struct_array = array.as_struct_opt().ok_or_else(|| {
            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
        })?;

        Ok(if struct_array.len() > 0 {
            Some(RecordBatch::from(struct_array))
        } else {
            None
        })
    }
}

impl RecordBatchReader for ParquetRecordBatchReader {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl ParquetRecordBatchReader {
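    /// Create a new [`ParquetRecordBatchReader`] from `reader` with default
    /// options, equivalent to building one via
    /// [`ParquetRecordBatchReaderBuilder`].
    ///
    /// A minimal sketch, assuming `data: bytes::Bytes` holds a Parquet file:
    ///
    /// ```ignore
    /// use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
    ///
    /// let reader = ParquetRecordBatchReader::try_new(data, 1024)?;
    /// let batches = reader.collect::<Result<Vec<_>, _>>()?;
    /// ```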
    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
        ParquetRecordBatchReaderBuilder::try_new(reader)?
            .with_batch_size(batch_size)
            .build()
    }

    /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`],
    /// using pre-computed [`FieldLevels`]
    pub fn try_new_with_row_groups(
        levels: &FieldLevels,
        row_groups: &dyn RowGroups,
        batch_size: usize,
        selection: Option<RowSelection>,
    ) -> Result<Self> {
        let metrics = ArrowReaderMetrics::disabled();
        let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
            .with_parquet_metadata(row_groups.metadata())
            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;

        let read_plan = ReadPlanBuilder::new(batch_size)
            .with_selection(selection)
            .build();

        Ok(Self {
            array_reader,
            schema: Arc::new(Schema::new(levels.fields.clone())),
            read_plan,
        })
    }

    /// Create a new [`ParquetRecordBatchReader`] from the provided
    /// [`ArrayReader`] and [`ReadPlan`]
    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
        let schema = match array_reader.get_data_type() {
            ArrowType::Struct(fields) => Schema::new(fields.clone()),
            _ => unreachable!("Struct array reader's data type is not struct!"),
        };

        Self {
            array_reader,
            schema: Arc::new(schema),
            read_plan,
        }
    }

    #[inline(always)]
    pub(crate) fn batch_size(&self) -> usize {
        self.read_plan.batch_size()
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use std::cmp::min;
    use std::collections::{HashMap, VecDeque};
    use std::fmt::Formatter;
    use std::fs::File;
    use std::io::Seek;
    use std::path::PathBuf;
    use std::sync::Arc;

    use rand::rngs::StdRng;
    use rand::{Rng, RngCore, SeedableRng, random, rng};
    use tempfile::tempfile;

    use crate::arrow::arrow_reader::{
        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions,
        ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection,
        RowSelectionPolicy, RowSelector,
    };
    use crate::arrow::schema::{add_encoded_arrow_schema_to_metadata, virtual_type::RowNumber};
    use crate::arrow::{ArrowWriter, ProjectionMask};
    use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType};
    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
    use crate::data_type::{
        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
        FloatType, Int32Type, Int64Type, Int96, Int96Type,
    };
    use crate::errors::Result;
    use crate::file::metadata::{ParquetMetaData, ParquetStatisticsPolicy};
    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
    use crate::file::writer::SerializedFileWriter;
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::{Type, TypePtr};
    use crate::util::test_common::rand_gen::RandGen;
    use arrow::compute::kernels::cmp::eq;
    use arrow::compute::or;
    use arrow_array::builder::*;
    use arrow_array::cast::AsArray;
    use arrow_array::types::{
        Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
        DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
        Time64MicrosecondType,
    };
    use arrow_array::*;
    use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256};
    use arrow_data::{ArrayData, ArrayDataBuilder};
    use arrow_schema::{
        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
    };
    use arrow_select::concat::concat_batches;
    use bytes::Bytes;
    use half::f16;
    use num_traits::PrimInt;

    #[test]
    fn test_arrow_reader_all_columns() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());
        let reader = builder.build().unwrap();

        assert_eq!(original_schema.fields(), reader.schema().fields());
    }

    #[test]
    fn test_reuse_schema() {
        let file = get_test_file("parquet/alltypes-java.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
        let expected = builder.metadata;
        let schema = expected.file_metadata().schema_descr_ptr();

        let arrow_options = ArrowReaderOptions::new().with_parquet_schema(schema.clone());
        let builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();

        assert_eq!(expected.as_ref(), builder.metadata.as_ref());
    }

    #[test]
    fn test_page_encoding_stats_mask() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let file = File::open(path).unwrap();

        let arrow_options = ArrowReaderOptions::new().with_encoding_stats_as_mask(true);
        let builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();

        let row_group_metadata = builder.metadata.row_group(0);

        let page_encoding_stats = row_group_metadata
            .column(0)
            .page_encoding_stats_mask()
            .unwrap();
        assert!(page_encoding_stats.is_only(Encoding::PLAIN));
        let page_encoding_stats = row_group_metadata
            .column(2)
            .page_encoding_stats_mask()
            .unwrap();
        assert!(page_encoding_stats.is_only(Encoding::PLAIN_DICTIONARY));
    }

    #[test]
    fn test_page_encoding_stats_skipped() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let file = File::open(path).unwrap();

        let arrow_options =
            ArrowReaderOptions::new().with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll);
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for column in row_group_metadata.columns() {
            assert!(column.page_encoding_stats().is_none());
            assert!(column.page_encoding_stats_mask().is_none());
        }

        let arrow_options = ArrowReaderOptions::new()
            .with_encoding_stats_as_mask(true)
            .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]));
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
            assert!(column.page_encoding_stats().is_none());
            assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
        }
    }

    #[test]
    fn test_arrow_reader_single_column() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
        let reader = builder.with_projection(mask).build().unwrap();

        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_arrow_reader_single_column_by_name() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
        let reader = builder.with_projection(mask).build().unwrap();

        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_null_column_reader_test() {
        let mut file = tempfile::tempfile().unwrap();

        let schema = "
            message message {
                OPTIONAL INT32 int32;
            }
        ";
        let schema = Arc::new(parse_message_type(schema).unwrap());

        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
        generate_single_column_file_with_data::<Int32Type>(
            &[vec![], vec![]],
            Some(&def_levels),
            file.try_clone().unwrap(),
            schema,
            Some(Field::new("int32", ArrowDataType::Null, true)),
            &Default::default(),
        )
        .unwrap();

        file.rewind().unwrap();

        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();

        assert_eq!(batches.len(), 4);
        for batch in &batches[0..3] {
            assert_eq!(batch.num_rows(), 2);
            assert_eq!(batch.num_columns(), 1);
            assert_eq!(batch.column(0).null_count(), 2);
        }

        assert_eq!(batches[3].num_rows(), 1);
        assert_eq!(batches[3].num_columns(), 1);
        assert_eq!(batches[3].column(0).null_count(), 1);
    }

    #[test]
    fn test_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<BoolType, _, BoolType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
        );
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<FloatType, _, FloatType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
        );
    }

    #[test]
    fn test_unsigned_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::UINT_32,
            Some(ArrowDataType::UInt32),
            |vals| {
                Arc::new(UInt32Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u32)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::UINT_64,
            Some(ArrowDataType::UInt64),
            |vals| {
                Arc::new(UInt64Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u64)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
    }

    #[test]
    fn test_unsigned_roundtrip() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("uint32", ArrowDataType::UInt32, true),
            Field::new("uint64", ArrowDataType::UInt64, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt32Array::from_iter_values([
                    0,
                    i32::MAX as u32,
                    u32::MAX,
                ])),
                Arc::new(UInt64Array::from_iter_values([
                    0,
                    i64::MAX as u64,
                    u64::MAX,
                ])),
            ],
        )
        .unwrap();

        writer.write(&original).unwrap();
        writer.close().unwrap();

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
        let ret = reader.next().unwrap().unwrap();
        assert_eq!(ret, original);

        ret.column(0)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();

        ret.column(1)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
    }

    #[test]
    fn test_float16_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("float16", ArrowDataType::Float16, false),
            Field::new("float16-nullable", ArrowDataType::Float16, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Float16Array::from_iter_values([
                    f16::EPSILON,
                    f16::MIN,
                    f16::MAX,
                    f16::NAN,
                    f16::INFINITY,
                    f16::NEG_INFINITY,
                    f16::ONE,
                    f16::NEG_ONE,
                    f16::ZERO,
                    f16::NEG_ZERO,
                    f16::E,
                    f16::PI,
                    f16::FRAC_1_PI,
                ])),
                Arc::new(Float16Array::from(vec![
                    None,
                    None,
                    None,
                    Some(f16::NAN),
                    Some(f16::INFINITY),
                    Some(f16::NEG_INFINITY),
                    None,
                    None,
                    None,
                    None,
                    None,
                    None,
                    Some(f16::FRAC_1_PI),
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Float16Type>();
        ret.column(1).as_primitive::<Float16Type>();

        Ok(())
    }

    #[test]
    fn test_time_utc_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "time_millis",
                ArrowDataType::Time32(TimeUnit::Millisecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
            Field::new(
                "time_micros",
                ArrowDataType::Time64(TimeUnit::Microsecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Time32MillisecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399_000),
                    Some(86_400_000),
                    Some(86_401_000),
                    None,
                ])),
                Arc::new(Time64MicrosecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399 * 1_000_000),
                    Some(86_400 * 1_000_000),
                    Some(86_401 * 1_000_000),
                    None,
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Time32MillisecondType>();
        ret.column(1).as_primitive::<Time64MicrosecondType>();

        Ok(())
    }

    #[test]
    fn test_date32_roundtrip() -> Result<()> {
        use arrow_array::Date32Array;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "date32",
            ArrowDataType::Date32,
            false,
        )]));

        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![Arc::new(Date32Array::from(vec![
                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
            ]))],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Date32Type>();

        Ok(())
    }

    #[test]
    fn test_date64_roundtrip() -> Result<()> {
        use arrow_array::Date64Array;

        let schema = Arc::new(Schema::new(vec![
            Field::new("small-date64", ArrowDataType::Date64, false),
            Field::new("big-date64", ArrowDataType::Date64, false),
            Field::new("invalid-date64", ArrowDataType::Date64, false),
        ]));

        let mut default_buf = Vec::with_capacity(1024);
        let mut coerce_buf = Vec::with_capacity(1024);

        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();

        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
        let mut coerce_writer =
            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;

        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;

        let original = RecordBatch::try_new(
            schema,
            vec![
                // "small-date64"
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000 * NUM_MILLISECONDS_IN_DAY,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                // "big-date64"
                Arc::new(Date64Array::from(vec![
                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                // "invalid-date64": values that are not whole multiples of a day
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1,
                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                ])),
            ],
        )?;

        default_writer.write(&original)?;
        coerce_writer.write(&original)?;

        default_writer.close()?;
        coerce_writer.close()?;

        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;

        let default_ret = default_reader.next().unwrap()?;
        let coerce_ret = coerce_reader.next().unwrap()?;

        // Without coercion, all values round trip unchanged
        assert_eq!(default_ret, original);

        // With coercion, only the first column survives intact
        assert_eq!(coerce_ret.column(0), original.column(0));
        assert_ne!(coerce_ret.column(1), original.column(1));
        assert_ne!(coerce_ret.column(2), original.column(2));

        default_ret.column(0).as_primitive::<Date64Type>();
        coerce_ret.column(0).as_primitive::<Date64Type>();

        Ok(())
    }

    struct RandFixedLenGen {}

    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
        fn r#gen(len: i32) -> FixedLenByteArray {
            let mut v = vec![0u8; len as usize];
            rng().fill_bytes(&mut v);
            ByteArray::from(v).into()
        }
    }

    #[test]
    fn test_fixed_length_binary_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            20,
            ConvertedType::NONE,
            None,
            |vals| {
                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
                for val in vals {
                    match val {
                        Some(b) => builder.append_value(b).unwrap(),
                        None => builder.append_null(),
                    }
                }
                Arc::new(builder.finish())
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_interval_day_time_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            12,
            ConvertedType::INTERVAL,
            None,
            |vals| {
                Arc::new(
                    vals.iter()
                        .map(|x| {
                            x.as_ref().map(|b| IntervalDayTime {
                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
                                milliseconds: i32::from_le_bytes(
                                    b.as_ref()[8..12].try_into().unwrap(),
                                ),
                            })
                        })
                        .collect::<IntervalDayTimeArray>(),
                )
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_int96_single_column_reader_test() {
        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];

        type TypeHintAndConversionFunction =
            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);

        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
            (None, |vals: &[Option<Int96>]| {
                Arc::new(TimestampNanosecondArray::from_iter(
                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
                )) as ArrayRef
            }),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampSecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMillisecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_millis())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMicrosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_micros())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampNanosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(
                    TimeUnit::Second,
                    Some(Arc::from("-05:00")),
                )),
                |vals: &[Option<Int96>]| {
                    Arc::new(
                        TimestampSecondArray::from_iter(
                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
                        )
                        .with_timezone("-05:00"),
                    ) as ArrayRef
                },
            ),
        ];

        resolutions.iter().for_each(|(arrow_type, converter)| {
            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
                2,
                ConvertedType::NONE,
                arrow_type.clone(),
                converter,
                encodings,
            );
        })
    }

    #[test]
    fn test_int96_from_spark_file_with_provided_schema() {
        // int96_from_spark.parquet stores timestamps as INT96; decode it with
        // an explicit microsecond schema
        use arrow_schema::DataType::Timestamp;
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            Timestamp(TimeUnit::Microsecond, None),
            true,
        )]));
        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());

        let mut record_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                .unwrap()
                .build()
                .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));

        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456),
            Some(1704070800000000),
            Some(253402225200000000),
            Some(1735599600000000),
            None,
            Some(9089380393200000000),
        ]));

        // Compare as Int64 to avoid constructing a timestamp array by hand
        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }

    #[test]
    fn test_int96_from_spark_file_without_provided_schema() {
        // Without an explicit schema the INT96 column reads as nanosecond
        // timestamps, which overflow (and wrap) for dates far in the future
        use arrow_schema::DataType::Timestamp;
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));

        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456000),
            Some(1704070800000000000),
            Some(-4852191831933722624), // wraps when widened to nanoseconds
            Some(1735599600000000000),
            None,
            Some(-4864435138808946688), // wraps when widened to nanoseconds
        ]));

        // Compare as Int64 to avoid constructing a timestamp array by hand
        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }

    struct RandUtf8Gen {}

    impl RandGen<ByteArrayType> for RandUtf8Gen {
        fn r#gen(len: i32) -> ByteArray {
            Int32Type::r#gen(len).to_string().as_str().into()
        }
    }

    #[test]
    fn test_utf8_single_column_reader_test() {
        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
            })))
        }

        let encodings = &[
            Encoding::PLAIN,
            Encoding::RLE_DICTIONARY,
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            Encoding::DELTA_BYTE_ARRAY,
        ];

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::NONE,
            None,
            |vals| {
                Arc::new(BinaryArray::from_iter(
                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
                ))
            },
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            None,
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::Utf8),
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::LargeUtf8),
            string_converter::<i64>,
            encodings,
        );

        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
        for key in &small_key_types {
            for encoding in encodings {
                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
                opts.encoding = *encoding;

                let data_type =
                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
                    opts,
                    2,
                    ConvertedType::UTF8,
                    Some(data_type.clone()),
                    move |vals| {
                        let vals = string_converter::<i32>(vals);
                        arrow::compute::cast(&vals, &data_type).unwrap()
                    },
                );
            }
        }

        let key_types = [
            ArrowDataType::Int16,
            ArrowDataType::UInt16,
            ArrowDataType::Int32,
            ArrowDataType::UInt32,
            ArrowDataType::Int64,
            ArrowDataType::UInt64,
        ];

        for key in &key_types {
            let data_type =
                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let vals = string_converter::<i32>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );
        }
    }

    #[test]
    fn test_decimal_nullable_struct() {
        let decimals = Decimal256Array::from_iter_values(
            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
        );

        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "decimals",
            decimals.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_int32_nullable_struct() {
        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "int32",
            int32.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![int32.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_decimal_list() {
        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);

        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
            decimals.data_type().clone(),
            false,
        ))))
        .len(7)
        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 1), &read[2]);
    }

    #[test]
    fn test_read_decimal_file() {
        use arrow_array::Decimal128Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let file_variants = vec![
            ("byte_array", 4),
            ("fixed_length", 25),
            ("int32", 4),
            ("int64", 10),
        ];
        for (prefix, target_precision) in file_variants {
            let path = format!("{testdata}/{prefix}_decimal.parquet");
            let file = File::open(path).unwrap();
            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

            let batch = record_reader.next().unwrap().unwrap();
            assert_eq!(batch.num_rows(), 24);
            let col = batch
                .column(0)
                .as_any()
                .downcast_ref::<Decimal128Array>()
                .unwrap();

            let expected = 1..25;

            assert_eq!(col.precision(), target_precision);
            assert_eq!(col.scale(), 2);

            for (i, v) in expected.enumerate() {
                assert_eq!(col.value(i), v * 100_i128);
            }
        }
    }

    #[test]
    fn test_read_float16_nonzeros_file() {
        use arrow_array::Float16Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 8);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float16Array>()
            .unwrap();

        let f16_two = f16::ONE + f16::ONE;

        assert_eq!(col.null_count(), 1);
        assert!(col.is_null(0));
        assert_eq!(col.value(1), f16::ONE);
        assert_eq!(col.value(2), -f16_two);
        assert!(col.value(3).is_nan());
        assert_eq!(col.value(4), f16::ZERO);
        assert!(col.value(4).is_sign_positive());
        assert_eq!(col.value(5), f16::NEG_ONE);
        assert_eq!(col.value(6), f16::NEG_ZERO);
        assert!(col.value(6).is_sign_negative());
        assert_eq!(col.value(7), f16_two);
    }

    #[test]
    fn test_read_float16_zeros_file() {
        use arrow_array::Float16Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 3);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float16Array>()
            .unwrap();

        assert_eq!(col.null_count(), 1);
        assert!(col.is_null(0));
        assert_eq!(col.value(1), f16::ZERO);
        assert!(col.value(1).is_sign_positive());
        assert!(col.value(2).is_nan());
    }

2536 #[test]
2537 fn test_read_float32_float64_byte_stream_split() {
2538 let path = format!(
2539 "{}/byte_stream_split.zstd.parquet",
2540 arrow::util::test_util::parquet_test_data(),
2541 );
2542 let file = File::open(path).unwrap();
2543 let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2544
2545 let mut row_count = 0;
2546 for batch in record_reader {
2547 let batch = batch.unwrap();
2548 row_count += batch.num_rows();
2549 let f32_col = batch.column(0).as_primitive::<Float32Type>();
2550 let f64_col = batch.column(1).as_primitive::<Float64Type>();
2551
2552 for &x in f32_col.values() {
2554 assert!(x > -10.0);
2555 assert!(x < 10.0);
2556 }
2557 for &x in f64_col.values() {
2558 assert!(x > -10.0);
2559 assert!(x < 10.0);
2560 }
2561 }
2562 assert_eq!(row_count, 300);
2563 }
2564
2565 #[test]
2566 fn test_read_extended_byte_stream_split() {
2567 let path = format!(
2568 "{}/byte_stream_split_extended.gzip.parquet",
2569 arrow::util::test_util::parquet_test_data(),
2570 );
2571 let file = File::open(path).unwrap();
2572 let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2573
2574 let mut row_count = 0;
2575 for batch in record_reader {
2576 let batch = batch.unwrap();
2577 row_count += batch.num_rows();
2578
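// Columns come in pairs: a plain-encoded column followed by the same data
// encoded with BYTE_STREAM_SPLIT; each pair must decode identically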
2579 let f16_col = batch.column(0).as_primitive::<Float16Type>();
2581 let f16_bss = batch.column(1).as_primitive::<Float16Type>();
2582 assert_eq!(f16_col.len(), f16_bss.len());
2583 f16_col
2584 .iter()
2585 .zip(f16_bss.iter())
2586 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2587
2588 let f32_col = batch.column(2).as_primitive::<Float32Type>();
2590 let f32_bss = batch.column(3).as_primitive::<Float32Type>();
2591 assert_eq!(f32_col.len(), f32_bss.len());
2592 f32_col
2593 .iter()
2594 .zip(f32_bss.iter())
2595 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2596
2597 let f64_col = batch.column(4).as_primitive::<Float64Type>();
2599 let f64_bss = batch.column(5).as_primitive::<Float64Type>();
2600 assert_eq!(f64_col.len(), f64_bss.len());
2601 f64_col
2602 .iter()
2603 .zip(f64_bss.iter())
2604 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2605
2606 let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
2608 let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
2609 assert_eq!(i32_col.len(), i32_bss.len());
2610 i32_col
2611 .iter()
2612 .zip(i32_bss.iter())
2613 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2614
2615 let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
2617 let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
2618 assert_eq!(i64_col.len(), i64_bss.len());
2619 i64_col
2620 .iter()
2621 .zip(i64_bss.iter())
2622 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2623
2624 let flba_col = batch.column(10).as_fixed_size_binary();
2626 let flba_bss = batch.column(11).as_fixed_size_binary();
2627 assert_eq!(flba_col.len(), flba_bss.len());
2628 flba_col
2629 .iter()
2630 .zip(flba_bss.iter())
2631 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2632
2633 let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
2635 let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
2636 assert_eq!(dec_col.len(), dec_bss.len());
2637 dec_col
2638 .iter()
2639 .zip(dec_bss.iter())
2640 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2641 }
2642 assert_eq!(row_count, 200);
2643 }
2644
2645 #[test]
2646 fn test_read_incorrect_map_schema_file() {
2647 let testdata = arrow::util::test_util::parquet_test_data();
2648 let path = format!("{testdata}/incorrect_map_schema.parquet");
2650 let file = File::open(path).unwrap();
2651 let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2652
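// The file declares its map with a non-standard schema; reading it should
// still succeed, exposing the key_value/key/value names checked below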
2653 let batch = record_reader.next().unwrap().unwrap();
2654 assert_eq!(batch.num_rows(), 1);
2655
2656 let expected_schema = Schema::new(vec![Field::new(
2657 "my_map",
2658 ArrowDataType::Map(
2659 Arc::new(Field::new(
2660 "key_value",
2661 ArrowDataType::Struct(Fields::from(vec![
2662 Field::new("key", ArrowDataType::Utf8, false),
2663 Field::new("value", ArrowDataType::Utf8, true),
2664 ])),
2665 false,
2666 )),
2667 false,
2668 ),
2669 true,
2670 )]);
2671 assert_eq!(batch.schema().as_ref(), &expected_schema);
2672
2673 assert_eq!(batch.num_rows(), 1);
2674 assert_eq!(batch.column(0).null_count(), 0);
2675 assert_eq!(
2676 batch.column(0).as_map().keys().as_ref(),
2677 &StringArray::from(vec!["parent", "name"])
2678 );
2679 assert_eq!(
2680 batch.column(0).as_map().values().as_ref(),
2681 &StringArray::from(vec!["another", "report"])
2682 );
2683 }
2684
2685 #[test]
2686 fn test_read_dict_fixed_size_binary() {
2687 let schema = Arc::new(Schema::new(vec![Field::new(
2688 "a",
2689 ArrowDataType::Dictionary(
2690 Box::new(ArrowDataType::UInt8),
2691 Box::new(ArrowDataType::FixedSizeBinary(8)),
2692 ),
2693 true,
2694 )]));
2695 let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
2696 let values = FixedSizeBinaryArray::try_from_iter(
2697 vec![
2698 (0u8..8u8).collect::<Vec<u8>>(),
2699 (24u8..32u8).collect::<Vec<u8>>(),
2700 ]
2701 .into_iter(),
2702 )
2703 .unwrap();
2704 let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
2705 let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();
2706
2707 let mut buffer = Vec::with_capacity(1024);
2708 let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2709 writer.write(&batch).unwrap();
2710 writer.close().unwrap();
2711 let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2712 .unwrap()
2713 .collect::<Result<Vec<_>, _>>()
2714 .unwrap();
2715
2716 assert_eq!(read.len(), 1);
2717 assert_eq!(&batch, &read[0])
2718 }
2719
2720 #[test]
2721 fn test_read_nullable_structs_with_binary_dict_as_first_child_column() {
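// Round-trips a nullable struct whose first child is a dictionary-encoded
// column, including the struct-level null in row 3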
2722 let struct_fields = Fields::from(vec![
2729 Field::new(
2730 "city",
2731 ArrowDataType::Dictionary(
2732 Box::new(ArrowDataType::UInt8),
2733 Box::new(ArrowDataType::Utf8),
2734 ),
2735 true,
2736 ),
2737 Field::new("name", ArrowDataType::Utf8, true),
2738 ]);
2739 let schema = Arc::new(Schema::new(vec![Field::new(
2740 "items",
2741 ArrowDataType::Struct(struct_fields.clone()),
2742 true,
2743 )]));
2744
2745 let items_arr = StructArray::new(
2746 struct_fields,
2747 vec![
2748 Arc::new(DictionaryArray::new(
2749 UInt8Array::from_iter_values(vec![0, 1, 1, 0, 2]),
2750 Arc::new(StringArray::from_iter_values(vec![
2751 "quebec",
2752 "fredericton",
2753 "halifax",
2754 ])),
2755 )),
2756 Arc::new(StringArray::from_iter_values(vec![
2757 "albert", "terry", "lance", "", "tim",
2758 ])),
2759 ],
2760 Some(NullBuffer::from_iter(vec![true, true, true, false, true])),
2761 );
2762
2763 let batch = RecordBatch::try_new(schema, vec![Arc::new(items_arr)]).unwrap();
2764 let mut buffer = Vec::with_capacity(1024);
2765 let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2766 writer.write(&batch).unwrap();
2767 writer.close().unwrap();
2768 let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
2769 .unwrap()
2770 .collect::<Result<Vec<_>, _>>()
2771 .unwrap();
2772
2773 assert_eq!(read.len(), 1);
2774 assert_eq!(&batch, &read[0])
2775 }
2776
2777 #[derive(Clone)]
struct TestOptions {
/// Number of row groups to write
num_row_groups: usize,
/// Number of rows in each row group
num_rows: usize,
/// Size of the batches to read back
record_batch_size: usize,
/// Percentage of null values in the column, or None if the column is required
null_percent: Option<usize>,
/// Write batch size, passed to set_write_batch_size
write_batch_size: usize,
/// Maximum size of a data page in bytes
max_data_page_size: usize,
/// Maximum size of a dictionary page in bytes
max_dict_page_size: usize,
/// Parquet writer version
writer_version: WriterVersion,
/// Statistics level to write
enabled_statistics: EnabledStatistics,
/// Encoding to use for the column
encoding: Encoding,
/// Optional row selection and the number of rows it selects
row_selections: Option<(RowSelection, usize)>,
/// Optional row filter mask, one bool per row surviving the selection
row_filter: Option<Vec<bool>>,
/// Optional limit on the number of rows read
limit: Option<usize>,
/// Optional number of rows to skip
offset: Option<usize>,
}
2813
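// Manual Debug impl: row_selections and row_filter are summarised as
// booleans to keep test failure output readable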
2814 impl std::fmt::Debug for TestOptions {
2816 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2817 f.debug_struct("TestOptions")
2818 .field("num_row_groups", &self.num_row_groups)
2819 .field("num_rows", &self.num_rows)
2820 .field("record_batch_size", &self.record_batch_size)
2821 .field("null_percent", &self.null_percent)
2822 .field("write_batch_size", &self.write_batch_size)
2823 .field("max_data_page_size", &self.max_data_page_size)
2824 .field("max_dict_page_size", &self.max_dict_page_size)
2825 .field("writer_version", &self.writer_version)
2826 .field("enabled_statistics", &self.enabled_statistics)
2827 .field("encoding", &self.encoding)
2828 .field("row_selections", &self.row_selections.is_some())
2829 .field("row_filter", &self.row_filter.is_some())
2830 .field("limit", &self.limit)
2831 .field("offset", &self.offset)
2832 .finish()
2833 }
2834 }
2835
2836 impl Default for TestOptions {
2837 fn default() -> Self {
2838 Self {
2839 num_row_groups: 2,
2840 num_rows: 100,
2841 record_batch_size: 15,
2842 null_percent: None,
2843 write_batch_size: 64,
2844 max_data_page_size: 1024 * 1024,
2845 max_dict_page_size: 1024 * 1024,
2846 writer_version: WriterVersion::PARQUET_1_0,
2847 enabled_statistics: EnabledStatistics::Page,
2848 encoding: Encoding::PLAIN,
2849 row_selections: None,
2850 row_filter: None,
2851 limit: None,
2852 offset: None,
2853 }
2854 }
2855 }
2856
2857 impl TestOptions {
2858 fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
2859 Self {
2860 num_row_groups,
2861 num_rows,
2862 record_batch_size,
2863 ..Default::default()
2864 }
2865 }
2866
2867 fn with_null_percent(self, null_percent: usize) -> Self {
2868 Self {
2869 null_percent: Some(null_percent),
2870 ..self
2871 }
2872 }
2873
2874 fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
2875 Self {
2876 max_data_page_size,
2877 ..self
2878 }
2879 }
2880
2881 fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
2882 Self {
2883 max_dict_page_size,
2884 ..self
2885 }
2886 }
2887
2888 fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
2889 Self {
2890 enabled_statistics,
2891 ..self
2892 }
2893 }
2894
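/// Adds a randomly generated RowSelection, alternating runs of skipped and
/// selected rows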
2895 fn with_row_selections(self) -> Self {
2896 assert!(self.row_filter.is_none(), "Must set row selection first");
2897
2898 let mut rng = rng();
2899 let step = rng.random_range(self.record_batch_size..self.num_rows);
2900 let row_selections = create_test_selection(
2901 step,
2902 self.num_row_groups * self.num_rows,
2903 rng.random::<bool>(),
2904 );
2905 Self {
2906 row_selections: Some(row_selections),
2907 ..self
2908 }
2909 }
2910
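/// Adds a row filter that randomly keeps roughly 90% of the rows remaining
/// after any row selection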
2911 fn with_row_filter(self) -> Self {
2912 let row_count = match &self.row_selections {
2913 Some((_, count)) => *count,
2914 None => self.num_row_groups * self.num_rows,
2915 };
2916
2917 let mut rng = rng();
2918 Self {
2919 row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
2920 ..self
2921 }
2922 }
2923
2924 fn with_limit(self, limit: usize) -> Self {
2925 Self {
2926 limit: Some(limit),
2927 ..self
2928 }
2929 }
2930
2931 fn with_offset(self, offset: usize) -> Self {
2932 Self {
2933 offset: Some(offset),
2934 ..self
2935 }
2936 }
2937
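/// Builds WriterProperties from these options, enabling the dictionary only
/// for dictionary encodings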
2938 fn writer_props(&self) -> WriterProperties {
2939 let builder = WriterProperties::builder()
2940 .set_data_page_size_limit(self.max_data_page_size)
2941 .set_write_batch_size(self.write_batch_size)
2942 .set_writer_version(self.writer_version)
2943 .set_statistics_enabled(self.enabled_statistics);
2944
2945 let builder = match self.encoding {
2946 Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
2947 .set_dictionary_enabled(true)
2948 .set_dictionary_page_size_limit(self.max_dict_page_size),
2949 _ => builder
2950 .set_dictionary_enabled(false)
2951 .set_encoding(self.encoding),
2952 };
2953
2954 builder.build()
2955 }
2956 }
2957
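/// Writes a single-column parquet file for every TestOptions permutation,
/// writer version and encoding, then reads it back and validates the data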
2958 fn run_single_column_reader_tests<T, F, G>(
2965 rand_max: i32,
2966 converted_type: ConvertedType,
2967 arrow_type: Option<ArrowDataType>,
2968 converter: F,
2969 encodings: &[Encoding],
2970 ) where
2971 T: DataType,
2972 G: RandGen<T>,
2973 F: Fn(&[Option<T::T>]) -> ArrayRef,
2974 {
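// Each entry exercises a different combination of row group count, page size
// limits, nulls, statistics, row selections, row filters, limits and offsets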
2975 let all_options = vec![
2976 TestOptions::new(2, 100, 15),
2979 TestOptions::new(3, 25, 5),
2984 TestOptions::new(4, 100, 25),
2988 TestOptions::new(3, 256, 73).with_max_data_page_size(128),
2990 TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
2992 TestOptions::new(2, 256, 127).with_null_percent(0),
2994 TestOptions::new(2, 256, 93).with_null_percent(25),
2996 TestOptions::new(4, 100, 25).with_limit(0),
2998 TestOptions::new(4, 100, 25).with_limit(50),
3000 TestOptions::new(4, 100, 25).with_limit(10),
3002 TestOptions::new(4, 100, 25).with_limit(101),
3004 TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
3006 TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
3008 TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
3010 TestOptions::new(2, 256, 91)
3012 .with_null_percent(25)
3013 .with_enabled_statistics(EnabledStatistics::Chunk),
3014 TestOptions::new(2, 256, 91)
3016 .with_null_percent(25)
3017 .with_enabled_statistics(EnabledStatistics::None),
3018 TestOptions::new(2, 128, 91)
3020 .with_null_percent(100)
3021 .with_enabled_statistics(EnabledStatistics::None),
3022 TestOptions::new(2, 100, 15).with_row_selections(),
3027 TestOptions::new(3, 25, 5).with_row_selections(),
3032 TestOptions::new(4, 100, 25).with_row_selections(),
3036 TestOptions::new(3, 256, 73)
3038 .with_max_data_page_size(128)
3039 .with_row_selections(),
3040 TestOptions::new(3, 256, 57)
3042 .with_max_dict_page_size(128)
3043 .with_row_selections(),
3044 TestOptions::new(2, 256, 127)
3046 .with_null_percent(0)
3047 .with_row_selections(),
3048 TestOptions::new(2, 256, 93)
3050 .with_null_percent(25)
3051 .with_row_selections(),
3052 TestOptions::new(2, 256, 93)
3054 .with_null_percent(25)
3055 .with_row_selections()
3056 .with_limit(10),
3057 TestOptions::new(2, 256, 93)
3059 .with_null_percent(25)
3060 .with_row_selections()
3061 .with_offset(20)
3062 .with_limit(10),
3063 TestOptions::new(4, 100, 25).with_row_filter(),
3067 TestOptions::new(4, 100, 25)
3069 .with_row_selections()
3070 .with_row_filter(),
3071 TestOptions::new(2, 256, 93)
3073 .with_null_percent(25)
3074 .with_max_data_page_size(10)
3075 .with_row_filter(),
3076 TestOptions::new(2, 256, 93)
3078 .with_null_percent(25)
3079 .with_max_data_page_size(10)
3080 .with_row_selections()
3081 .with_row_filter(),
3082 TestOptions::new(2, 256, 93)
3084 .with_enabled_statistics(EnabledStatistics::None)
3085 .with_max_data_page_size(10)
3086 .with_row_selections(),
3087 ];
3088
3089 all_options.into_iter().for_each(|opts| {
3090 for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3091 for encoding in encodings {
3092 let opts = TestOptions {
3093 writer_version,
3094 encoding: *encoding,
3095 ..opts.clone()
3096 };
3097
3098 single_column_reader_test::<T, _, G>(
3099 opts,
3100 rand_max,
3101 converted_type,
3102 arrow_type.clone(),
3103 &converter,
3104 )
3105 }
3106 }
3107 });
3108 }
3109
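/// Writes a single column parquet file with the given values and options,
/// then reads it back in batches, comparing against the expected rows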
3110 fn single_column_reader_test<T, F, G>(
3114 opts: TestOptions,
3115 rand_max: i32,
3116 converted_type: ConvertedType,
3117 arrow_type: Option<ArrowDataType>,
3118 converter: F,
3119 ) where
3120 T: DataType,
3121 G: RandGen<T>,
3122 F: Fn(&[Option<T::T>]) -> ArrayRef,
3123 {
3124 println!(
3126 "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
3127 T::get_physical_type(),
3128 converted_type,
3129 arrow_type,
3130 opts
3131 );
3132
3133 let (repetition, def_levels) = match opts.null_percent.as_ref() {
3135 Some(null_percent) => {
3136 let mut rng = rng();
3137
3138 let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
3139 .map(|_| {
3140 std::iter::from_fn(|| {
3141 Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
3142 })
3143 .take(opts.num_rows)
3144 .collect()
3145 })
3146 .collect();
3147 (Repetition::OPTIONAL, Some(def_levels))
3148 }
3149 None => (Repetition::REQUIRED, None),
3150 };
3151
3152 let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
3154 .map(|idx| {
3155 let null_count = match def_levels.as_ref() {
3156 Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
3157 None => 0,
3158 };
3159 G::gen_vec(rand_max, opts.num_rows - null_count)
3160 })
3161 .collect();
3162
3163 let len = match T::get_physical_type() {
3164 crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
3165 crate::basic::Type::INT96 => 12,
3166 _ => -1,
3167 };
3168
3169 let fields = vec![Arc::new(
3170 Type::primitive_type_builder("leaf", T::get_physical_type())
3171 .with_repetition(repetition)
3172 .with_converted_type(converted_type)
3173 .with_length(len)
3174 .build()
3175 .unwrap(),
3176 )];
3177
3178 let schema = Arc::new(
3179 Type::group_type_builder("test_schema")
3180 .with_fields(fields)
3181 .build()
3182 .unwrap(),
3183 );
3184
3185 let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
3186
3187 let mut file = tempfile::tempfile().unwrap();
3188
3189 generate_single_column_file_with_data::<T>(
3190 &values,
3191 def_levels.as_ref(),
file.try_clone().unwrap(),
schema,
3194 arrow_field,
3195 &opts,
3196 )
3197 .unwrap();
3198
3199 file.rewind().unwrap();
3200
3201 let options = ArrowReaderOptions::new()
3202 .with_page_index(opts.enabled_statistics == EnabledStatistics::Page);
3203
3204 let mut builder =
3205 ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3206
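// Determine the rows we expect to read back, applying any row selection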
3207 let expected_data = match opts.row_selections {
3208 Some((selections, row_count)) => {
let mut remaining_data = gen_expected_data::<T>(def_levels.as_ref(), &values);

// Partition the data into skipped and selected runs, keeping only the selected rows
let mut selected_data: Vec<Option<T::T>> = vec![];
let selectors: VecDeque<RowSelector> = selections.clone().into();
for select in selectors {
if select.skip {
remaining_data.drain(0..select.row_count);
} else {
selected_data.extend(remaining_data.drain(0..select.row_count));
}
}
builder = builder.with_row_selection(selections);

assert_eq!(selected_data.len(), row_count);
selected_data
3224 }
3225 None => {
3226 let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
3228 assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
3229 expected_data
3230 }
3231 };
3232
3233 let mut expected_data = match opts.row_filter {
3234 Some(filter) => {
3235 let expected_data = expected_data
3236 .into_iter()
3237 .zip(filter.iter())
.filter_map(|(d, f)| f.then_some(d))
3239 .collect();
3240
3241 let mut filter_offset = 0;
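// Predicate that replays the precomputed boolean mask batch by batch,
// tracking how many rows have been consumed so far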
3242 let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
3243 ProjectionMask::all(),
3244 move |b| {
3245 let array = BooleanArray::from_iter(
3246 filter
3247 .iter()
3248 .skip(filter_offset)
3249 .take(b.num_rows())
3250 .map(|x| Some(*x)),
3251 );
3252 filter_offset += b.num_rows();
3253 Ok(array)
3254 },
3255 ))]);
3256
3257 builder = builder.with_row_filter(filter);
3258 expected_data
3259 }
3260 None => expected_data,
3261 };
3262
3263 if let Some(offset) = opts.offset {
3264 builder = builder.with_offset(offset);
3265 expected_data = expected_data.into_iter().skip(offset).collect();
3266 }
3267
3268 if let Some(limit) = opts.limit {
3269 builder = builder.with_limit(limit);
3270 expected_data = expected_data.into_iter().take(limit).collect();
3271 }
3272
3273 let mut record_reader = builder
3274 .with_batch_size(opts.record_batch_size)
3275 .build()
3276 .unwrap();
3277
3278 let mut total_read = 0;
3279 loop {
3280 let maybe_batch = record_reader.next();
3281 if total_read < expected_data.len() {
3282 let end = min(total_read + opts.record_batch_size, expected_data.len());
3283 let batch = maybe_batch.unwrap().unwrap();
3284 assert_eq!(end - total_read, batch.num_rows());
3285
3286 let a = converter(&expected_data[total_read..end]);
3287 let b = batch.column(0);
3288
3289 assert_eq!(a.data_type(), b.data_type());
3290 assert_eq!(a.to_data(), b.to_data());
3291 assert_eq!(
3292 a.as_any().type_id(),
3293 b.as_any().type_id(),
3294 "incorrect type ids"
3295 );
3296
3297 total_read = end;
3298 } else {
3299 assert!(maybe_batch.is_none());
3300 break;
3301 }
3302 }
3303 }
3304
3305 fn gen_expected_data<T: DataType>(
3306 def_levels: Option<&Vec<Vec<i16>>>,
3307 values: &[Vec<T::T>],
3308 ) -> Vec<Option<T::T>> {
3309 let data: Vec<Option<T::T>> = match def_levels {
3310 Some(levels) => {
3311 let mut values_iter = values.iter().flatten();
3312 levels
3313 .iter()
3314 .flatten()
3315 .map(|d| match d {
3316 1 => Some(values_iter.next().cloned().unwrap()),
3317 0 => None,
3318 _ => unreachable!(),
3319 })
3320 .collect()
3321 }
3322 None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
3323 };
3324 data
3325 }
3326
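/// Writes `values` as a single column parquet file, one row group per outer
/// `Vec`, using writer properties derived from `opts`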
3327 fn generate_single_column_file_with_data<T: DataType>(
3328 values: &[Vec<T::T>],
3329 def_levels: Option<&Vec<Vec<i16>>>,
3330 file: File,
3331 schema: TypePtr,
3332 field: Option<Field>,
3333 opts: &TestOptions,
3334 ) -> Result<ParquetMetaData> {
3335 let mut writer_props = opts.writer_props();
3336 if let Some(field) = field {
3337 let arrow_schema = Schema::new(vec![field]);
3338 add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
3339 }
3340
3341 let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
3342
3343 for (idx, v) in values.iter().enumerate() {
3344 let def_levels = def_levels.map(|d| d[idx].as_slice());
3345 let mut row_group_writer = writer.next_row_group()?;
3346 {
3347 let mut column_writer = row_group_writer
3348 .next_column()?
3349 .expect("Column writer is none!");
3350
3351 column_writer
3352 .typed::<T>()
3353 .write_batch(v, def_levels, None)?;
3354
3355 column_writer.close()?;
3356 }
3357 row_group_writer.close()?;
3358 }
3359
3360 writer.close()
3361 }
3362
3363 fn get_test_file(file_name: &str) -> File {
3364 let mut path = PathBuf::new();
3365 path.push(arrow::util::test_util::arrow_test_data());
3366 path.push(file_name);
3367
3368 File::open(path.as_path()).expect("File not found!")
3369 }
3370
3371 #[test]
3372 fn test_read_structs() {
3373 let testdata = arrow::util::test_util::parquet_test_data();
3377 let path = format!("{testdata}/nested_structs.rust.parquet");
3378 let file = File::open(&path).unwrap();
3379 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3380
3381 for batch in record_batch_reader {
3382 batch.unwrap();
3383 }
3384
3385 let file = File::open(&path).unwrap();
3386 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3387
3388 let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
3389 let projected_reader = builder
3390 .with_projection(mask)
3391 .with_batch_size(60)
3392 .build()
3393 .unwrap();
3394
3395 let expected_schema = Schema::new(vec![
3396 Field::new(
3397 "roll_num",
3398 ArrowDataType::Struct(Fields::from(vec![Field::new(
3399 "count",
3400 ArrowDataType::UInt64,
3401 false,
3402 )])),
3403 false,
3404 ),
3405 Field::new(
3406 "PC_CUR",
3407 ArrowDataType::Struct(Fields::from(vec![
3408 Field::new("mean", ArrowDataType::Int64, false),
3409 Field::new("sum", ArrowDataType::Int64, false),
3410 ])),
3411 false,
3412 ),
3413 ]);
3414
3415 assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3417
3418 for batch in projected_reader {
3419 let batch = batch.unwrap();
3420 assert_eq!(batch.schema().as_ref(), &expected_schema);
3421 }
3422 }
3423
3424 #[test]
3425 fn test_read_structs_by_name() {
3427 let testdata = arrow::util::test_util::parquet_test_data();
3428 let path = format!("{testdata}/nested_structs.rust.parquet");
3429 let file = File::open(&path).unwrap();
3430 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3431
3432 for batch in record_batch_reader {
3433 batch.unwrap();
3434 }
3435
3436 let file = File::open(&path).unwrap();
3437 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3438
3439 let mask = ProjectionMask::columns(
3440 builder.parquet_schema(),
3441 ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
3442 );
3443 let projected_reader = builder
3444 .with_projection(mask)
3445 .with_batch_size(60)
3446 .build()
3447 .unwrap();
3448
3449 let expected_schema = Schema::new(vec![
3450 Field::new(
3451 "roll_num",
3452 ArrowDataType::Struct(Fields::from(vec![Field::new(
3453 "count",
3454 ArrowDataType::UInt64,
3455 false,
3456 )])),
3457 false,
3458 ),
3459 Field::new(
3460 "PC_CUR",
3461 ArrowDataType::Struct(Fields::from(vec![
3462 Field::new("mean", ArrowDataType::Int64, false),
3463 Field::new("sum", ArrowDataType::Int64, false),
3464 ])),
3465 false,
3466 ),
3467 ]);
3468
3469 assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3470
3471 for batch in projected_reader {
3472 let batch = batch.unwrap();
3473 assert_eq!(batch.schema().as_ref(), &expected_schema);
3474 }
3475 }
3476
3477 #[test]
3478 fn test_read_maps() {
3479 let testdata = arrow::util::test_util::parquet_test_data();
3480 let path = format!("{testdata}/nested_maps.snappy.parquet");
3481 let file = File::open(path).unwrap();
3482 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3483
3484 for batch in record_batch_reader {
3485 batch.unwrap();
3486 }
3487 }
3488
3489 #[test]
3490 fn test_nested_nullability() {
3491 let message_type = "message nested {
3492 OPTIONAL Group group {
3493 REQUIRED INT32 leaf;
3494 }
3495 }";
3496
3497 let file = tempfile::tempfile().unwrap();
3498 let schema = Arc::new(parse_message_type(message_type).unwrap());
3499
3500 {
3501 let mut writer =
3503 SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
3504 .unwrap();
3505
3506 {
3507 let mut row_group_writer = writer.next_row_group().unwrap();
3508 let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
3509
3510 column_writer
3511 .typed::<Int32Type>()
3512 .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
3513 .unwrap();
3514
3515 column_writer.close().unwrap();
3516 row_group_writer.close().unwrap();
3517 }
3518
3519 writer.close().unwrap();
3520 }
3521
3522 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3523 let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
3524
3525 let reader = builder.with_projection(mask).build().unwrap();
3526
3527 let expected_schema = Schema::new(vec![Field::new(
3528 "group",
3529 ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
3530 true,
3531 )]);
3532
3533 let batch = reader.into_iter().next().unwrap().unwrap();
3534 assert_eq!(batch.schema().as_ref(), &expected_schema);
3535 assert_eq!(batch.num_rows(), 4);
3536 assert_eq!(batch.column(0).null_count(), 2);
3537 }
3538
3539 #[test]
3540 fn test_invalid_utf8() {
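// A tiny hand-built parquet file whose single string column contains the
// bytes "he\xffllo", which are not valid UTF-8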
3541 let data = vec![
3543 80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
3544 18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
3545 3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
3546 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
3547 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
3548 21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
3549 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
3550 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
3551 0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
3552 118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
3553 116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
3554 82, 49,
3555 ];
3556
3557 let file = Bytes::from(data);
3558 let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();
3559
3560 let error = record_batch_reader.next().unwrap().unwrap_err();
3561
3562 assert!(
3563 error.to_string().contains("invalid utf-8 sequence"),
3564 "{}",
3565 error
3566 );
3567 }
3568
3569 #[test]
3570 fn test_invalid_utf8_string_array() {
3571 test_invalid_utf8_string_array_inner::<i32>();
3572 }
3573
3574 #[test]
3575 fn test_invalid_utf8_large_string_array() {
3576 test_invalid_utf8_string_array_inner::<i64>();
3577 }
3578
3579 fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
3580 let cases = [
3581 invalid_utf8_first_char::<O>(),
3582 invalid_utf8_first_char_long_strings::<O>(),
3583 invalid_utf8_later_char::<O>(),
3584 invalid_utf8_later_char_long_strings::<O>(),
3585 invalid_utf8_later_char_really_long_strings::<O>(),
3586 invalid_utf8_later_char_really_long_strings2::<O>(),
3587 ];
3588 for array in &cases {
3589 for encoding in STRING_ENCODINGS {
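// Bypass UTF-8 validation so the invalid bytes make it into a string array
// and from there into the parquet writer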
3590 let array = unsafe {
3593 GenericStringArray::<O>::new_unchecked(
3594 array.offsets().clone(),
3595 array.values().clone(),
3596 array.nulls().cloned(),
3597 )
3598 };
3599 let data_type = array.data_type().clone();
3600 let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3601 let err = read_from_parquet(data).unwrap_err();
3602 let expected_err =
3603 "Parquet argument error: Parquet error: encountered non UTF-8 data";
3604 assert!(
3605 err.to_string().contains(expected_err),
3606 "data type: {data_type}, expected: {expected_err}, got: {err}"
3607 );
3608 }
3609 }
3610 }
3611
3612 #[test]
3613 fn test_invalid_utf8_string_view_array() {
3614 let cases = [
3615 invalid_utf8_first_char::<i32>(),
3616 invalid_utf8_first_char_long_strings::<i32>(),
3617 invalid_utf8_later_char::<i32>(),
3618 invalid_utf8_later_char_long_strings::<i32>(),
3619 invalid_utf8_later_char_really_long_strings::<i32>(),
3620 invalid_utf8_later_char_really_long_strings2::<i32>(),
3621 ];
3622
3623 for encoding in STRING_ENCODINGS {
3624 for array in &cases {
3625 let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
3626 let array = array.as_binary_view();
3627
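// Rebuild the view array without UTF-8 validation so the invalid bytes
// survive the conversion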
3628 let array = unsafe {
3631 StringViewArray::new_unchecked(
3632 array.views().clone(),
3633 array.data_buffers().to_vec(),
3634 array.nulls().cloned(),
3635 )
3636 };
3637
3638 let data_type = array.data_type().clone();
3639 let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3640 let err = read_from_parquet(data).unwrap_err();
3641 let expected_err =
3642 "Parquet argument error: Parquet error: encountered non UTF-8 data";
3643 assert!(
3644 err.to_string().contains(expected_err),
3645 "data type: {data_type}, expected: {expected_err}, got: {err}"
3646 );
3647 }
3648 }
3649 }
3650
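/// Encodings exercised by the invalid UTF-8 tests; None uses writer defaults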
3651 const STRING_ENCODINGS: &[Option<Encoding>] = &[
3653 None,
3654 Some(Encoding::PLAIN),
3655 Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
3656 Some(Encoding::DELTA_BYTE_ARRAY),
3657 ];
3658
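/// Invalid UTF-8 bytes in the first character of the string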
3659 const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3662
3663 const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];
3666
3667 fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3669 let valid: &[u8] = b" ";
3670 let invalid = INVALID_UTF8_FIRST_CHAR;
3671 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3672 }
3673
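/// Invalid UTF-8 in the first character of a string long enough (> 12 bytes)
/// that a view array stores it out of line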
3674 fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3678 let valid: &[u8] = b" ";
3679 let mut invalid = vec![];
3680 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3681 invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3682 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3683 }
3684
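/// Invalid UTF-8 after several valid characters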
3685 fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3688 let valid: &[u8] = b" ";
3689 let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
3690 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3691 }
3692
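/// Invalid UTF-8 after several valid characters, in a string longer than 12 bytes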
3693 fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3697 let valid: &[u8] = b" ";
3698 let mut invalid = vec![];
3699 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3700 invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3701 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3702 }
3703
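/// Invalid UTF-8 at the end of a several-hundred-byte string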
3704 fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3708 let valid: &[u8] = b" ";
3709 let mut invalid = vec![];
3710 for _ in 0..10 {
3711 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3713 }
3714 invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3715 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3716 }
3717
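/// Long valid strings mixed with a short invalid value, so in a view array
/// the invalid bytes sit inline while the data buffers hold valid bytes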
3718 fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3721 let valid: &[u8] = b" ";
3722 let mut valid_long = vec![];
3723 for _ in 0..10 {
3724 valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3726 }
3727 let invalid = INVALID_UTF8_LATER_CHAR;
3728 GenericBinaryArray::<O>::from_iter(vec![
3729 None,
3730 Some(valid),
3731 Some(invalid),
3732 None,
3733 Some(&valid_long),
3734 Some(valid),
3735 ])
3736 }
3737
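/// Writes `array` as a single column "c" to an in-memory parquet file,
/// disabling the dictionary when an explicit encoding is requested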
3738 fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
3743 let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
3744 let mut data = vec![];
3745 let schema = batch.schema();
3746 let props = encoding.map(|encoding| {
3747 WriterProperties::builder()
3748 .set_dictionary_enabled(false)
3750 .set_encoding(encoding)
3751 .build()
3752 });
3753
3754 {
3755 let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
3756 writer.write(&batch).unwrap();
3757 writer.flush().unwrap();
3758 writer.close().unwrap();
3759 };
3760 data
3761 }
3762
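/// Reads all record batches back from an in-memory parquet file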
3763 fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
3765 let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
3766 .unwrap()
3767 .build()
3768 .unwrap();
3769
3770 reader.collect()
3771 }
3772
3773 #[test]
3774 fn test_dictionary_preservation() {
3775 let fields = vec![Arc::new(
3776 Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
3777 .with_repetition(Repetition::OPTIONAL)
3778 .with_converted_type(ConvertedType::UTF8)
3779 .build()
3780 .unwrap(),
3781 )];
3782
3783 let schema = Arc::new(
3784 Type::group_type_builder("test_schema")
3785 .with_fields(fields)
3786 .build()
3787 .unwrap(),
3788 );
3789
3790 let dict_type = ArrowDataType::Dictionary(
3791 Box::new(ArrowDataType::Int32),
3792 Box::new(ArrowDataType::Utf8),
3793 );
3794
3795 let arrow_field = Field::new("leaf", dict_type, true);
3796
3797 let mut file = tempfile::tempfile().unwrap();
3798
3799 let values = vec![
3800 vec![
3801 ByteArray::from("hello"),
3802 ByteArray::from("a"),
3803 ByteArray::from("b"),
3804 ByteArray::from("d"),
3805 ],
3806 vec![
3807 ByteArray::from("c"),
3808 ByteArray::from("a"),
3809 ByteArray::from("b"),
3810 ],
3811 ];
3812
3813 let def_levels = vec![
3814 vec![1, 0, 0, 1, 0, 0, 1, 1],
3815 vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
3816 ];
3817
3818 let opts = TestOptions {
3819 encoding: Encoding::RLE_DICTIONARY,
3820 ..Default::default()
3821 };
3822
3823 generate_single_column_file_with_data::<ByteArrayType>(
3824 &values,
3825 Some(&def_levels),
file.try_clone().unwrap(),
schema,
3828 Some(arrow_field),
3829 &opts,
3830 )
3831 .unwrap();
3832
3833 file.rewind().unwrap();
3834
3835 let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();
3836
3837 let batches = record_reader
3838 .collect::<Result<Vec<RecordBatch>, _>>()
3839 .unwrap();
3840
3841 assert_eq!(batches.len(), 6);
3842 assert!(batches.iter().all(|x| x.num_columns() == 1));
3843
3844 let row_counts = batches
3845 .iter()
3846 .map(|x| (x.num_rows(), x.column(0).null_count()))
3847 .collect::<Vec<_>>();
3848
3849 assert_eq!(
3850 row_counts,
3851 vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
3852 );
3853
3854 let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();
3855
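// Batches 0 and 1 lie entirely within the first row group and share its
// dictionary; batch 2 spans the row group boundary, so it gets its own,
// while batches 3-5 share the second row group's dictionary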
3856 assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
3858 assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
3860 assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
3861 assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
3863 assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
3864 }
3865
3866 #[test]
3867 fn test_read_null_list() {
3868 let testdata = arrow::util::test_util::parquet_test_data();
3869 let path = format!("{testdata}/null_list.parquet");
3870 let file = File::open(path).unwrap();
3871 let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3872
3873 let batch = record_batch_reader.next().unwrap().unwrap();
3874 assert_eq!(batch.num_rows(), 1);
3875 assert_eq!(batch.num_columns(), 1);
3876 assert_eq!(batch.column(0).len(), 1);
3877
3878 let list = batch
3879 .column(0)
3880 .as_any()
3881 .downcast_ref::<ListArray>()
3882 .unwrap();
3883 assert_eq!(list.len(), 1);
3884 assert!(list.is_valid(0));
3885
3886 let val = list.value(0);
3887 assert_eq!(val.len(), 0);
3888 }
3889
3890 #[test]
3891 fn test_null_schema_inference() {
3892 let testdata = arrow::util::test_util::parquet_test_data();
3893 let path = format!("{testdata}/null_list.parquet");
3894 let file = File::open(path).unwrap();
3895
3896 let arrow_field = Field::new(
3897 "emptylist",
3898 ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
3899 true,
3900 );
3901
3902 let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3903 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3904 let schema = builder.schema();
3905 assert_eq!(schema.fields().len(), 1);
3906 assert_eq!(schema.field(0), &arrow_field);
3907 }
3908
3909 #[test]
3910 fn test_skip_metadata() {
3911 let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
3912 let field = Field::new("col", col.data_type().clone(), true);
3913
3914 let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
3915
3916 let metadata = [("key".to_string(), "value".to_string())]
3917 .into_iter()
3918 .collect();
3919
3920 let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
3921
3922 assert_ne!(schema_with_metadata, schema_without_metadata);
3923
3924 let batch =
3925 RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
3926
3927 let file = |version: WriterVersion| {
3928 let props = WriterProperties::builder()
3929 .set_writer_version(version)
3930 .build();
3931
3932 let file = tempfile().unwrap();
3933 let mut writer =
3934 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
3935 .unwrap();
3936 writer.write(&batch).unwrap();
3937 writer.close().unwrap();
3938 file
3939 };
3940
3941 let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3942
3943 let v1_reader = file(WriterVersion::PARQUET_1_0);
3944 let v2_reader = file(WriterVersion::PARQUET_2_0);
3945
3946 let arrow_reader =
3947 ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
3948 assert_eq!(arrow_reader.schema(), schema_with_metadata);
3949
3950 let reader =
3951 ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
3952 .unwrap()
3953 .build()
3954 .unwrap();
3955 assert_eq!(reader.schema(), schema_without_metadata);
3956
3957 let arrow_reader =
3958 ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
3959 assert_eq!(arrow_reader.schema(), schema_with_metadata);
3960
3961 let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
3962 .unwrap()
3963 .build()
3964 .unwrap();
3965 assert_eq!(reader.schema(), schema_without_metadata);
3966 }
3967
3968 fn write_parquet_from_iter<I, F>(value: I) -> File
3969 where
3970 I: IntoIterator<Item = (F, ArrayRef)>,
3971 F: AsRef<str>,
3972 {
3973 let batch = RecordBatch::try_from_iter(value).unwrap();
3974 let file = tempfile().unwrap();
3975 let mut writer =
3976 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
3977 writer.write(&batch).unwrap();
3978 writer.close().unwrap();
3979 file
3980 }
3981
3982 fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
3983 where
3984 I: IntoIterator<Item = (F, ArrayRef)>,
3985 F: AsRef<str>,
3986 {
3987 let file = write_parquet_from_iter(value);
3988 let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
3989 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3990 file.try_clone().unwrap(),
3991 options_with_schema,
3992 );
3993 assert_eq!(builder.err().unwrap().to_string(), expected_error);
3994 }
3995
3996 #[test]
3997 fn test_schema_too_few_columns() {
3998 run_schema_test_with_error(
3999 vec![
4000 ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
4001 ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
4002 ],
4003 Arc::new(Schema::new(vec![Field::new(
4004 "int64",
4005 ArrowDataType::Int64,
4006 false,
4007 )])),
4008 "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
4009 );
4010 }
4011
4012 #[test]
4013 fn test_schema_too_many_columns() {
4014 run_schema_test_with_error(
4015 vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
4016 Arc::new(Schema::new(vec![
4017 Field::new("int64", ArrowDataType::Int64, false),
4018 Field::new("int32", ArrowDataType::Int32, false),
4019 ])),
4020 "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
4021 );
4022 }
4023
4024 #[test]
4025 fn test_schema_mismatched_column_names() {
4026 run_schema_test_with_error(
4027 vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
4028 Arc::new(Schema::new(vec![Field::new(
4029 "other",
4030 ArrowDataType::Int64,
4031 false,
4032 )])),
4033 "Arrow: incompatible arrow schema, expected field named int64 got other",
4034 );
4035 }
4036
4037 #[test]
4038 fn test_schema_incompatible_columns() {
4039 run_schema_test_with_error(
4040 vec![
4041 (
4042 "col1_invalid",
4043 Arc::new(Int64Array::from(vec![0])) as ArrayRef,
4044 ),
4045 (
4046 "col2_valid",
4047 Arc::new(Int32Array::from(vec![0])) as ArrayRef,
4048 ),
4049 (
4050 "col3_invalid",
4051 Arc::new(Date64Array::from(vec![0])) as ArrayRef,
4052 ),
4053 ],
4054 Arc::new(Schema::new(vec![
4055 Field::new("col1_invalid", ArrowDataType::Int32, false),
4056 Field::new("col2_valid", ArrowDataType::Int32, false),
4057 Field::new("col3_invalid", ArrowDataType::Int32, false),
4058 ])),
4059 "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
4060 );
4061 }
4062
4063 #[test]
4064 fn test_one_incompatible_nested_column() {
4065 let nested_fields = Fields::from(vec![
4066 Field::new("nested1_valid", ArrowDataType::Utf8, false),
4067 Field::new("nested1_invalid", ArrowDataType::Int64, false),
4068 ]);
4069 let nested = StructArray::try_new(
4070 nested_fields,
4071 vec![
4072 Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
4073 Arc::new(Int64Array::from(vec![0])) as ArrayRef,
4074 ],
4075 None,
4076 )
4077 .expect("struct array");
4078 let supplied_nested_fields = Fields::from(vec![
4079 Field::new("nested1_valid", ArrowDataType::Utf8, false),
4080 Field::new("nested1_invalid", ArrowDataType::Int32, false),
4081 ]);
4082 run_schema_test_with_error(
4083 vec![
4084 ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
4085 ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
4086 ("nested", Arc::new(nested) as ArrayRef),
4087 ],
4088 Arc::new(Schema::new(vec![
4089 Field::new("col1", ArrowDataType::Int64, false),
4090 Field::new("col2", ArrowDataType::Int32, false),
4091 Field::new(
4092 "nested",
4093 ArrowDataType::Struct(supplied_nested_fields),
4094 false,
4095 ),
4096 ])),
4097 "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
4098 requested Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int32) \
4099 but found Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int64)",
4100 );
4101 }
4102
4103 fn utf8_parquet() -> Bytes {
4105 let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
4106 let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
4107 let props = None;
4108 let mut parquet_data = vec![];
4110 let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
4111 writer.write(&batch).unwrap();
4112 writer.close().unwrap();
4113 Bytes::from(parquet_data)
4114 }
4115
4116 #[test]
4117 fn test_schema_error_bad_types() {
4118 let parquet_data = utf8_parquet();
4120
4121 let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
4123 "column1",
4124 arrow::datatypes::DataType::Int32,
4125 false,
4126 )]));
4127
4128 let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
4130 let err =
4131 ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
4132 .unwrap_err();
4133 assert_eq!(
4134 err.to_string(),
4135 "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8"
4136 )
4137 }
4138
4139 #[test]
4140 fn test_schema_error_bad_nullability() {
4141 let parquet_data = utf8_parquet();
4143
4144 let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
4146 "column1",
4147 arrow::datatypes::DataType::Utf8,
4148 true,
4149 )]));
4150
4151 let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
4153 let err =
4154 ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
4155 .unwrap_err();
4156 assert_eq!(
4157 err.to_string(),
4158 "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false"
4159 )
4160 }
4161
4162 #[test]
4163 fn test_read_binary_as_utf8() {
4164 let file = write_parquet_from_iter(vec![
4165 (
4166 "binary_to_utf8",
4167 Arc::new(BinaryArray::from(vec![
4168 b"one".as_ref(),
4169 b"two".as_ref(),
4170 b"three".as_ref(),
4171 ])) as ArrayRef,
4172 ),
4173 (
4174 "large_binary_to_large_utf8",
4175 Arc::new(LargeBinaryArray::from(vec![
4176 b"one".as_ref(),
4177 b"two".as_ref(),
4178 b"three".as_ref(),
4179 ])) as ArrayRef,
4180 ),
4181 (
4182 "binary_view_to_utf8_view",
4183 Arc::new(BinaryViewArray::from(vec![
4184 b"one".as_ref(),
4185 b"two".as_ref(),
4186 b"three".as_ref(),
4187 ])) as ArrayRef,
4188 ),
4189 ]);
4190 let supplied_fields = Fields::from(vec![
4191 Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
4192 Field::new(
4193 "large_binary_to_large_utf8",
4194 ArrowDataType::LargeUtf8,
4195 false,
4196 ),
4197 Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
4198 ]);
4199
4200 let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
4201 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4202 file.try_clone().unwrap(),
4203 options,
4204 )
4205 .expect("reader builder with schema")
4206 .build()
4207 .expect("reader with schema");
4208
4209 let batch = arrow_reader.next().unwrap().unwrap();
4210 assert_eq!(batch.num_columns(), 3);
4211 assert_eq!(batch.num_rows(), 3);
4212 assert_eq!(
4213 batch
4214 .column(0)
4215 .as_string::<i32>()
4216 .iter()
4217 .collect::<Vec<_>>(),
4218 vec![Some("one"), Some("two"), Some("three")]
4219 );
4220
4221 assert_eq!(
4222 batch
4223 .column(1)
4224 .as_string::<i64>()
4225 .iter()
4226 .collect::<Vec<_>>(),
4227 vec![Some("one"), Some("two"), Some("three")]
4228 );
4229
4230 assert_eq!(
4231 batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
4232 vec![Some("one"), Some("two"), Some("three")]
4233 );
4234 }
4235
4236 #[test]
4237 #[should_panic(expected = "Invalid UTF8 sequence at")]
4238 fn test_read_non_utf8_binary_as_utf8() {
4239 let file = write_parquet_from_iter(vec![(
4240 "non_utf8_binary",
4241 Arc::new(BinaryArray::from(vec![
4242 b"\xDE\x00\xFF".as_ref(),
4243 b"\xDE\x01\xAA".as_ref(),
4244 b"\xDE\x02\xFF".as_ref(),
4245 ])) as ArrayRef,
4246 )]);
4247 let supplied_fields = Fields::from(vec![Field::new(
4248 "non_utf8_binary",
4249 ArrowDataType::Utf8,
4250 false,
4251 )]);
4252
4253 let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
4254 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4255 file.try_clone().unwrap(),
4256 options,
4257 )
4258 .expect("reader builder with schema")
4259 .build()
4260 .expect("reader with schema");
4261 arrow_reader.next().unwrap().unwrap_err();
4262 }
4263
4264 #[test]
4265 fn test_with_schema() {
4266 let nested_fields = Fields::from(vec![
4267 Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
4268 Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
4269 ]);
4270
4271 let nested_arrays: Vec<ArrayRef> = vec![
4272 Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
4273 Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
4274 ];
4275
4276 let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();
4277
4278 let file = write_parquet_from_iter(vec![
4279 (
4280 "int32_to_ts_second",
4281 Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
4282 ),
4283 (
4284 "date32_to_date64",
4285 Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
4286 ),
4287 ("nested", Arc::new(nested) as ArrayRef),
4288 ]);
4289
4290 let supplied_nested_fields = Fields::from(vec![
4291 Field::new(
4292 "utf8_to_dict",
4293 ArrowDataType::Dictionary(
4294 Box::new(ArrowDataType::Int32),
4295 Box::new(ArrowDataType::Utf8),
4296 ),
4297 false,
4298 ),
4299 Field::new(
4300 "int64_to_ts_nano",
4301 ArrowDataType::Timestamp(
4302 arrow::datatypes::TimeUnit::Nanosecond,
4303 Some("+10:00".into()),
4304 ),
4305 false,
4306 ),
4307 ]);
4308
4309 let supplied_schema = Arc::new(Schema::new(vec![
4310 Field::new(
4311 "int32_to_ts_second",
4312 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
4313 false,
4314 ),
4315 Field::new("date32_to_date64", ArrowDataType::Date64, false),
4316 Field::new(
4317 "nested",
4318 ArrowDataType::Struct(supplied_nested_fields),
4319 false,
4320 ),
4321 ]));
4322
4323 let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
4324 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4325 file.try_clone().unwrap(),
4326 options,
4327 )
4328 .expect("reader builder with schema")
4329 .build()
4330 .expect("reader with schema");
4331
4332 assert_eq!(arrow_reader.schema(), supplied_schema);
4333 let batch = arrow_reader.next().unwrap().unwrap();
4334 assert_eq!(batch.num_columns(), 3);
4335 assert_eq!(batch.num_rows(), 4);
4336 assert_eq!(
4337 batch
4338 .column(0)
4339 .as_any()
4340 .downcast_ref::<TimestampSecondArray>()
4341 .expect("downcast to timestamp second")
4342 .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
4343 .map(|v| v.to_string())
4344 .expect("value as datetime"),
4345 "1970-01-01 01:00:00 +01:00"
4346 );
4347 assert_eq!(
4348 batch
4349 .column(1)
4350 .as_any()
4351 .downcast_ref::<Date64Array>()
4352 .expect("downcast to date64")
4353 .value_as_date(0)
4354 .map(|v| v.to_string())
4355 .expect("value as date"),
4356 "1970-01-01"
4357 );
4358
4359 let nested = batch
4360 .column(2)
4361 .as_any()
4362 .downcast_ref::<StructArray>()
4363 .expect("downcast to struct");
4364
4365 let nested_dict = nested
4366 .column(0)
4367 .as_any()
4368 .downcast_ref::<Int32DictionaryArray>()
4369 .expect("downcast to dictionary");
4370
4371 assert_eq!(
4372 nested_dict
4373 .values()
4374 .as_any()
4375 .downcast_ref::<StringArray>()
4376 .expect("downcast to string")
4377 .iter()
4378 .collect::<Vec<_>>(),
4379 vec![Some("a"), Some("b")]
4380 );
4381
4382 assert_eq!(
4383 nested_dict.keys().iter().collect::<Vec<_>>(),
4384 vec![Some(0), Some(0), Some(0), Some(1)]
4385 );
4386
4387 assert_eq!(
4388 nested
4389 .column(1)
4390 .as_any()
4391 .downcast_ref::<TimestampNanosecondArray>()
4392 .expect("downcast to timestamp nanosecond")
4393 .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
4394 .map(|v| v.to_string())
4395 .expect("value as datetime"),
4396 "1970-01-01 10:00:00.000000001 +10:00"
4397 );
4398 }
4399
4400 #[test]
4401 fn test_empty_projection() {
4402 let testdata = arrow::util::test_util::parquet_test_data();
4403 let path = format!("{testdata}/alltypes_plain.parquet");
4404 let file = File::open(path).unwrap();
4405
4406 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4407 let file_metadata = builder.metadata().file_metadata();
4408 let expected_rows = file_metadata.num_rows() as usize;
4409
4410 let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
4411 let batch_reader = builder
4412 .with_projection(mask)
4413 .with_batch_size(2)
4414 .build()
4415 .unwrap();
4416
4417 let mut total_rows = 0;
4418 for maybe_batch in batch_reader {
4419 let batch = maybe_batch.unwrap();
4420 total_rows += batch.num_rows();
4421 assert_eq!(batch.num_columns(), 0);
4422 assert!(batch.num_rows() <= 2);
4423 }
4424
4425 assert_eq!(total_rows, expected_rows);
4426 }
4427
4428 fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
4429 let schema = Arc::new(Schema::new(vec![Field::new(
4430 "list",
4431 ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
4432 true,
4433 )]));
4434
4435 let mut buf = Vec::with_capacity(1024);
4436
4437 let mut writer = ArrowWriter::try_new(
4438 &mut buf,
4439 schema.clone(),
4440 Some(
4441 WriterProperties::builder()
4442 .set_max_row_group_size(row_group_size)
4443 .build(),
4444 ),
4445 )
4446 .unwrap();
4447 for _ in 0..2 {
4448 let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
4449 for _ in 0..(batch_size) {
4450 list_builder.append(true);
4451 }
4452 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
4453 .unwrap();
4454 writer.write(&batch).unwrap();
4455 }
4456 writer.close().unwrap();
4457
4458 let mut record_reader =
4459 ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
4460 assert_eq!(
4461 batch_size,
4462 record_reader.next().unwrap().unwrap().num_rows()
4463 );
4464 assert_eq!(
4465 batch_size,
4466 record_reader.next().unwrap().unwrap().num_rows()
4467 );
4468 }
4469
4470 #[test]
4471 fn test_row_group_exact_multiple() {
4472 const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
4473 test_row_group_batch(8, 8);
4474 test_row_group_batch(10, 8);
4475 test_row_group_batch(8, 10);
4476 test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
4477 test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
4478 test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
4479 test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
4480 test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
4481 }
4482
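/// Computes the batches that reading `column` with `selection` applied
/// should produce, given `batch_size`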
4483 fn get_expected_batches(
4486 column: &RecordBatch,
4487 selection: &RowSelection,
4488 batch_size: usize,
4489 ) -> Vec<RecordBatch> {
4490 let mut expected_batches = vec![];
4491
4492 let mut selection: VecDeque<_> = selection.clone().into();
4493 let mut row_offset = 0;
4494 let mut last_start = None;
4495 while row_offset < column.num_rows() && !selection.is_empty() {
4496 let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
4497 while batch_remaining > 0 && !selection.is_empty() {
4498 let (to_read, skip) = match selection.front_mut() {
4499 Some(selection) if selection.row_count > batch_remaining => {
4500 selection.row_count -= batch_remaining;
4501 (batch_remaining, selection.skip)
4502 }
4503 Some(_) => {
4504 let select = selection.pop_front().unwrap();
4505 (select.row_count, select.skip)
4506 }
4507 None => break,
4508 };
4509
4510 batch_remaining -= to_read;
4511
4512 match skip {
4513 true => {
4514 if let Some(last_start) = last_start.take() {
4515 expected_batches.push(column.slice(last_start, row_offset - last_start))
4516 }
4517 row_offset += to_read
4518 }
4519 false => {
4520 last_start.get_or_insert(row_offset);
4521 row_offset += to_read
4522 }
4523 }
4524 }
4525 }
4526
4527 if let Some(last_start) = last_start.take() {
4528 expected_batches.push(column.slice(last_start, row_offset - last_start))
4529 }
4530
4531 for batch in &expected_batches[..expected_batches.len() - 1] {
4533 assert_eq!(batch.num_rows(), batch_size);
4534 }
4535
4536 expected_batches
4537 }
4538
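/// Builds a RowSelection alternating skipped and selected runs of
/// `step_len` rows, returning it with the number of selected rows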
4539 fn create_test_selection(
4540 step_len: usize,
4541 total_len: usize,
4542 skip_first: bool,
4543 ) -> (RowSelection, usize) {
4544 let mut remaining = total_len;
4545 let mut skip = skip_first;
4546 let mut vec = vec![];
4547 let mut selected_count = 0;
4548 while remaining != 0 {
4549 let step = if remaining > step_len {
4550 step_len
4551 } else {
4552 remaining
4553 };
4554 vec.push(RowSelector {
4555 row_count: step,
4556 skip,
4557 });
4558 remaining -= step;
4559 if !skip {
4560 selected_count += step;
4561 }
4562 skip = !skip;
4563 }
4564 (vec.into(), selected_count)
4565 }
4566
    #[test]
    fn test_scan_row_with_selection() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let test_file = File::open(&path).unwrap();

        let mut serial_reader =
            ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
        let data = serial_reader.next().unwrap().unwrap();

        let do_test = |batch_size: usize, selection_len: usize| {
            for skip_first in [false, true] {
                let selections =
                    create_test_selection(selection_len, data.num_rows(), skip_first).0;

                let expected = get_expected_batches(&data, &selections, batch_size);
                let skip_reader = create_skip_reader(&test_file, batch_size, selections);
                assert_eq!(
                    skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
                    expected,
                    "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
                );
            }
        };

        do_test(1000, 1000);

        do_test(20, 20);

        do_test(20, 5);

        do_test(20, 5);

        fn create_skip_reader(
            test_file: &File,
            batch_size: usize,
            selections: RowSelection,
        ) -> ParquetRecordBatchReader {
            let options = ArrowReaderOptions::new().with_page_index(true);
            let file = test_file.try_clone().unwrap();
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                .unwrap()
                .with_batch_size(batch_size)
                .with_row_selection(selections)
                .build()
                .unwrap()
        }
    }

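    /// A requested batch size larger than the file's row count should be
    /// clamped to the actual number of rows rather than over-allocated.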
    #[test]
    fn test_batch_size_overallocate() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let test_file = File::open(path).unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
        let num_rows = builder.metadata.file_metadata().num_rows();
        let reader = builder
            .with_batch_size(1024)
            .with_projection(ProjectionMask::all())
            .build()
            .unwrap();
        assert_ne!(1024, num_rows);
        assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
    }

    #[test]
    fn test_read_with_page_index_enabled() {
        let testdata = arrow::util::test_util::parquet_test_data();

        {
            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
            let test_file = File::open(path).unwrap();
            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
                test_file,
                ArrowReaderOptions::new().with_page_index(true),
            )
            .unwrap();
            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
            let reader = builder.build().unwrap();
            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(batches.len(), 8);
        }

        {
            let path = format!("{testdata}/alltypes_plain.parquet");
            let test_file = File::open(path).unwrap();
            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
                test_file,
                ArrowReaderOptions::new().with_page_index(true),
            )
            .unwrap();
            assert!(builder.metadata().offset_index().is_none());
            let reader = builder.build().unwrap();
            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(batches.len(), 1);
        }
    }

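    /// Writes optional and repeated INT32 columns through the low-level column
    /// writer, then checks that each single-leaf projection matches the
    /// corresponding column of an unprojected read.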
    #[test]
    fn test_raw_repetition() {
        const MESSAGE_TYPE: &str = "
            message Log {
              OPTIONAL INT32 eventType;
              REPEATED INT32 category;
              REPEATED group filter {
                OPTIONAL INT32 error;
              }
            }
        ";
        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
        let props = Default::default();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1], Some(&[1]), None)
            .unwrap();
        col_writer.close().unwrap();
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
            .unwrap();
        col_writer.close().unwrap();
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1], Some(&[1]), Some(&[0]))
            .unwrap();
        col_writer.close().unwrap();

        let rg_md = row_group_writer.close().unwrap();
        assert_eq!(rg_md.num_rows(), 1);
        writer.close().unwrap();

        let bytes = Bytes::from(buf);

        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
        let full = no_mask.next().unwrap().unwrap();

        assert_eq!(full.num_columns(), 3);

        for idx in 0..3 {
            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
            let mut reader = b.with_projection(mask).build().unwrap();
            let projected = reader.next().unwrap().unwrap();

            assert_eq!(projected.num_columns(), 1);
            assert_eq!(full.column(idx), projected.column(0));
        }
    }

    #[test]
    fn test_read_lz4_raw() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/lz4_raw_compressed.parquet");
        let file = File::open(path).unwrap();

        let batches = ParquetRecordBatchReader::try_new(file, 1024)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];

        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 4);

        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
        assert_eq!(
            a.values(),
            &[1593604800, 1593604800, 1593604801, 1593604801]
        );

        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
        let a: Vec<_> = a.iter().flatten().collect();
        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);

        let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
        assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
    }

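    // Backward compatibility check for LZ4: `hadoop_lz4_compressed.parquet`
    // uses the Hadoop LZ4 block framing while `non_hadoop_lz4_compressed.parquet`
    // uses raw LZ4; both should decode through the same fallback path.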
    #[test]
    fn test_read_lz4_hadoop_fallback() {
        for file in [
            "hadoop_lz4_compressed.parquet",
            "non_hadoop_lz4_compressed.parquet",
        ] {
            let testdata = arrow::util::test_util::parquet_test_data();
            let path = format!("{testdata}/{file}");
            let file = File::open(path).unwrap();
            let expected_rows = 4;

            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert_eq!(batches.len(), 1);
            let batch = &batches[0];

            assert_eq!(batch.num_columns(), 3);
            assert_eq!(batch.num_rows(), expected_rows);

            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
            assert_eq!(
                a.values(),
                &[1593604800, 1593604800, 1593604801, 1593604801]
            );

            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
            let b: Vec<_> = b.iter().flatten().collect();
            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);

            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
        }
    }

    #[test]
    fn test_read_lz4_hadoop_large() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
        let file = File::open(path).unwrap();
        let expected_rows = 10000;

        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];

        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.num_rows(), expected_rows);

        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
        let a: Vec<_> = a.iter().flatten().collect();
        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
    }

    #[test]
    #[cfg(feature = "snap")]
    fn test_read_nested_lists() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_lists.snappy.parquet");
        let file = File::open(path).unwrap();

        let f = file.try_clone().unwrap();
        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
        let expected = reader.next().unwrap().unwrap();
        assert_eq!(expected.num_rows(), 3);

        let selection = RowSelection::from(vec![
            RowSelector::skip(1),
            RowSelector::select(1),
            RowSelector::skip(1),
        ]);
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .with_row_selection(selection)
            .build()
            .unwrap();

        let actual = reader.next().unwrap().unwrap();
        assert_eq!(actual.num_rows(), 1);
        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
    }

    #[test]
    fn test_arbitrary_decimal() {
        let values = [1, 2, 3, 4, 5, 6, 7, 8];
        let decimals_19_0 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(19, 0)
            .unwrap();
        let decimals_12_0 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(12, 0)
            .unwrap();
        let decimals_17_10 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(17, 10)
            .unwrap();

        let written = RecordBatch::try_from_iter([
            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 8), &read[0]);
    }

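    /// With one row per data page, skipping the first two list rows must skip
    /// whole pages and still decode the remaining row correctly.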
    #[test]
    fn test_list_skip() {
        let mut list = ListBuilder::new(Int32Builder::new());
        list.append_value([Some(1), Some(2)]);
        list.append_value([Some(3)]);
        list.append_value([Some(4)]);
        let list = list.finish();
        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();

        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(1)
            .set_write_batch_size(2)
            .build();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
            .unwrap()
            .with_row_selection(selection.into())
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out, batch.slice(2, 1));
    }

    #[test]
    fn test_row_selection_interleaved_skip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "v",
            ArrowDataType::Int32,
            false,
        )]));

        let values = Int32Array::from(vec![0, 1, 2, 3, 4]);
        let batch = RecordBatch::try_from_iter([("v", Arc::new(values) as ArrayRef)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None).unwrap();
        writer.write(&batch)?;
        writer.close()?;

        let selection = RowSelection::from(vec![
            RowSelector::select(1),
            RowSelector::skip(2),
            RowSelector::select(2),
        ]);

        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))?
            .with_batch_size(4)
            .with_row_selection(selection)
            .build()?;

        let out = reader.next().unwrap()?;
        assert_eq!(out.num_rows(), 3);
        let values = out
            .column(0)
            .as_primitive::<arrow_array::types::Int32Type>()
            .values();
        assert_eq!(values, &[0, 3, 4]);
        assert!(reader.next().is_none());
        Ok(())
    }

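    /// Selects every other row individually, a selection sparse enough (see
    /// the `selectors.len() * 8` assertion) that a bitmap mask is a more
    /// compact representation than a list of `RowSelector` runs.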
    #[test]
    fn test_row_selection_mask_sparse_rows() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "v",
            ArrowDataType::Int32,
            false,
        )]));

        let values = Int32Array::from((0..30).collect::<Vec<i32>>());
        let batch = RecordBatch::try_from_iter([("v", Arc::new(values) as ArrayRef)])?;

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None)?;
        writer.write(&batch)?;
        writer.close()?;

        let total_rows = batch.num_rows();
        let ranges = (1..total_rows)
            .step_by(2)
            .map(|i| i..i + 1)
            .collect::<Vec<_>>();
        let selection = RowSelection::from_consecutive_ranges(ranges.into_iter(), total_rows);

        let selectors: Vec<RowSelector> = selection.clone().into();
        assert!(total_rows < selectors.len() * 8);

        let bytes = Bytes::from(buffer);

        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes.clone())?
            .with_batch_size(7)
            .with_row_selection(selection)
            .build()?;

        let mut collected = Vec::new();
        for batch in reader {
            let batch = batch?;
            collected.extend_from_slice(
                batch
                    .column(0)
                    .as_primitive::<arrow_array::types::Int32Type>()
                    .values(),
            );
        }

        let expected: Vec<i32> = (1..total_rows).step_by(2).map(|i| i as i32).collect();
        assert_eq!(collected, expected);
        Ok(())
    }

    fn test_decimal32_roundtrip() {
        let d = |values: Vec<i32>, p: u8| {
            let iter = values.into_iter();
            PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    fn test_decimal64_roundtrip() {
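        // Decimal64 physical type depends on precision: <= 9 is written as
        // INT32, <= 18 as INT64 (checked against the schema below).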
        let d = |values: Vec<i64>, p: u8| {
            let iter = values.into_iter();
            PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    fn test_decimal_roundtrip<T: DecimalType>() {
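        // Physical type by precision: <= 9 -> INT32, <= 18 -> INT64,
        // > 18 -> FIXED_LEN_BYTE_ARRAY (checked against the schema below).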
        let d = |values: Vec<usize>, p: u8| {
            let iter = values.into_iter().map(T::Native::usize_as);
            PrimitiveArray::<T>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
            ("d4", Arc::new(d4) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);
        let t4 = builder.parquet_schema().columns()[3].physical_type();
        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    #[test]
    fn test_decimal() {
        test_decimal32_roundtrip();
        test_decimal64_roundtrip();
        test_decimal_roundtrip::<Decimal128Type>();
        test_decimal_roundtrip::<Decimal256Type>();
    }

    #[test]
    fn test_list_selection() {
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list_field(ArrowDataType::Utf8, true),
            false,
        )]));
        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        for i in 0..2 {
            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
            for j in 0..1024 {
                list_a_builder.values().append_value(format!("{i} {j}"));
                list_a_builder.append(true);
            }
            let batch =
                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
                    .unwrap();
            writer.write(&batch).unwrap();
        }
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);
        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
            .unwrap()
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ]))
            .build()
            .unwrap();

        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        let batch = concat_batches(&schema, &batches).unwrap();

        assert_eq!(batch.num_rows(), 924 * 2);
        let list = batch.column(0).as_list::<i32>();

        for w in list.value_offsets().windows(2) {
            assert_eq!(w[0] + 1, w[1])
        }
        let mut values = list.values().as_string::<i32>().iter();

        for i in 0..2 {
            for j in 100..1024 {
                let expected = format!("{i} {j}");
                assert_eq!(values.next().unwrap().unwrap(), &expected);
            }
        }
    }

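    /// Fuzz test: random nullable `List<List<Int32>>` data is read back under
    /// several skip/select patterns and batch sizes, then compared
    /// slice-by-slice against the written batch.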
    #[test]
    fn test_list_selection_fuzz() {
        let mut rng = rng();
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list(
                Field::LIST_FIELD_DEFAULT_NAME,
                Field::new_list_field(ArrowDataType::Int32, true),
                true,
            ),
            true,
        )]));
        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));

        for _ in 0..2048 {
            if rng.random_bool(0.2) {
                list_a_builder.append(false);
                continue;
            }

            let list_a_len = rng.random_range(0..10);
            let list_b_builder = list_a_builder.values();

            for _ in 0..list_a_len {
                if rng.random_bool(0.2) {
                    list_b_builder.append(false);
                    continue;
                }

                let list_b_len = rng.random_range(0..10);
                let int_builder = list_b_builder.values();
                for _ in 0..list_b_len {
                    match rng.random_bool(0.2) {
                        true => int_builder.append_null(),
                        false => int_builder.append_value(rng.random()),
                    }
                }
                list_b_builder.append(true)
            }
            list_a_builder.append(true);
        }

        let array = Arc::new(list_a_builder.finish());
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        writer.write(&batch).unwrap();
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);

        let cases = [
            vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ],
            vec![
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
            ],
            vec![
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
            ],
            vec![
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
            ],
        ];

        for batch_size in [100, 1024, 2048] {
            for selection in &cases {
                let selection = RowSelection::from(selection.clone());
                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
                    .unwrap()
                    .with_row_selection(selection.clone())
                    .with_batch_size(batch_size)
                    .build()
                    .unwrap();

                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
                assert_eq!(actual.num_rows(), selection.row_count());

                let mut batch_offset = 0;
                let mut actual_offset = 0;
                for selector in selection.iter() {
                    if selector.skip {
                        batch_offset += selector.row_count;
                        continue;
                    }

                    assert_eq!(
                        batch.slice(batch_offset, selector.row_count),
                        actual.slice(actual_offset, selector.row_count)
                    );

                    batch_offset += selector.row_count;
                    actual_offset += selector.row_count;
                }
            }
        }
    }

    #[test]
    fn test_read_old_nested_list() {
        use arrow::datatypes::DataType;
        use arrow::datatypes::ToByteSlice;

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/old_list_structure.parquet");
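        // old_list_structure.parquet uses the legacy two-level list layout;
        // its single row is expected to decode to [[1, 2], [3, 4]],
        // reconstructed below as `a`.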
        let test_file = File::open(path).unwrap();

        let a_values = Int32Array::from(vec![1, 2, 3, 4]);

        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());

        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
            "array",
            DataType::Int32,
            false,
        ))))
        .len(2)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
        let mut reader = builder.build().unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out.num_columns(), 1);
        let c0 = out.column(0);
        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
        let r0 = c0arr.value(0);
        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
        assert_eq!(r0arr, &a);
    }

    #[test]
    fn test_map_no_value() {
        let testdata = arrow::util::test_util::parquet_test_data();
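        // map_no_value.parquet contains a map column whose key_value group has
        // no value field, together with an equivalent list column; both are
        // expected to decode to the same values.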
        let path = format!("{testdata}/map_no_value.parquet");
        let file = File::open(path).unwrap();

        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 3);
        assert_eq!(out.num_columns(), 3);
        let c0 = out.column(1).as_list::<i32>();
        let c1 = out.column(2).as_list::<i32>();
        assert_eq!(c0.len(), c1.len());
        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
    }

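    /// With two-row data pages and a predicate matching only the first and
    /// last rows, every page in between is skipped entirely; the reader must
    /// still stitch the two surviving rows into a batch.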
    #[test]
    fn test_row_filter_full_page_skip_is_handled() {
        let first_value: i64 = 1111;
        let last_value: i64 = 9999;
        let num_rows: usize = 12;

        let schema = Arc::new(Schema::new(vec![
            Field::new("key", arrow_schema::DataType::Int64, false),
            Field::new("value", arrow_schema::DataType::Int64, false),
        ]));

        let mut int_values: Vec<i64> = (0..num_rows as i64).collect();
        int_values[0] = first_value;
        int_values[num_rows - 1] = last_value;
        let keys = Int64Array::from(int_values.clone());
        let values = Int64Array::from(int_values.clone());
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
        )
        .unwrap();

        let props = WriterProperties::builder()
            .set_write_batch_size(2)
            .set_data_page_row_count_limit(2)
            .build();

        let mut buffer = Vec::new();
        let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let data = Bytes::from(buffer);

        let options = ArrowReaderOptions::new().with_page_index(true);
        let builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options).unwrap();
        let schema = builder.parquet_schema().clone();
        let filter_mask = ProjectionMask::leaves(&schema, [0]);

        let make_predicate = |mask: ProjectionMask| {
            ArrowPredicateFn::new(mask, move |batch: RecordBatch| {
                let column = batch.column(0);
                let match_first = eq(column, &Int64Array::new_scalar(first_value))?;
                let match_second = eq(column, &Int64Array::new_scalar(last_value))?;
                or(&match_first, &match_second)
            })
        };

        let options = ArrowReaderOptions::new().with_page_index(true);
        let predicate = make_predicate(filter_mask.clone());

        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options)
            .unwrap()
            .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
            .with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 32 })
            .with_batch_size(12)
            .build()
            .unwrap();

        let schema = reader.schema().clone();
        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        let result = concat_batches(&schema, &batches).unwrap();
        assert_eq!(result.num_rows(), 2);
    }

    #[test]
    fn test_get_row_group_column_bloom_filter_with_length() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
        let file = File::open(path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let schema = builder.schema().clone();
        let reader = builder.build().unwrap();

        let mut parquet_data = Vec::new();
        let props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build();
        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
        for batch in reader {
            let batch = batch.unwrap();
            writer.write(&batch).unwrap();
        }
        writer.close().unwrap();

        test_get_row_group_column_bloom_filter(parquet_data.into(), true);
    }

    #[test]
    fn test_get_row_group_column_bloom_filter_without_length() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());
        test_get_row_group_column_bloom_filter(data, false);
    }

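    /// Asserts that the bloom filter of row group 0 / column 0 of `data` can
    /// be read regardless of whether `bloom_filter_length` is present in the
    /// column metadata, and that it answers membership queries correctly.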
    fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
        let builder = ParquetRecordBatchReaderBuilder::try_new(data.clone()).unwrap();

        let metadata = builder.metadata();
        assert_eq!(metadata.num_row_groups(), 1);
        let row_group = metadata.row_group(0);
        let column = row_group.column(0);
        assert_eq!(column.bloom_filter_length().is_some(), with_length);

        let sbbf = builder
            .get_row_group_column_bloom_filter(0, 0)
            .unwrap()
            .unwrap();
        assert!(sbbf.check(&"Hello"));
        assert!(!sbbf.check(&"Hello_Not_Exists"));
    }

    #[test]
    fn test_read_unknown_logical_type() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/unknown-logical-type.parquet");
        let test_file = File::open(path).unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file)
            .expect("Error creating reader builder");

        let schema = builder.metadata().file_metadata().schema_descr();
        assert_eq!(
            schema.column(0).logical_type_ref(),
            Some(&LogicalType::String)
        );
        assert_eq!(
            schema.column(1).logical_type_ref(),
            Some(&LogicalType::_Unknown { field_id: 2555 })
        );
        assert_eq!(schema.column(1).physical_type(), PhysicalType::BYTE_ARRAY);

        let mut reader = builder.build().unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 3);
        assert_eq!(out.num_columns(), 2);
    }

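    /// Reading with a virtual row-number column appended: the extra column
    /// enumerates rows from 0 alongside the real data.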
    #[test]
    fn test_read_row_numbers() {
        let file = write_parquet_from_iter(vec![(
            "value",
            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
        )]);
        let supplied_fields = Fields::from(vec![Field::new("value", ArrowDataType::Int64, false)]);

        let row_number_field = Arc::new(
            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
        );

        let options = ArrowReaderOptions::new()
            .with_schema(Arc::new(Schema::new(supplied_fields)))
            .with_virtual_columns(vec![row_number_field.clone()])
            .unwrap();
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");

        let batch = arrow_reader.next().unwrap().unwrap();
        let schema = Arc::new(Schema::new(vec![
            Field::new("value", ArrowDataType::Int64, false),
            (*row_number_field).clone(),
        ]));

        assert_eq!(batch.schema(), schema);
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(1), Some(2), Some(3)]
        );
        assert_eq!(
            batch
                .column(1)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(0), Some(1), Some(2)]
        );
    }

    #[test]
    fn test_read_only_row_numbers() {
        let file = write_parquet_from_iter(vec![(
            "value",
            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
        )]);
        let row_number_field = Arc::new(
            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
        );
        let options = ArrowReaderOptions::new()
            .with_virtual_columns(vec![row_number_field.clone()])
            .unwrap();
        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
        let num_columns = metadata
            .metadata
            .file_metadata()
            .schema_descr()
            .num_columns();

        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
            .with_projection(ProjectionMask::none(num_columns))
            .build()
            .expect("reader with schema");

        let batch = arrow_reader.next().unwrap().unwrap();
        let schema = Arc::new(Schema::new(vec![row_number_field]));

        assert_eq!(batch.schema(), schema);
        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(0), Some(1), Some(2)]
        );
    }

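    /// Row numbers are positions in the file, so reading row groups out of
    /// order must yield the original absolute row numbers rather than
    /// renumbering in read order.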
    #[test]
    fn test_read_row_numbers_row_group_order() -> Result<()> {
        let array = Int64Array::from_iter_values(5000..5100);
        let batch = RecordBatch::try_from_iter([("col", Arc::new(array) as ArrayRef)])?;
        let mut buffer = Vec::new();
        let options = WriterProperties::builder()
            .set_max_row_group_size(50)
            .build();
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema().clone(), Some(options))?;
        for batch_chunk in (0..10).map(|i| batch.slice(i * 10, 10)) {
            writer.write(&batch_chunk)?;
        }
        writer.close()?;

        let row_number_field = Arc::new(
            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
        );

        let buffer = Bytes::from(buffer);

        let options =
            ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field.clone()])?;

        let arrow_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(buffer.clone(), options.clone())?
                .build()?;

        assert_eq!(
            ValuesAndRowNumbers {
                values: (5000..5100).collect(),
                row_numbers: (0..100).collect()
            },
            ValuesAndRowNumbers::new_from_reader(arrow_reader)
        );

        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(buffer, options)?
            .with_row_groups(vec![1, 0])
            .build()?;

        assert_eq!(
            ValuesAndRowNumbers {
                values: (5050..5100).chain(5000..5050).collect(),
                row_numbers: (50..100).chain(0..50).collect(),
            },
            ValuesAndRowNumbers::new_from_reader(arrow_reader)
        );

        Ok(())
    }

    #[derive(Debug, PartialEq)]
    struct ValuesAndRowNumbers {
        values: Vec<i64>,
        row_numbers: Vec<i64>,
    }

    impl ValuesAndRowNumbers {
        fn new_from_reader(reader: ParquetRecordBatchReader) -> Self {
            let mut values = vec![];
            let mut row_numbers = vec![];
            for batch in reader {
                let batch = batch.expect("Could not read batch");
                values.extend(
                    batch
                        .column_by_name("col")
                        .expect("Could not get col column")
                        .as_primitive::<arrow::datatypes::Int64Type>()
                        .iter()
                        .map(|v| v.expect("Could not get value")),
                );

                row_numbers.extend(
                    batch
                        .column_by_name("row_number")
                        .expect("Could not get row_number column")
                        .as_primitive::<arrow::datatypes::Int64Type>()
                        .iter()
                        .map(|v| v.expect("Could not get row number")),
                );
            }
            Self {
                values,
                row_numbers,
            }
        }
    }

    #[test]
    fn test_with_virtual_columns_rejects_non_virtual_fields() {
        let regular_field = Arc::new(Field::new("regular_column", ArrowDataType::Int64, false));
        assert_eq!(
            ArrowReaderOptions::new()
                .with_virtual_columns(vec![regular_field])
                .unwrap_err()
                .to_string(),
            "Parquet error: Field 'regular_column' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'"
        );
    }

    #[test]
    fn test_row_numbers_with_multiple_row_groups() {
        test_row_numbers_with_multiple_row_groups_helper(
            false,
            |path, selection, _row_filter, batch_size| {
                let file = File::open(path).unwrap();
                let row_number_field = Arc::new(
                    Field::new("row_number", ArrowDataType::Int64, false)
                        .with_extension_type(RowNumber),
                );
                let options = ArrowReaderOptions::new()
                    .with_virtual_columns(vec![row_number_field])
                    .unwrap();
                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                    .unwrap()
                    .with_row_selection(selection)
                    .with_batch_size(batch_size)
                    .build()
                    .expect("Could not create reader");
                reader
                    .collect::<Result<Vec<_>, _>>()
                    .expect("Could not read")
            },
        );
    }

    #[test]
    fn test_row_numbers_with_multiple_row_groups_and_filter() {
        test_row_numbers_with_multiple_row_groups_helper(
            true,
            |path, selection, row_filter, batch_size| {
                let file = File::open(path).unwrap();
                let row_number_field = Arc::new(
                    Field::new("row_number", ArrowDataType::Int64, false)
                        .with_extension_type(RowNumber),
                );
                let options = ArrowReaderOptions::new()
                    .with_virtual_columns(vec![row_number_field])
                    .unwrap();
                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                    .unwrap()
                    .with_row_selection(selection)
                    .with_batch_size(batch_size)
                    .with_row_filter(row_filter.expect("No filter"))
                    .build()
                    .expect("Could not create reader");
                reader
                    .collect::<Result<Vec<_>, _>>()
                    .expect("Could not read")
            },
        );
    }

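    /// Shared driver: writes a file with random row-group boundaries, builds a
    /// random `RowSelection` (plus an optional random `RowFilter`), delegates
    /// the read to `test_case`, and checks that each returned row number
    /// matches its value column (value == row_number + 1).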
    pub(crate) fn test_row_numbers_with_multiple_row_groups_helper<F>(
        use_filter: bool,
        test_case: F,
    ) where
        F: FnOnce(PathBuf, RowSelection, Option<RowFilter>, usize) -> Vec<RecordBatch>,
    {
        let seed: u64 = random();
        println!("test_row_numbers_with_multiple_row_groups seed: {seed}");
        let mut rng = StdRng::seed_from_u64(seed);

        use tempfile::TempDir;
        let tempdir = TempDir::new().expect("Could not create temp dir");

        let (bytes, metadata) = generate_file_with_row_numbers(&mut rng);

        let path = tempdir.path().join("test.parquet");
        std::fs::write(&path, bytes).expect("Could not write file");

        let mut case = vec![];
        let mut remaining = metadata.file_metadata().num_rows();
        while remaining > 0 {
            let row_count = rng.random_range(1..=remaining);
            remaining -= row_count;
            case.push(RowSelector {
                row_count: row_count as usize,
                skip: rng.random_bool(0.5),
            });
        }

        let filter = use_filter.then(|| {
            let filter = (0..metadata.file_metadata().num_rows())
                .map(|_| rng.random_bool(0.99))
                .collect::<Vec<_>>();
            let mut filter_offset = 0;
            RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
                ProjectionMask::all(),
                move |b| {
                    let array = BooleanArray::from_iter(
                        filter
                            .iter()
                            .skip(filter_offset)
                            .take(b.num_rows())
                            .map(|x| Some(*x)),
                    );
                    filter_offset += b.num_rows();
                    Ok(array)
                },
            ))])
        });

        let selection = RowSelection::from(case);
        let batches = test_case(path, selection.clone(), filter, rng.random_range(1..4096));

        if selection.skipped_row_count() == metadata.file_metadata().num_rows() as usize {
            assert!(batches.into_iter().all(|batch| batch.num_rows() == 0));
            return;
        }
        let actual = concat_batches(batches.first().expect("No batches").schema_ref(), &batches)
            .expect("Failed to concatenate");
        let values = actual
            .column(0)
            .as_primitive::<types::Int64Type>()
            .iter()
            .collect::<Vec<_>>();
        let row_numbers = actual
            .column(1)
            .as_primitive::<types::Int64Type>()
            .iter()
            .collect::<Vec<_>>();
        assert_eq!(
            row_numbers
                .into_iter()
                .map(|number| number.map(|number| number + 1))
                .collect::<Vec<_>>(),
            values
        );
    }

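    /// Writes a single Int64 "value" column holding 1..=N for a random N,
    /// flushing after every batch to create multiple row groups, and returns
    /// the encoded bytes plus the file metadata.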
    fn generate_file_with_row_numbers(rng: &mut impl Rng) -> (Bytes, ParquetMetaData) {
        let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
            "value",
            ArrowDataType::Int64,
            false,
        )])));

        let mut buf = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new(&mut buf, schema.clone(), None).expect("Could not create writer");

        let mut values = 1..=rng.random_range(1..4096);
        while !values.is_empty() {
            let batch_values = values
                .by_ref()
                .take(rng.random_range(1..4096))
                .collect::<Vec<_>>();
            let array = Arc::new(Int64Array::from(batch_values)) as ArrayRef;
            let batch =
                RecordBatch::try_from_iter([("value", array)]).expect("Could not create batch");
            writer.write(&batch).expect("Could not write batch");
            writer.flush().expect("Could not flush");
        }
        let metadata = writer.close().expect("Could not close writer");

        (Bytes::from(buf), metadata)
    }
}