1use arrow_array::cast::AsArray;
21use arrow_array::{Array, RecordBatch, RecordBatchReader};
22use arrow_schema::{ArrowError, DataType as ArrowType, FieldRef, Schema, SchemaRef};
23use arrow_select::filter::filter_record_batch;
24pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
25pub use selection::{RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector};
26use std::fmt::{Debug, Formatter};
27use std::sync::Arc;
28
29pub use crate::arrow::array_reader::RowGroups;
30use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
31use crate::arrow::schema::{
32 ParquetField, parquet_to_arrow_schema_and_fields, virtual_type::is_virtual_column,
33};
34use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels_with_virtual};
35use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
36use crate::bloom_filter::{
37 SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
38};
39use crate::column::page::{PageIterator, PageReader};
40#[cfg(feature = "encryption")]
41use crate::encryption::decrypt::FileDecryptionProperties;
42use crate::errors::{ParquetError, Result};
43use crate::file::metadata::{
44 PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
45 ParquetStatisticsPolicy, RowGroupMetaData,
46};
47use crate::file::reader::{ChunkReader, SerializedPageReader};
48use crate::schema::types::SchemaDescriptor;
49
50use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
51pub use read_plan::{PredicateOptions, ReadPlan, ReadPlanBuilder};
53
54mod filter;
55pub mod metrics;
56mod read_plan;
57pub(crate) mod selection;
58pub mod statistics;
59
/// Default maximum number of rows per [`RecordBatch`] used by reader builders when no
/// explicit batch size has been configured (the effective size is further clamped to the
/// file's row count when the reader is built).
pub const DEFAULT_BATCH_SIZE: usize = 1024;
62
/// Generic builder configuring how Parquet data is decoded into Arrow [`RecordBatch`]es.
///
/// `T` is the input source; for example [`ParquetRecordBatchReaderBuilder`] uses a
/// [`SyncReader`] wrapping a [`ChunkReader`].
pub struct ArrowReaderBuilder<T> {
    /// The input the data is read from.
    pub(crate) input: T,

    /// Parquet file metadata.
    pub(crate) metadata: Arc<ParquetMetaData>,

    /// Arrow schema derived from (or supplied for) the file, as returned by `schema()`.
    pub(crate) schema: SchemaRef,

    /// Parquet field tree handed to `ArrayReaderBuilder::build_array_reader`.
    pub(crate) fields: Option<Arc<ParquetField>>,

    /// Maximum rows per batch; clamped to the file's total row count.
    pub(crate) batch_size: usize,

    /// Indices of the row groups to read; `None` means all row groups.
    pub(crate) row_groups: Option<Vec<usize>>,

    /// Which columns to decode (defaults to all columns).
    pub(crate) projection: ProjectionMask,

    /// Optional row filter whose predicates further restrict the returned rows.
    pub(crate) filter: Option<RowFilter>,

    /// Optional explicit selection of rows to read.
    pub(crate) selection: Option<RowSelection>,

    /// Policy controlling how the row selection is represented while reading.
    pub(crate) row_selection_policy: RowSelectionPolicy,

    /// Optional maximum number of rows to return.
    pub(crate) limit: Option<usize>,

    /// Optional number of rows to skip before returning rows.
    pub(crate) offset: Option<usize>,

    /// Metrics collection (disabled by default).
    pub(crate) metrics: ArrowReaderMetrics,

    /// Upper bound on the predicate cache size in bytes (default 100 MiB). Ignored by the
    /// synchronous `ParquetRecordBatchReaderBuilder::build`.
    /// NOTE(review): presumably consumed by other (e.g. async) builders — confirm.
    pub(crate) max_predicate_cache_size: usize,
}
147
148impl<T: Debug> Debug for ArrowReaderBuilder<T> {
149 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
150 f.debug_struct("ArrowReaderBuilder<T>")
151 .field("input", &self.input)
152 .field("metadata", &self.metadata)
153 .field("schema", &self.schema)
154 .field("fields", &self.fields)
155 .field("batch_size", &self.batch_size)
156 .field("row_groups", &self.row_groups)
157 .field("projection", &self.projection)
158 .field("filter", &self.filter)
159 .field("selection", &self.selection)
160 .field("row_selection_policy", &self.row_selection_policy)
161 .field("limit", &self.limit)
162 .field("offset", &self.offset)
163 .field("metrics", &self.metrics)
164 .finish()
165 }
166}
167
168impl<T> ArrowReaderBuilder<T> {
169 pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
170 Self {
171 input,
172 metadata: metadata.metadata,
173 schema: metadata.schema,
174 fields: metadata.fields,
175 batch_size: DEFAULT_BATCH_SIZE,
176 row_groups: None,
177 projection: ProjectionMask::all(),
178 filter: None,
179 selection: None,
180 row_selection_policy: RowSelectionPolicy::default(),
181 limit: None,
182 offset: None,
183 metrics: ArrowReaderMetrics::Disabled,
184 max_predicate_cache_size: 100 * 1024 * 1024, }
186 }
187
188 pub fn metadata(&self) -> &Arc<ParquetMetaData> {
190 &self.metadata
191 }
192
193 pub fn parquet_schema(&self) -> &SchemaDescriptor {
195 self.metadata.file_metadata().schema_descr()
196 }
197
198 pub fn schema(&self) -> &SchemaRef {
200 &self.schema
201 }
202
203 pub fn with_batch_size(self, batch_size: usize) -> Self {
210 let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
212 Self { batch_size, ..self }
213 }
214
215 pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
219 Self {
220 row_groups: Some(row_groups),
221 ..self
222 }
223 }
224
225 pub fn with_projection(self, mask: ProjectionMask) -> Self {
227 Self {
228 projection: mask,
229 ..self
230 }
231 }
232
233 pub fn with_row_selection_policy(self, policy: RowSelectionPolicy) -> Self {
237 Self {
238 row_selection_policy: policy,
239 ..self
240 }
241 }
242
243 pub fn with_row_selection(self, selection: RowSelection) -> Self {
303 Self {
304 selection: Some(selection),
305 ..self
306 }
307 }
308
309 pub fn with_row_filter(self, filter: RowFilter) -> Self {
349 Self {
350 filter: Some(filter),
351 ..self
352 }
353 }
354
355 pub fn with_limit(self, limit: usize) -> Self {
363 Self {
364 limit: Some(limit),
365 ..self
366 }
367 }
368
369 pub fn with_offset(self, offset: usize) -> Self {
377 Self {
378 offset: Some(offset),
379 ..self
380 }
381 }
382
383 pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
418 Self { metrics, ..self }
419 }
420
421 pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
436 Self {
437 max_predicate_cache_size,
438 ..self
439 }
440 }
441}
442
/// Options controlling how Parquet metadata is loaded and how the Arrow schema is derived.
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    /// When `true`, the file's key-value metadata is not passed to schema inference.
    skip_arrow_metadata: bool,
    /// Arrow schema supplied by the caller; when set it is validated against the Parquet
    /// schema instead of inferring one.
    supplied_schema: Option<SchemaRef>,

    /// Whether/how the column index is loaded.
    pub(crate) column_index: PageIndexPolicy,
    /// Whether/how the offset index is loaded.
    pub(crate) offset_index: PageIndexPolicy,

    /// Options forwarded to the [`ParquetMetaDataReader`].
    metadata_options: ParquetMetaDataOptions,

    /// Properties used to decrypt an encrypted file.
    #[cfg(feature = "encryption")]
    pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,

    /// Virtual columns (validated with `is_virtual_column`) added to the derived schema.
    virtual_columns: Vec<FieldRef>,
}
478
479impl ArrowReaderOptions {
480 pub fn new() -> Self {
482 Self::default()
483 }
484
485 pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
492 Self {
493 skip_arrow_metadata,
494 ..self
495 }
496 }
497
498 pub fn with_schema(self, schema: SchemaRef) -> Self {
607 Self {
608 supplied_schema: Some(schema),
609 skip_arrow_metadata: true,
610 ..self
611 }
612 }
613
614 #[deprecated(since = "57.2.0", note = "Use `with_page_index_policy` instead")]
615 pub fn with_page_index(self, page_index: bool) -> Self {
628 self.with_page_index_policy(PageIndexPolicy::from(page_index))
629 }
630
631 pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
639 self.with_column_index_policy(policy)
640 .with_offset_index_policy(policy)
641 }
642
643 pub fn with_column_index_policy(mut self, policy: PageIndexPolicy) -> Self {
650 self.column_index = policy;
651 self
652 }
653
654 pub fn with_offset_index_policy(mut self, policy: PageIndexPolicy) -> Self {
661 self.offset_index = policy;
662 self
663 }
664
665 pub fn with_parquet_schema(mut self, schema: Arc<SchemaDescriptor>) -> Self {
671 self.metadata_options.set_schema(schema);
672 self
673 }
674
675 pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
686 self.metadata_options.set_encoding_stats_as_mask(val);
687 self
688 }
689
690 pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
695 self.metadata_options.set_encoding_stats_policy(policy);
696 self
697 }
698
699 pub fn with_column_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
704 self.metadata_options.set_column_stats_policy(policy);
705 self
706 }
707
708 pub fn with_size_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
713 self.metadata_options.set_size_stats_policy(policy);
714 self
715 }
716
717 #[cfg(feature = "encryption")]
721 pub fn with_file_decryption_properties(
722 self,
723 file_decryption_properties: Arc<FileDecryptionProperties>,
724 ) -> Self {
725 Self {
726 file_decryption_properties: Some(file_decryption_properties),
727 ..self
728 }
729 }
730
731 pub fn with_virtual_columns(self, virtual_columns: Vec<FieldRef>) -> Result<Self> {
784 for field in &virtual_columns {
786 if !is_virtual_column(field) {
787 return Err(ParquetError::General(format!(
788 "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
789 field.name()
790 )));
791 }
792 }
793 Ok(Self {
794 virtual_columns,
795 ..self
796 })
797 }
798
799 #[deprecated(
800 since = "57.2.0",
801 note = "Use `column_index_policy` or `offset_index_policy` instead"
802 )]
803 pub fn page_index(&self) -> bool {
810 self.offset_index != PageIndexPolicy::Skip && self.column_index != PageIndexPolicy::Skip
811 }
812
813 pub fn offset_index_policy(&self) -> PageIndexPolicy {
818 self.offset_index
819 }
820
821 pub fn column_index_policy(&self) -> PageIndexPolicy {
826 self.column_index
827 }
828
829 pub fn metadata_options(&self) -> &ParquetMetaDataOptions {
831 &self.metadata_options
832 }
833
834 #[cfg(feature = "encryption")]
839 pub fn file_decryption_properties(&self) -> Option<&Arc<FileDecryptionProperties>> {
840 self.file_decryption_properties.as_ref()
841 }
842}
843
/// Parquet metadata together with the Arrow schema derived from it.
///
/// Produced by [`ArrowReaderMetadata::load`] or [`ArrowReaderMetadata::try_new`] and used to
/// create reader builders without re-reading the metadata.
#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    /// The decoded Parquet file metadata.
    pub(crate) metadata: Arc<ParquetMetaData>,
    /// The Arrow schema, either supplied by the caller or inferred from the Parquet schema.
    pub(crate) schema: SchemaRef,
    /// Parquet field tree used to build array readers (may be `None`).
    pub(crate) fields: Option<Arc<ParquetField>>,
}
867
868impl ArrowReaderMetadata {
869 pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
881 let metadata = ParquetMetaDataReader::new()
882 .with_column_index_policy(options.column_index)
883 .with_offset_index_policy(options.offset_index)
884 .with_metadata_options(Some(options.metadata_options.clone()));
885 #[cfg(feature = "encryption")]
886 let metadata = metadata.with_decryption_properties(
887 options.file_decryption_properties.as_ref().map(Arc::clone),
888 );
889 let metadata = metadata.parse_and_finish(reader)?;
890 Self::try_new(Arc::new(metadata), options)
891 }
892
893 pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
901 match options.supplied_schema {
902 Some(supplied_schema) => Self::with_supplied_schema(
903 metadata,
904 supplied_schema.clone(),
905 &options.virtual_columns,
906 ),
907 None => {
908 let kv_metadata = match options.skip_arrow_metadata {
909 true => None,
910 false => metadata.file_metadata().key_value_metadata(),
911 };
912
913 let (schema, fields) = parquet_to_arrow_schema_and_fields(
914 metadata.file_metadata().schema_descr(),
915 ProjectionMask::all(),
916 kv_metadata,
917 &options.virtual_columns,
918 )?;
919
920 Ok(Self {
921 metadata,
922 schema: Arc::new(schema),
923 fields: fields.map(Arc::new),
924 })
925 }
926 }
927 }
928
929 fn with_supplied_schema(
930 metadata: Arc<ParquetMetaData>,
931 supplied_schema: SchemaRef,
932 virtual_columns: &[FieldRef],
933 ) -> Result<Self> {
934 let parquet_schema = metadata.file_metadata().schema_descr();
935 let field_levels = parquet_to_arrow_field_levels_with_virtual(
936 parquet_schema,
937 ProjectionMask::all(),
938 Some(supplied_schema.fields()),
939 virtual_columns,
940 )?;
941 let fields = field_levels.fields;
942 let inferred_len = fields.len();
943 let supplied_len = supplied_schema.fields().len() + virtual_columns.len();
944 if inferred_len != supplied_len {
948 return Err(arrow_err!(format!(
949 "Incompatible supplied Arrow schema: expected {} columns received {}",
950 inferred_len, supplied_len
951 )));
952 }
953
954 let mut errors = Vec::new();
955
956 let field_iter = supplied_schema.fields().iter().zip(fields.iter());
957
958 for (field1, field2) in field_iter {
959 if field1.data_type() != field2.data_type() {
960 errors.push(format!(
961 "data type mismatch for field {}: requested {} but found {}",
962 field1.name(),
963 field1.data_type(),
964 field2.data_type()
965 ));
966 }
967 if field1.is_nullable() != field2.is_nullable() {
968 errors.push(format!(
969 "nullability mismatch for field {}: expected {:?} but found {:?}",
970 field1.name(),
971 field1.is_nullable(),
972 field2.is_nullable()
973 ));
974 }
975 if field1.metadata() != field2.metadata() {
976 errors.push(format!(
977 "metadata mismatch for field {}: expected {:?} but found {:?}",
978 field1.name(),
979 field1.metadata(),
980 field2.metadata()
981 ));
982 }
983 }
984
985 if !errors.is_empty() {
986 let message = errors.join(", ");
987 return Err(ParquetError::ArrowError(format!(
988 "Incompatible supplied Arrow schema: {message}",
989 )));
990 }
991
992 Ok(Self {
993 metadata,
994 schema: supplied_schema,
995 fields: field_levels.levels.map(Arc::new),
996 })
997 }
998
999 pub fn metadata(&self) -> &Arc<ParquetMetaData> {
1001 &self.metadata
1002 }
1003
1004 pub fn parquet_schema(&self) -> &SchemaDescriptor {
1006 self.metadata.file_metadata().schema_descr()
1007 }
1008
1009 pub fn schema(&self) -> &SchemaRef {
1011 &self.schema
1012 }
1013}
1014
/// Wrapper around a [`ChunkReader`], used as the input type of
/// [`ParquetRecordBatchReaderBuilder`].
#[doc(hidden)]
pub struct SyncReader<T: ChunkReader>(T);
1018
1019impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
1020 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1021 f.debug_tuple("SyncReader").field(&self.0).finish()
1022 }
1023}
1024
/// Builder for a synchronous [`ParquetRecordBatchReader`] reading from a [`ChunkReader`] `T`.
pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;
1032
1033impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
1034 pub fn try_new(reader: T) -> Result<Self> {
1065 Self::try_new_with_options(reader, Default::default())
1066 }
1067
1068 pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
1073 let metadata = ArrowReaderMetadata::load(&reader, options)?;
1074 Ok(Self::new_with_metadata(reader, metadata))
1075 }
1076
1077 pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
1116 Self::new_builder(SyncReader(input), metadata)
1117 }
1118
1119 pub fn get_row_group_column_bloom_filter(
1125 &self,
1126 row_group_idx: usize,
1127 column_idx: usize,
1128 ) -> Result<Option<Sbbf>> {
1129 let metadata = self.metadata.row_group(row_group_idx);
1130 let column_metadata = metadata.column(column_idx);
1131
1132 let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
1133 offset
1134 .try_into()
1135 .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
1136 } else {
1137 return Ok(None);
1138 };
1139
1140 let buffer = match column_metadata.bloom_filter_length() {
1141 Some(length) => self.input.0.get_bytes(offset, length as usize),
1142 None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
1143 }?;
1144
1145 let (header, bitset_offset) =
1146 chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
1147
1148 match header.algorithm {
1149 BloomFilterAlgorithm::BLOCK => {
1150 }
1152 }
1153 match header.compression {
1154 BloomFilterCompression::UNCOMPRESSED => {
1155 }
1157 }
1158 match header.hash {
1159 BloomFilterHash::XXHASH => {
1160 }
1162 }
1163
1164 let bitset = match column_metadata.bloom_filter_length() {
1165 Some(_) => buffer.slice(
1166 (TryInto::<usize>::try_into(bitset_offset).unwrap()
1167 - TryInto::<usize>::try_into(offset).unwrap())..,
1168 ),
1169 None => {
1170 let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
1171 ParquetError::General("Bloom filter length is invalid".to_string())
1172 })?;
1173 self.input.0.get_bytes(bitset_offset, bitset_length)?
1174 }
1175 };
1176 Ok(Some(Sbbf::new(&bitset)))
1177 }
1178
1179 pub fn build(self) -> Result<ParquetRecordBatchReader> {
1183 let Self {
1184 input,
1185 metadata,
1186 schema: _,
1187 fields,
1188 batch_size,
1189 row_groups,
1190 projection,
1191 mut filter,
1192 selection,
1193 row_selection_policy,
1194 limit,
1195 offset,
1196 metrics,
1197 max_predicate_cache_size: _,
1199 } = self;
1200
1201 let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize);
1203
1204 let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());
1205
1206 let reader = ReaderRowGroups {
1207 reader: Arc::new(input.0),
1208 metadata,
1209 row_groups,
1210 };
1211
1212 let mut plan_builder = ReadPlanBuilder::new(batch_size)
1213 .with_selection(selection)
1214 .with_row_selection_policy(row_selection_policy);
1215
1216 if let Some(filter) = filter.as_mut() {
1218 for predicate in filter.predicates.iter_mut() {
1219 if !plan_builder.selects_any() {
1221 break;
1222 }
1223
1224 let mut cache_projection = predicate.projection().clone();
1225 cache_projection.intersect(&projection);
1226
1227 let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
1228 .with_batch_size(batch_size)
1229 .with_parquet_metadata(&reader.metadata)
1230 .build_array_reader(fields.as_deref(), predicate.projection())?;
1231
1232 plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
1233 }
1234 }
1235
1236 let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
1237 .with_batch_size(batch_size)
1238 .with_parquet_metadata(&reader.metadata)
1239 .build_array_reader(fields.as_deref(), &projection)?;
1240
1241 let read_plan = plan_builder
1242 .limited(reader.num_rows())
1243 .with_offset(offset)
1244 .with_limit(limit)
1245 .build_limited()
1246 .build();
1247
1248 Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
1249 }
1250}
1251
/// [`RowGroups`] implementation backed by a shared [`ChunkReader`], exposing only the row
/// groups listed in `row_groups`.
struct ReaderRowGroups<T: ChunkReader> {
    /// Shared input; cloned (as an `Arc`) into every page iterator.
    reader: Arc<T>,

    /// File metadata covering all row groups.
    metadata: Arc<ParquetMetaData>,
    /// Indices into `metadata.row_groups()` of the row groups to read, in read order.
    row_groups: Vec<usize>,
}
1259
1260impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
1261 fn num_rows(&self) -> usize {
1262 let meta = self.metadata.row_groups();
1263 self.row_groups
1264 .iter()
1265 .map(|x| meta[*x].num_rows() as usize)
1266 .sum()
1267 }
1268
1269 fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
1270 Ok(Box::new(ReaderPageIterator {
1271 column_idx: i,
1272 reader: self.reader.clone(),
1273 metadata: self.metadata.clone(),
1274 row_groups: self.row_groups.clone().into_iter(),
1275 }))
1276 }
1277
1278 fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
1279 Box::new(
1280 self.row_groups
1281 .iter()
1282 .map(move |i| self.metadata.row_group(*i)),
1283 )
1284 }
1285
1286 fn metadata(&self) -> &ParquetMetaData {
1287 self.metadata.as_ref()
1288 }
1289}
1290
/// Iterator over the page readers of a single column, one per selected row group.
struct ReaderPageIterator<T: ChunkReader> {
    /// Shared input the pages are read from.
    reader: Arc<T>,
    /// Index of the leaf column being read.
    column_idx: usize,
    /// Remaining row group indices to visit.
    row_groups: std::vec::IntoIter<usize>,
    /// File metadata (row group, column chunk and optional offset index).
    metadata: Arc<ParquetMetaData>,
}
1297
1298impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
1299 fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
1301 let rg = self.metadata.row_group(rg_idx);
1302 let column_chunk_metadata = rg.column(self.column_idx);
1303 let offset_index = self.metadata.offset_index();
1304 let page_locations = offset_index
1307 .filter(|i| !i[rg_idx].is_empty())
1308 .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
1309 let total_rows = rg.num_rows() as usize;
1310 let reader = self.reader.clone();
1311
1312 SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
1313 .add_crypto_context(
1314 rg_idx,
1315 self.column_idx,
1316 self.metadata.as_ref(),
1317 column_chunk_metadata,
1318 )
1319 }
1320}
1321
1322impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
1323 type Item = Result<Box<dyn PageReader>>;
1324
1325 fn next(&mut self) -> Option<Self::Item> {
1326 let rg_idx = self.row_groups.next()?;
1327 let page_reader = self
1328 .next_page_reader(rg_idx)
1329 .map(|page_reader| Box::new(page_reader) as _);
1330 Some(page_reader)
1331 }
1332}
1333
/// Lets [`ReaderPageIterator`] be returned as a `Box<dyn PageIterator>` from
/// `ReaderRowGroups::column_chunks`.
impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
1335
/// Iterator of Arrow [`RecordBatch`]es decoded from Parquet data according to a [`ReadPlan`].
///
/// Typically created via [`ParquetRecordBatchReaderBuilder::build`].
pub struct ParquetRecordBatchReader {
    /// Reader producing struct arrays that are converted into record batches.
    array_reader: Box<dyn ArrayReader>,
    /// Schema of the batches returned by this reader.
    schema: SchemaRef,
    /// Plan (batch size, row selection) governing which rows are read.
    read_plan: ReadPlan,
}
1351
1352impl Debug for ParquetRecordBatchReader {
1353 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1354 f.debug_struct("ParquetRecordBatchReader")
1355 .field("array_reader", &"...")
1356 .field("schema", &self.schema)
1357 .field("read_plan", &self.read_plan)
1358 .finish()
1359 }
1360}
1361
1362impl Iterator for ParquetRecordBatchReader {
1363 type Item = Result<RecordBatch, ArrowError>;
1364
1365 fn next(&mut self) -> Option<Self::Item> {
1366 self.next_inner()
1367 .map_err(|arrow_err| arrow_err.into())
1368 .transpose()
1369 }
1370}
1371
impl ParquetRecordBatchReader {
    /// Decodes the next record batch according to the read plan.
    ///
    /// Returns `Ok(None)` when `batch_size` is 0 or no further rows are produced. The
    /// strategy depends on the read plan's row selection cursor:
    /// * `Mask`: takes the next mask chunk (sized from `batch_size`), skips its leading
    ///   rows, reads the chunk and filters it with a boolean mask. Chunks whose filtered
    ///   result is empty are skipped.
    /// * `Selectors`: walks skip/select runs until `batch_size` rows have been read or the
    ///   selectors are exhausted.
    /// * `All`: reads up to `batch_size` rows with no selection.
    ///
    /// # Errors
    /// Returns an error if the array reader skips or reads fewer rows than expected, if
    /// the filtered row count disagrees with the selection, or if the decoded array is
    /// not a struct array.
    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
        // Rows read into the array reader so far in this call (Selectors path only).
        let mut read_records = 0;
        let batch_size = self.batch_size();
        if batch_size == 0 {
            return Ok(None);
        }
        match self.read_plan.row_selection_cursor_mut() {
            RowSelectionCursor::Mask(mask_cursor) => {
                while !mask_cursor.is_empty() {
                    let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else {
                        return Ok(None);
                    };

                    // Skip rows that precede the chunk entirely.
                    if mask_chunk.initial_skip > 0 {
                        let skipped = self.array_reader.skip_records(mask_chunk.initial_skip)?;
                        if skipped != mask_chunk.initial_skip {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                mask_chunk.initial_skip,
                                skipped
                            ));
                        }
                    }

                    // Nothing to decode in this chunk: stop if the cursor is exhausted with
                    // nothing selected, otherwise move on to the next chunk.
                    if mask_chunk.chunk_rows == 0 {
                        if mask_cursor.is_empty() && mask_chunk.selected_rows == 0 {
                            return Ok(None);
                        }
                        continue;
                    }

                    let mask = mask_cursor.mask_values_for(&mask_chunk)?;

                    let read = self.array_reader.read_records(mask_chunk.chunk_rows)?;
                    if read == 0 {
                        return Err(general_err!(
                            "reached end of column while expecting {} rows",
                            mask_chunk.chunk_rows
                        ));
                    }
                    if read != mask_chunk.chunk_rows {
                        return Err(general_err!(
                            "insufficient rows read from array reader - expected {}, got {}",
                            mask_chunk.chunk_rows,
                            read
                        ));
                    }

                    let array = self.array_reader.consume_batch()?;
                    let struct_array = array.as_struct_opt().ok_or_else(|| {
                        ArrowError::ParquetError(
                            "Struct array reader should return struct array".to_string(),
                        )
                    })?;

                    // Keep only the rows selected by the mask.
                    let filtered_batch =
                        filter_record_batch(&RecordBatch::from(struct_array), &mask)?;

                    if filtered_batch.num_rows() != mask_chunk.selected_rows {
                        return Err(general_err!(
                            "filtered rows mismatch selection - expected {}, got {}",
                            mask_chunk.selected_rows,
                            filtered_batch.num_rows()
                        ));
                    }

                    // Never return an empty batch; try the next chunk instead.
                    if filtered_batch.num_rows() == 0 {
                        continue;
                    }

                    return Ok(Some(filtered_batch));
                }
            }
            RowSelectionCursor::Selectors(selectors_cursor) => {
                while read_records < batch_size && !selectors_cursor.is_empty() {
                    let front = selectors_cursor.next_selector();
                    if front.skip {
                        let skipped = self.array_reader.skip_records(front.row_count)?;

                        if skipped != front.row_count {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                front.row_count,
                                skipped
                            ));
                        }
                        continue;
                    }

                    // Empty select runs contribute nothing.
                    if front.row_count == 0 {
                        continue;
                    }

                    let need_read = batch_size - read_records;
                    // If the selector is longer than the space left in this batch, read only
                    // what fits and push the remainder back onto the cursor for next time.
                    let to_read = match front.row_count.checked_sub(need_read) {
                        Some(remaining) if remaining != 0 => {
                            selectors_cursor.return_selector(RowSelector::select(remaining));
                            need_read
                        }
                        _ => front.row_count,
                    };
                    // A read of zero rows means the reader is exhausted.
                    match self.array_reader.read_records(to_read)? {
                        0 => break,
                        rec => read_records += rec,
                    };
                }
            }
            RowSelectionCursor::All => {
                self.array_reader.read_records(batch_size)?;
            }
        };

        // Emit whatever rows were buffered above; an empty result signals end of stream.
        let array = self.array_reader.consume_batch()?;
        let struct_array = array.as_struct_opt().ok_or_else(|| {
            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
        })?;

        Ok(if struct_array.len() > 0 {
            Some(RecordBatch::from(struct_array))
        } else {
            None
        })
    }
}
1511
1512impl RecordBatchReader for ParquetRecordBatchReader {
1513 fn schema(&self) -> SchemaRef {
1518 self.schema.clone()
1519 }
1520}
1521
1522impl ParquetRecordBatchReader {
1523 pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
1527 ParquetRecordBatchReaderBuilder::try_new(reader)?
1528 .with_batch_size(batch_size)
1529 .build()
1530 }
1531
1532 pub fn try_new_with_row_groups(
1537 levels: &FieldLevels,
1538 row_groups: &dyn RowGroups,
1539 batch_size: usize,
1540 selection: Option<RowSelection>,
1541 ) -> Result<Self> {
1542 let metrics = ArrowReaderMetrics::disabled();
1544 let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
1545 .with_batch_size(batch_size)
1546 .with_parquet_metadata(row_groups.metadata())
1547 .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;
1548
1549 let read_plan = ReadPlanBuilder::new(batch_size)
1550 .with_selection(selection)
1551 .build();
1552
1553 Ok(Self {
1554 array_reader,
1555 schema: Arc::new(Schema::new(levels.fields.clone())),
1556 read_plan,
1557 })
1558 }
1559
1560 pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
1564 let schema = match array_reader.get_data_type() {
1565 ArrowType::Struct(fields) => Schema::new(fields.clone()),
1566 _ => unreachable!("Struct array reader's data type is not struct!"),
1567 };
1568
1569 Self {
1570 array_reader,
1571 schema: Arc::new(schema),
1572 read_plan,
1573 }
1574 }
1575
1576 #[inline(always)]
1577 pub(crate) fn batch_size(&self) -> usize {
1578 self.read_plan.batch_size()
1579 }
1580}
1581
1582#[cfg(test)]
1583pub(crate) mod tests {
1584 use std::cmp::min;
1585 use std::collections::{HashMap, VecDeque};
1586 use std::fmt::Formatter;
1587 use std::fs::File;
1588 use std::io::Seek;
1589 use std::path::PathBuf;
1590 use std::sync::Arc;
1591
1592 use rand::rngs::StdRng;
1593 use rand::{Rng, RngCore, SeedableRng, random, rng};
1594 use tempfile::tempfile;
1595
1596 use crate::arrow::arrow_reader::{
1597 ArrowPredicateFn, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
1598 ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
1599 };
1600 use crate::arrow::schema::{
1601 add_encoded_arrow_schema_to_metadata,
1602 virtual_type::{RowGroupIndex, RowNumber},
1603 };
1604 use crate::arrow::{ArrowWriter, ProjectionMask};
1605 use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType};
1606 use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
1607 use crate::data_type::{
1608 BoolType, ByteArray, ByteArrayType, DataType, DoubleType, FixedLenByteArray,
1609 FixedLenByteArrayType, FloatType, Int32Type, Int64Type, Int96, Int96Type,
1610 };
1611 use crate::errors::Result;
1612 use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetStatisticsPolicy};
1613 use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
1614 use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
1615 use crate::schema::parser::parse_message_type;
1616 use crate::schema::types::{Type, TypePtr};
1617 use crate::util::test_common::rand_gen::RandGen;
1618 use arrow_array::builder::*;
1619 use arrow_array::cast::AsArray;
1620 use arrow_array::types::{
1621 Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
1622 DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
1623 Time64MicrosecondType,
1624 };
1625 use arrow_array::*;
1626 use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256};
1627 use arrow_data::{ArrayData, ArrayDataBuilder};
1628 use arrow_schema::{DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit};
1629 use arrow_select::concat::concat_batches;
1630 use bytes::Bytes;
1631 use half::f16;
1632 use num_traits::PrimInt;
1633
1634 #[test]
1635 fn test_arrow_reader_all_columns() {
1636 let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1637
1638 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1639 let original_schema = Arc::clone(builder.schema());
1640 let reader = builder.build().unwrap();
1641
1642 assert_eq!(original_schema.fields(), reader.schema().fields());
1644 }
1645
1646 #[test]
1647 fn test_reuse_schema() {
1648 let file = get_test_file("parquet/alltypes-java.parquet");
1649
1650 let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
1651 let expected = builder.metadata;
1652 let schema = expected.file_metadata().schema_descr_ptr();
1653
1654 let arrow_options = ArrowReaderOptions::new().with_parquet_schema(schema.clone());
1655 let builder =
1656 ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();
1657
1658 assert_eq!(expected.as_ref(), builder.metadata.as_ref());
1660 }
1661
1662 #[test]
1663 fn test_page_encoding_stats_mask() {
1664 let testdata = arrow::util::test_util::parquet_test_data();
1665 let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1666 let file = File::open(path).unwrap();
1667
1668 let arrow_options = ArrowReaderOptions::new().with_encoding_stats_as_mask(true);
1669 let builder =
1670 ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();
1671
1672 let row_group_metadata = builder.metadata.row_group(0);
1673
1674 let page_encoding_stats = row_group_metadata
1676 .column(0)
1677 .page_encoding_stats_mask()
1678 .unwrap();
1679 assert!(page_encoding_stats.is_only(Encoding::PLAIN));
1680 let page_encoding_stats = row_group_metadata
1681 .column(2)
1682 .page_encoding_stats_mask()
1683 .unwrap();
1684 assert!(page_encoding_stats.is_only(Encoding::PLAIN_DICTIONARY));
1685 }
1686
1687 #[test]
1688 fn test_stats_stats_skipped() {
1689 let testdata = arrow::util::test_util::parquet_test_data();
1690 let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1691 let file = File::open(path).unwrap();
1692
1693 let arrow_options = ArrowReaderOptions::new()
1695 .with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll)
1696 .with_column_stats_policy(ParquetStatisticsPolicy::SkipAll);
1697 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1698 file.try_clone().unwrap(),
1699 arrow_options,
1700 )
1701 .unwrap();
1702
1703 let row_group_metadata = builder.metadata.row_group(0);
1704 for column in row_group_metadata.columns() {
1705 assert!(column.page_encoding_stats().is_none());
1706 assert!(column.page_encoding_stats_mask().is_none());
1707 assert!(column.statistics().is_none());
1708 }
1709
1710 let arrow_options = ArrowReaderOptions::new()
1712 .with_encoding_stats_as_mask(true)
1713 .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
1714 .with_column_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]));
1715 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1716 file.try_clone().unwrap(),
1717 arrow_options,
1718 )
1719 .unwrap();
1720
1721 let row_group_metadata = builder.metadata.row_group(0);
1722 for (idx, column) in row_group_metadata.columns().iter().enumerate() {
1723 assert!(column.page_encoding_stats().is_none());
1724 assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
1725 assert_eq!(column.statistics().is_some(), idx == 0);
1726 }
1727 }
1728
/// Size statistics (repetition/definition level histograms and unencoded
/// byte-array sizes) honour `with_size_stats_policy`: they are dropped for
/// every column under `SkipAll`, and kept only for column 1 under
/// `skip_except(&[1])`.
#[test]
fn test_size_stats_stats_skipped() {
    let path = format!(
        "{}/repeated_primitive_no_list.parquet",
        arrow::util::test_util::parquet_test_data()
    );
    let file = File::open(path).unwrap();

    // Every builder reads the same file through its own cloned handle.
    let open_with = |options: ArrowReaderOptions| {
        ParquetRecordBatchReaderBuilder::try_new_with_options(file.try_clone().unwrap(), options)
            .unwrap()
    };

    let skip_all = open_with(
        ArrowReaderOptions::new().with_size_stats_policy(ParquetStatisticsPolicy::SkipAll),
    );
    for chunk in skip_all.metadata.row_group(0).columns() {
        assert!(chunk.repetition_level_histogram().is_none());
        assert!(chunk.definition_level_histogram().is_none());
        assert!(chunk.unencoded_byte_array_data_bytes().is_none());
    }

    let keep_second = open_with(
        ArrowReaderOptions::new()
            .with_encoding_stats_as_mask(true)
            .with_size_stats_policy(ParquetStatisticsPolicy::skip_except(&[1])),
    );
    for (idx, chunk) in keep_second
        .metadata
        .row_group(0)
        .columns()
        .iter()
        .enumerate()
    {
        let kept = idx == 1;
        assert_eq!(chunk.repetition_level_histogram().is_some(), kept);
        assert_eq!(chunk.definition_level_histogram().is_some(), kept);
        assert_eq!(chunk.unencoded_byte_array_data_bytes().is_some(), kept);
    }
}
1768
/// Projecting parquet leaf 2 yields a reader whose schema has exactly one
/// field, equal to top-level field 1 of the unprojected arrow schema.
#[test]
fn test_arrow_reader_single_column() {
    let builder = ParquetRecordBatchReaderBuilder::try_new(get_test_file(
        "parquet/generated_simple_numerics/blogs.parquet",
    ))
    .unwrap();
    let full_schema = Arc::clone(builder.schema());

    let projection = ProjectionMask::leaves(builder.parquet_schema(), [2]);
    let projected_schema = builder
        .with_projection(projection)
        .build()
        .unwrap()
        .schema();

    assert_eq!(1, projected_schema.fields().len());
    assert_eq!(full_schema.fields()[1], projected_schema.fields()[0]);
}
1783
/// Projecting the `blog_id` column by name yields a reader whose schema has
/// exactly one field, equal to top-level field 1 of the unprojected schema.
#[test]
fn test_arrow_reader_single_column_by_name() {
    let builder = ParquetRecordBatchReaderBuilder::try_new(get_test_file(
        "parquet/generated_simple_numerics/blogs.parquet",
    ))
    .unwrap();
    let full_schema = Arc::clone(builder.schema());

    let projection = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
    let projected_schema = builder
        .with_projection(projection)
        .build()
        .unwrap()
        .schema();

    assert_eq!(1, projected_schema.fields().len());
    assert_eq!(full_schema.fields()[1], projected_schema.fields()[0]);
}
1798
/// An OPTIONAL INT32 column holding only nulls can be read as an arrow `Null`
/// column, and the reader splits its rows into batches of the requested size.
#[test]
fn test_null_column_reader_test() {
    let mut file = tempfile::tempfile().unwrap();

    let schema = Arc::new(
        parse_message_type("\nmessage message {\nOPTIONAL INT32 int32;\n}\n").unwrap(),
    );

    // Two chunks of definition levels (3 + 4 = 7 rows). Every level is 0, so
    // no values are supplied for either chunk.
    let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
    let arrow_field = Field::new("int32", ArrowDataType::Null, true);
    generate_single_column_file_with_data::<Int32Type>(
        &[vec![], vec![]],
        Some(&def_levels),
        file.try_clone().unwrap(),
        schema,
        Some(arrow_field),
        &Default::default(),
    )
    .unwrap();

    file.rewind().unwrap();

    let batches = ParquetRecordBatchReader::try_new(file, 2)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();

    // 7 rows at batch size 2: three full batches followed by a 1-row tail.
    assert_eq!(batches.len(), 4);
    let (tail, full) = batches.split_last().unwrap();
    for batch in full {
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.column(0).null_count(), 2);
    }
    assert_eq!(tail.num_rows(), 1);
    assert_eq!(tail.num_columns(), 1);
    assert_eq!(tail.column(0).null_count(), 1);
}
1837
/// Round-trips BOOLEAN, INT32, INT64 and FLOAT columns through each of the
/// encodings listed for that physical type.
#[test]
fn test_primitive_single_column_reader_test() {
    // Encodings shared by the two integer widths.
    let integer_encodings = [
        Encoding::PLAIN,
        Encoding::RLE_DICTIONARY,
        Encoding::DELTA_BINARY_PACKED,
        Encoding::BYTE_STREAM_SPLIT,
    ];

    run_single_column_reader_tests::<BoolType, _, BoolType>(
        2,
        ConvertedType::NONE,
        None,
        |vals| Arc::new(BooleanArray::from_iter(vals.iter().copied())),
        &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
    );

    run_single_column_reader_tests::<Int32Type, _, Int32Type>(
        2,
        ConvertedType::NONE,
        None,
        |vals| Arc::new(Int32Array::from_iter(vals.iter().copied())),
        &integer_encodings,
    );

    run_single_column_reader_tests::<Int64Type, _, Int64Type>(
        2,
        ConvertedType::NONE,
        None,
        |vals| Arc::new(Int64Array::from_iter(vals.iter().copied())),
        &integer_encodings,
    );

    run_single_column_reader_tests::<FloatType, _, FloatType>(
        2,
        ConvertedType::NONE,
        None,
        |vals| Arc::new(Float32Array::from_iter(vals.iter().copied())),
        &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
    );
}
1879
/// INT32/INT64 columns annotated UINT_32/UINT_64 are read as arrow
/// UInt32/UInt64. The expected arrays reinterpret the signed physical values
/// with `as` casts.
#[test]
fn test_unsigned_primitive_single_column_reader_test() {
    let encodings = [
        Encoding::PLAIN,
        Encoding::RLE_DICTIONARY,
        Encoding::DELTA_BINARY_PACKED,
    ];

    run_single_column_reader_tests::<Int32Type, _, Int32Type>(
        2,
        ConvertedType::UINT_32,
        Some(ArrowDataType::UInt32),
        |vals| Arc::new(UInt32Array::from_iter(vals.iter().map(|v| v.map(|n| n as u32)))),
        &encodings,
    );

    run_single_column_reader_tests::<Int64Type, _, Int64Type>(
        2,
        ConvertedType::UINT_64,
        Some(ArrowDataType::UInt64),
        |vals| Arc::new(UInt64Array::from_iter(vals.iter().map(|v| v.map(|n| n as u64)))),
        &encodings,
    );
}
1913
/// UInt32/UInt64 values, including those above the signed maximum, survive a
/// parquet write/read round trip. The columns also come back as the unsigned
/// arrow array types.
#[test]
fn test_unsigned_roundtrip() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("uint32", ArrowDataType::UInt32, true),
        Field::new("uint64", ArrowDataType::UInt64, true),
    ]));

    let uint32_values: ArrayRef =
        Arc::new(UInt32Array::from_iter_values([0, i32::MAX as u32, u32::MAX]));
    let uint64_values: ArrayRef =
        Arc::new(UInt64Array::from_iter_values([0, i64::MAX as u64, u64::MAX]));
    let original =
        RecordBatch::try_new(Arc::clone(&schema), vec![uint32_values, uint64_values]).unwrap();

    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap();
    writer.write(&original).unwrap();
    writer.close().unwrap();

    let roundtripped = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)
        .unwrap()
        .next()
        .unwrap()
        .unwrap();
    assert_eq!(roundtripped, original);

    // Both columns must downcast to the unsigned array types.
    assert!(roundtripped
        .column(0)
        .as_any()
        .downcast_ref::<UInt32Array>()
        .is_some());
    assert!(roundtripped
        .column(1)
        .as_any()
        .downcast_ref::<UInt64Array>()
        .is_some());
}
1959
/// Float16 values round-trip through parquet for a non-nullable and a
/// nullable column. The values include NaN, both infinities, signed zeros
/// and the extremes.
#[test]
fn test_float16_roundtrip() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("float16", ArrowDataType::Float16, false),
        Field::new("float16-nullable", ArrowDataType::Float16, true),
    ]));

    let dense: ArrayRef = Arc::new(Float16Array::from_iter_values([
        f16::EPSILON,
        f16::MIN,
        f16::MAX,
        f16::NAN,
        f16::INFINITY,
        f16::NEG_INFINITY,
        f16::ONE,
        f16::NEG_ONE,
        f16::ZERO,
        f16::NEG_ZERO,
        f16::E,
        f16::PI,
        f16::FRAC_1_PI,
    ]));
    let sparse: ArrayRef = Arc::new(Float16Array::from(vec![
        None,
        None,
        None,
        Some(f16::NAN),
        Some(f16::INFINITY),
        Some(f16::NEG_INFINITY),
        None,
        None,
        None,
        None,
        None,
        None,
        Some(f16::FRAC_1_PI),
    ]));
    let original = RecordBatch::try_new(Arc::clone(&schema), vec![dense, sparse])?;

    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, schema, None)?;
    writer.write(&original)?;
    writer.close()?;

    let ret = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?
        .next()
        .unwrap()?;
    assert_eq!(ret, original);

    // Both columns must still be Float16 arrays.
    ret.column(0).as_primitive::<Float16Type>();
    ret.column(1).as_primitive::<Float16Type>();

    Ok(())
}
2019
/// Time32(ms) and Time64(us) columns tagged with an `adjusted_to_utc`
/// metadata entry round-trip unchanged. This holds even for values outside a
/// single day (negative, or at or past 24h) and for nulls.
#[test]
fn test_time_utc_roundtrip() -> Result<()> {
    // Both fields share the same metadata: `adjusted_to_utc` with an empty value.
    let utc_metadata: HashMap<String, String> =
        HashMap::from_iter(vec![("adjusted_to_utc".to_string(), "".to_string())]);

    let schema = Arc::new(Schema::new(vec![
        Field::new(
            "time_millis",
            ArrowDataType::Time32(TimeUnit::Millisecond),
            true,
        )
        .with_metadata(utc_metadata.clone()),
        Field::new(
            "time_micros",
            ArrowDataType::Time64(TimeUnit::Microsecond),
            true,
        )
        .with_metadata(utc_metadata),
    ]));

    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, Arc::clone(&schema), None)?;

    let original = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Time32MillisecondArray::from(vec![
                Some(-1),
                Some(0),
                Some(86_399_000),
                Some(86_400_000),
                Some(86_401_000),
                None,
            ])),
            Arc::new(Time64MicrosecondArray::from(vec![
                Some(-1),
                Some(0),
                Some(86_399 * 1_000_000),
                Some(86_400 * 1_000_000),
                Some(86_401 * 1_000_000),
                None,
            ])),
        ],
    )?;

    writer.write(&original)?;
    writer.close()?;

    let ret = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?
        .next()
        .unwrap()?;
    assert_eq!(ret, original);

    // The columns must keep their time array types.
    ret.column(0).as_primitive::<Time32MillisecondType>();
    ret.column(1).as_primitive::<Time64MicrosecondType>();

    Ok(())
}
2081
/// Date32 day counts up to one million days either side of the epoch
/// round-trip through parquet unchanged.
#[test]
fn test_date32_roundtrip() -> Result<()> {
    use arrow_array::Date32Array;

    let original = RecordBatch::try_new(
        Arc::new(Schema::new(vec![Field::new(
            "date32",
            ArrowDataType::Date32,
            false,
        )])),
        vec![Arc::new(Date32Array::from(vec![
            -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
        ]))],
    )?;

    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, original.schema(), None)?;
    writer.write(&original)?;
    writer.close()?;

    let ret = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?
        .next()
        .unwrap()?;
    assert_eq!(ret, original);

    // The column must still be a Date32 array.
    ret.column(0).as_primitive::<Date32Type>();

    Ok(())
}
2115
/// Date64 round trip through a default writer and a `coerce_types` writer.
///
/// The default writer must reproduce every column exactly. With
/// `set_coerce_types(true)`, only `small-date64` is expected to survive
/// unchanged. That column holds whole days within the `i32` day range.
/// `big-date64` holds whole-day counts up to 10^10 in magnitude, which exceed
/// `i32`. `invalid-date64` holds whole days plus 1 ms.
/// NOTE(review): this presumes coercion stores Date64 as Date32 day counts,
/// which is what the `assert_ne!` checks imply.
#[test]
fn test_date64_roundtrip() -> Result<()> {
    use arrow_array::Date64Array;

    // A plain compile-time value: `const` rather than `static`, since nothing
    // needs the fixed memory location a `static` provides.
    const NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;

    let schema = Arc::new(Schema::new(vec![
        Field::new("small-date64", ArrowDataType::Date64, false),
        Field::new("big-date64", ArrowDataType::Date64, false),
        Field::new("invalid-date64", ArrowDataType::Date64, false),
    ]));

    let mut default_buf = Vec::with_capacity(1024);
    let mut coerce_buf = Vec::with_capacity(1024);

    let coerce_props = WriterProperties::builder().set_coerce_types(true).build();

    let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
    let mut coerce_writer =
        ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;

    let original = RecordBatch::try_new(
        schema,
        vec![
            // small-date64: whole days, all within the i32 day range.
            Arc::new(Date64Array::from(vec![
                -1_000_000 * NUM_MILLISECONDS_IN_DAY,
                -1_000 * NUM_MILLISECONDS_IN_DAY,
                0,
                1_000 * NUM_MILLISECONDS_IN_DAY,
                1_000_000 * NUM_MILLISECONDS_IN_DAY,
            ])),
            // big-date64: whole days, but +/-10^10 days does not fit in an i32.
            Arc::new(Date64Array::from(vec![
                -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                0,
                1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
            ])),
            // invalid-date64: each value is 1 ms past a day boundary.
            Arc::new(Date64Array::from(vec![
                -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                1,
                1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
            ])),
        ],
    )?;

    default_writer.write(&original)?;
    coerce_writer.write(&original)?;

    default_writer.close()?;
    coerce_writer.close()?;

    let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
    let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;

    let default_ret = default_reader.next().unwrap()?;
    let coerce_ret = coerce_reader.next().unwrap()?;

    // Without coercion everything round-trips exactly.
    assert_eq!(default_ret, original);

    // With coercion only the in-range, whole-day column is preserved.
    assert_eq!(coerce_ret.column(0), original.column(0));
    assert_ne!(coerce_ret.column(1), original.column(1));
    assert_ne!(coerce_ret.column(2), original.column(2));

    // Both readers must still hand back Date64 arrays.
    default_ret.column(0).as_primitive::<Date64Type>();
    coerce_ret.column(0).as_primitive::<Date64Type>();

    Ok(())
}
2193 struct RandFixedLenGen {}
2194
2195 impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
2196 fn r#gen(len: i32) -> FixedLenByteArray {
2197 let mut v = vec![0u8; len as usize];
2198 rng().fill_bytes(&mut v);
2199 ByteArray::from(v).into()
2200 }
2201 }
2202
/// FIXED_LEN_BYTE_ARRAY(20) columns of random bytes are checked against a
/// `FixedSizeBinary` array of width 20, for plain and dictionary encodings.
#[test]
fn test_fixed_length_binary_column_reader() {
    run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
        20,
        ConvertedType::NONE,
        None,
        |vals| {
            let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
            for slot in vals {
                if let Some(bytes) = slot {
                    builder.append_value(bytes).unwrap();
                } else {
                    builder.append_null();
                }
            }
            Arc::new(builder.finish())
        },
        &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
    );
}
2222
/// 12-byte INTERVAL values are read as `IntervalDayTime`.
///
/// The expected array takes `days` from bytes 4..8 and `milliseconds` from
/// bytes 8..12, both little-endian. Bytes 0..4 (the months slot of the
/// Parquet INTERVAL layout) are not part of `IntervalDayTime` and are ignored.
#[test]
fn test_interval_day_time_column_reader() {
    run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
        12,
        ConvertedType::INTERVAL,
        None,
        |vals| {
            // Little-endian i32 stored at `start..start + 4`.
            let le_i32 = |bytes: &[u8], start: usize| {
                i32::from_le_bytes(bytes[start..start + 4].try_into().unwrap())
            };
            let intervals = vals
                .iter()
                .map(|slot| {
                    slot.as_ref().map(|b| {
                        let bytes: &[u8] = b.as_ref();
                        IntervalDayTime {
                            days: le_i32(bytes, 4),
                            milliseconds: le_i32(bytes, 8),
                        }
                    })
                })
                .collect::<IntervalDayTimeArray>();
            Arc::new(intervals)
        },
        &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
    );
}
2246
/// INT96 columns are read as timestamps in every supported time unit,
/// with or without a timezone hint.
#[test]
fn test_int96_single_column_reader_test() {
    let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];

    // Optional arrow type hint, paired with the function that builds the
    // expected array from the generated INT96 values.
    type TypeHintAndConversionFunction =
        (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);

    let resolutions: Vec<TypeHintAndConversionFunction> = vec![
        // No hint: INT96 is read as nanosecond timestamps.
        (None, |vals: &[Option<Int96>]| {
            Arc::new(TimestampNanosecondArray::from_iter(
                vals.iter().map(|v| v.map(|t| t.to_nanos())),
            )) as ArrayRef
        }),
        // Explicit hints for each time unit, without a timezone.
        (
            Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
            |vals: &[Option<Int96>]| {
                Arc::new(TimestampSecondArray::from_iter(
                    vals.iter().map(|v| v.map(|t| t.to_seconds())),
                )) as ArrayRef
            },
        ),
        (
            Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
            |vals: &[Option<Int96>]| {
                Arc::new(TimestampMillisecondArray::from_iter(
                    vals.iter().map(|v| v.map(|t| t.to_millis())),
                )) as ArrayRef
            },
        ),
        (
            Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
            |vals: &[Option<Int96>]| {
                Arc::new(TimestampMicrosecondArray::from_iter(
                    vals.iter().map(|v| v.map(|t| t.to_micros())),
                )) as ArrayRef
            },
        ),
        (
            Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
            |vals: &[Option<Int96>]| {
                Arc::new(TimestampNanosecondArray::from_iter(
                    vals.iter().map(|v| v.map(|t| t.to_nanos())),
                )) as ArrayRef
            },
        ),
        // A timezone in the hint must be carried onto the resulting array.
        (
            Some(ArrowDataType::Timestamp(
                TimeUnit::Second,
                Some(Arc::from("-05:00")),
            )),
            |vals: &[Option<Int96>]| {
                Arc::new(
                    TimestampSecondArray::from_iter(
                        vals.iter().map(|v| v.map(|t| t.to_seconds())),
                    )
                    .with_timezone("-05:00"),
                ) as ArrayRef
            },
        ),
    ];

    for (arrow_type, converter) in &resolutions {
        run_single_column_reader_tests::<Int96Type, _, Int96Type>(
            2,
            ConvertedType::NONE,
            arrow_type.clone(),
            converter,
            encodings,
        );
    }
}
2321
/// Reading Spark's INT96 file with a supplied microsecond-timestamp schema.
/// Every value, including instants far outside the range an i64 nanosecond
/// timestamp can hold, is reproduced exactly as i64 microseconds.
#[test]
fn test_int96_from_spark_file_with_provided_schema() {
    use arrow_schema::DataType::Timestamp;

    let path = format!(
        "{}/int96_from_spark.parquet",
        arrow::util::test_util::parquet_test_data()
    );
    let file = File::open(path).unwrap();

    let supplied_schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        Timestamp(TimeUnit::Microsecond, None),
        true,
    )]));
    let options = ArrowReaderOptions::new().with_schema(supplied_schema);

    let batch = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
        .unwrap()
        .build()
        .unwrap()
        .next()
        .unwrap()
        .unwrap();
    assert_eq!(batch.num_columns(), 1);
    assert_eq!(
        batch.column(0).data_type(),
        &Timestamp(TimeUnit::Microsecond, None)
    );

    // Raw microseconds since the epoch.
    let expected = Int64Array::from(vec![
        Some(1704141296123456),
        Some(1704070800000000),
        Some(253402225200000000),
        Some(1735599600000000),
        None,
        Some(9089380393200000000),
    ]);

    let as_int64 = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
    let actual = as_int64.as_primitive::<types::Int64Type>();

    assert_eq!(actual.len(), expected.len());
    for (got, want) in actual.iter().zip(expected.iter()) {
        assert_eq!(got, want);
    }
}
2375
/// Without a supplied schema, Spark's INT96 column is read as nanosecond
/// timestamps. Instants whose nanosecond count does not fit in an i64 come
/// back as the two's-complement wrapped value.
#[test]
fn test_int96_from_spark_file_without_provided_schema() {
    use arrow_schema::DataType::Timestamp;

    let path = format!(
        "{}/int96_from_spark.parquet",
        arrow::util::test_util::parquet_test_data()
    );
    let file = File::open(path).unwrap();

    let batch = ParquetRecordBatchReaderBuilder::try_new(file)
        .unwrap()
        .build()
        .unwrap()
        .next()
        .unwrap()
        .unwrap();
    assert_eq!(batch.num_columns(), 1);
    assert_eq!(
        batch.column(0).data_type(),
        &Timestamp(TimeUnit::Nanosecond, None)
    );

    // The file's microsecond values multiplied by 1000. Where the product
    // overflows i64 the expected value is the wrapped result, e.g.
    // 253402225200000000 * 1000 wraps to -4852191831933722624.
    let expected = Int64Array::from(vec![
        Some(1704141296123456000),
        Some(1704070800000000000),
        Some(-4852191831933722624),
        Some(1735599600000000000),
        None,
        Some(-4864435138808946688),
    ]);

    let as_int64 = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
    let actual = as_int64.as_primitive::<types::Int64Type>();

    assert_eq!(actual.len(), expected.len());
    for (got, want) in actual.iter().zip(expected.iter()) {
        assert_eq!(got, want);
    }
}
2421
2422 struct RandUtf8Gen {}
2423
2424 impl RandGen<ByteArrayType> for RandUtf8Gen {
2425 fn r#gen(len: i32) -> ByteArray {
2426 Int32Type::r#gen(len).to_string().as_str().into()
2427 }
2428 }
2429
/// Byte-array columns filled by `RandUtf8Gen` are read in several ways, for
/// every byte-array encoding:
/// - as binary (no annotation);
/// - as Utf8 and LargeUtf8;
/// - as string dictionaries with every integer key type.
#[test]
fn test_utf8_single_column_reader_test() {
    /// Builds the expected string array (offset width `O`) by decoding each
    /// value as UTF-8.
    fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
        let decoded = vals
            .iter()
            .map(|v| v.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap()));
        Arc::new(GenericStringArray::<O>::from_iter(decoded))
    }

    /// Dictionary type with the given key type and value type.
    fn dictionary_type(key: &ArrowDataType, value: ArrowDataType) -> ArrowDataType {
        ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(value))
    }

    let encodings = &[
        Encoding::PLAIN,
        Encoding::RLE_DICTIONARY,
        Encoding::DELTA_LENGTH_BYTE_ARRAY,
        Encoding::DELTA_BYTE_ARRAY,
    ];

    // No UTF8 annotation: values come back as plain binary.
    run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
        2,
        ConvertedType::NONE,
        None,
        |vals| {
            Arc::new(BinaryArray::from_iter(
                vals.iter().map(|v| v.as_ref().map(|b| b.data())),
            ))
        },
        encodings,
    );

    // UTF8 annotation with no hint, an explicit Utf8 hint and a LargeUtf8 hint.
    run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
        2,
        ConvertedType::UTF8,
        None,
        string_converter::<i32>,
        encodings,
    );
    run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
        2,
        ConvertedType::UTF8,
        Some(ArrowDataType::Utf8),
        string_converter::<i32>,
        encodings,
    );
    run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
        2,
        ConvertedType::UTF8,
        Some(ArrowDataType::LargeUtf8),
        string_converter::<i64>,
        encodings,
    );

    // 8-bit dictionary keys are run once per encoding with hand-built options.
    // NOTE(review): presumably this keeps the number of distinct values small
    // enough for 8-bit keys; confirm against `TestOptions::new`.
    let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
    for key in &small_key_types {
        for encoding in encodings {
            let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
            opts.encoding = *encoding;

            let data_type = dictionary_type(key, ArrowDataType::Utf8);
            single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
                opts,
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let strings = string_converter::<i32>(vals);
                    arrow::compute::cast(&strings, &data_type).unwrap()
                },
            );
        }
    }

    // Wider keys: Utf8 and LargeUtf8 dictionaries over all encodings.
    let key_types = [
        ArrowDataType::Int16,
        ArrowDataType::UInt16,
        ArrowDataType::Int32,
        ArrowDataType::UInt32,
        ArrowDataType::Int64,
        ArrowDataType::UInt64,
    ];
    for key in &key_types {
        let utf8_dict = dictionary_type(key, ArrowDataType::Utf8);
        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(utf8_dict.clone()),
            move |vals| {
                let strings = string_converter::<i32>(vals);
                arrow::compute::cast(&strings, &utf8_dict).unwrap()
            },
            encodings,
        );

        let large_utf8_dict = dictionary_type(key, ArrowDataType::LargeUtf8);
        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(large_utf8_dict.clone()),
            move |vals| {
                let strings = string_converter::<i64>(vals);
                arrow::compute::cast(&strings, &large_utf8_dict).unwrap()
            },
            encodings,
        );
    }
}
2545
/// A nullable struct with a non-nullable Decimal256 child round-trips.
/// Reading with batch size 3 yields the expected consecutive slices.
#[test]
fn test_decimal_nullable_struct() {
    let decimals = Decimal256Array::from_iter_values(
        [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
    );

    let struct_type = ArrowDataType::Struct(Fields::from(vec![Field::new(
        "decimals",
        decimals.data_type().clone(),
        false,
    )]));
    // Validity 0b11101111 (least significant bit = row 0): only row 4 is null.
    let data = ArrayDataBuilder::new(struct_type)
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();
    let struct_array: ArrayRef = Arc::new(StructArray::from(data));
    let written = RecordBatch::try_from_iter([("struct", struct_array)]).unwrap();

    let mut buffer = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
    writer.write(&written).unwrap();
    writer.close().unwrap();

    let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();

    // 8 rows at batch size 3: [0, 3), [3, 6), [6, 8).
    for (i, (offset, len)) in [(0, 3), (3, 3), (6, 2)].into_iter().enumerate() {
        assert_eq!(&written.slice(offset, len), &read[i]);
    }
}
2581
/// A nullable struct with a non-nullable Int32 child round-trips. Reading
/// with batch size 3 yields the expected consecutive slices.
#[test]
fn test_int32_nullable_struct() {
    let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);

    let struct_type = ArrowDataType::Struct(Fields::from(vec![Field::new(
        "int32",
        int32.data_type().clone(),
        false,
    )]));
    // Validity 0b11101111 (least significant bit = row 0): only row 4 is null.
    let data = ArrayDataBuilder::new(struct_type)
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![int32.into_data()])
        .build()
        .unwrap();
    let struct_array: ArrayRef = Arc::new(StructArray::from(data));
    let written = RecordBatch::try_from_iter([("struct", struct_array)]).unwrap();

    let mut buffer = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
    writer.write(&written).unwrap();
    writer.close().unwrap();

    let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();

    // 8 rows at batch size 3: [0, 3), [3, 6), [6, 8).
    for (i, (offset, len)) in [(0, 3), (3, 3), (6, 2)].into_iter().enumerate() {
        assert_eq!(&written.slice(offset, len), &read[i]);
    }
}
2614
/// A nullable list of non-nullable Decimal128 values round-trips. Reading
/// with batch size 3 yields the expected consecutive slices.
#[test]
fn test_decimal_list() {
    let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);

    let list_type = ArrowDataType::List(Arc::new(Field::new_list_field(
        decimals.data_type().clone(),
        false,
    )));
    // Offsets give list lengths 0, 1, 2, 0, 1, 1, 3.
    // Validity 0b01010111 (least significant bit = list 0) marks lists 3 and 5 as null.
    let data = ArrayDataBuilder::new(list_type)
        .len(7)
        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();
    let list_array: ArrayRef = Arc::new(ListArray::from(data));
    let written = RecordBatch::try_from_iter([("list", list_array)]).unwrap();

    let mut buffer = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
    writer.write(&written).unwrap();
    writer.close().unwrap();

    let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();

    // 7 lists at batch size 3: [0, 3), [3, 6), [6, 7).
    for (i, (offset, len)) in [(0, 3), (3, 3), (6, 1)].into_iter().enumerate() {
        assert_eq!(&written.slice(offset, len), &read[i]);
    }
}
2649
/// Reads reference decimal files stored with different physical types.
///
/// Each file holds 24 values, 1.00 through 24.00 at scale 2. Every one must
/// be read as `Decimal128` with the precision expected for its physical type.
#[test]
fn test_read_decimal_file() {
    use arrow_array::Decimal128Array;

    let testdata = arrow::util::test_util::parquet_test_data();
    let cases = [
        ("byte_array", 4),
        ("fixed_length", 25),
        ("int32", 4),
        ("int64", 10),
    ];
    for (prefix, target_precision) in cases {
        let file = File::open(format!("{testdata}/{prefix}_decimal.parquet")).unwrap();
        let batch = ParquetRecordBatchReader::try_new(file, 32)
            .unwrap()
            .next()
            .unwrap()
            .unwrap();
        assert_eq!(batch.num_rows(), 24);

        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Decimal128Array>()
            .unwrap();
        assert_eq!(col.precision(), target_precision);
        assert_eq!(col.scale(), 2);

        // Unscaled values are 100, 200, ..., 2400.
        for (i, whole) in (1..25).enumerate() {
            assert_eq!(col.value(i), whole * 100_i128);
        }
    }
}
2683
/// Reads the reference Float16 file containing a null, NaN, signed zeros
/// and small non-zero values, and checks each of the 8 rows.
#[test]
fn test_read_float16_nonzeros_file() {
    use arrow_array::Float16Array;

    let path = format!(
        "{}/float16_nonzeros_and_nans.parquet",
        arrow::util::test_util::parquet_test_data()
    );
    let batch = ParquetRecordBatchReader::try_new(File::open(path).unwrap(), 32)
        .unwrap()
        .next()
        .unwrap()
        .unwrap();
    assert_eq!(batch.num_rows(), 8);

    let col = batch
        .column(0)
        .as_any()
        .downcast_ref::<Float16Array>()
        .unwrap();
    let two = f16::ONE + f16::ONE;

    assert_eq!(col.null_count(), 1);
    assert!(col.is_null(0));
    assert_eq!(col.value(1), f16::ONE);
    assert_eq!(col.value(2), -two);
    assert!(col.value(3).is_nan());
    // Zeros compare equal regardless of sign, so check the sign explicitly.
    assert_eq!(col.value(4), f16::ZERO);
    assert!(col.value(4).is_sign_positive());
    assert_eq!(col.value(5), f16::NEG_ONE);
    assert_eq!(col.value(6), f16::NEG_ZERO);
    assert!(col.value(6).is_sign_negative());
    assert_eq!(col.value(7), two);
}
2715
/// Reads the reference Float16 file containing a null, a positive zero and
/// a NaN.
#[test]
fn test_read_float16_zeros_file() {
    use arrow_array::Float16Array;

    let path = format!(
        "{}/float16_zeros_and_nans.parquet",
        arrow::util::test_util::parquet_test_data()
    );
    let batch = ParquetRecordBatchReader::try_new(File::open(path).unwrap(), 32)
        .unwrap()
        .next()
        .unwrap()
        .unwrap();
    assert_eq!(batch.num_rows(), 3);

    let col = batch
        .column(0)
        .as_any()
        .downcast_ref::<Float16Array>()
        .unwrap();

    assert_eq!(col.null_count(), 1);
    assert!(col.is_null(0));
    // Zeros compare equal regardless of sign, so check the sign explicitly.
    assert_eq!(col.value(1), f16::ZERO);
    assert!(col.value(1).is_sign_positive());
    assert!(col.value(2).is_nan());
}
2739
/// Reads the BYTE_STREAM_SPLIT reference file (f32 and f64 columns, 300 rows).
/// Every decoded value must lie strictly between -10 and 10.
#[test]
fn test_read_float32_float64_byte_stream_split() {
    let path = format!(
        "{}/byte_stream_split.zstd.parquet",
        arrow::util::test_util::parquet_test_data(),
    );
    let file = File::open(path).unwrap();

    let mut rows_seen = 0;
    for batch in ParquetRecordBatchReader::try_new(file, 128).unwrap() {
        let batch = batch.unwrap();
        rows_seen += batch.num_rows();

        let f32_values = batch.column(0).as_primitive::<Float32Type>().values();
        assert!(f32_values.iter().all(|&x| x > -10.0 && x < 10.0));

        let f64_values = batch.column(1).as_primitive::<Float64Type>().values();
        assert!(f64_values.iter().all(|&x| x > -10.0 && x < 10.0));
    }
    assert_eq!(rows_seen, 300);
}
2768
/// Reads the extended BYTE_STREAM_SPLIT reference file (200 rows).
///
/// Columns come in pairs: column 2k holds the same values as column 2k + 1.
/// The second of each pair is presumably the BYTE_STREAM_SPLIT-encoded copy,
/// per the original `_bss` naming. Each pair must match value for value.
#[test]
fn test_read_extended_byte_stream_split() {
    let path = format!(
        "{}/byte_stream_split_extended.gzip.parquet",
        arrow::util::test_util::parquet_test_data(),
    );
    let file = File::open(path).unwrap();
    let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

    // Asserts two columns have equal length and identical, non-null values.
    macro_rules! assert_same_values {
        ($plain:expr, $bss:expr) => {{
            let plain = $plain;
            let bss = $bss;
            assert_eq!(plain.len(), bss.len());
            for (l, r) in plain.iter().zip(bss.iter()) {
                assert_eq!(l.unwrap(), r.unwrap());
            }
        }};
    }

    let mut row_count = 0;
    for batch in record_reader {
        let batch = batch.unwrap();
        row_count += batch.num_rows();

        assert_same_values!(
            batch.column(0).as_primitive::<Float16Type>(),
            batch.column(1).as_primitive::<Float16Type>()
        );
        assert_same_values!(
            batch.column(2).as_primitive::<Float32Type>(),
            batch.column(3).as_primitive::<Float32Type>()
        );
        assert_same_values!(
            batch.column(4).as_primitive::<Float64Type>(),
            batch.column(5).as_primitive::<Float64Type>()
        );
        assert_same_values!(
            batch.column(6).as_primitive::<types::Int32Type>(),
            batch.column(7).as_primitive::<types::Int32Type>()
        );
        assert_same_values!(
            batch.column(8).as_primitive::<types::Int64Type>(),
            batch.column(9).as_primitive::<types::Int64Type>()
        );
        assert_same_values!(
            batch.column(10).as_fixed_size_binary(),
            batch.column(11).as_fixed_size_binary()
        );
        assert_same_values!(
            batch.column(12).as_primitive::<Decimal128Type>(),
            batch.column(13).as_primitive::<Decimal128Type>()
        );
    }
    assert_eq!(row_count, 200);
}
2848
/// Reads `incorrect_map_schema.parquet` and checks the result.
///
/// Judging by the file name, its MAP schema deviates from the spec. The
/// reader must still expose the canonical arrow `Map` layout (`key_value`
/// entries struct with `key`/`value` children) and the expected single row.
#[test]
fn test_read_incorrect_map_schema_file() {
    let path = format!(
        "{}/incorrect_map_schema.parquet",
        arrow::util::test_util::parquet_test_data()
    );
    let batch = ParquetRecordBatchReader::try_new(File::open(path).unwrap(), 32)
        .unwrap()
        .next()
        .unwrap()
        .unwrap();
    assert_eq!(batch.num_rows(), 1);

    let entries = Field::new(
        "key_value",
        ArrowDataType::Struct(Fields::from(vec![
            Field::new("key", ArrowDataType::Utf8, false),
            Field::new("value", ArrowDataType::Utf8, true),
        ])),
        false,
    );
    let expected_schema = Schema::new(vec![Field::new(
        "my_map",
        ArrowDataType::Map(Arc::new(entries), false),
        true,
    )]);
    assert_eq!(batch.schema().as_ref(), &expected_schema);

    assert_eq!(batch.column(0).null_count(), 0);
    let map = batch.column(0).as_map();
    assert_eq!(
        map.keys().as_ref(),
        &StringArray::from(vec!["parent", "name"])
    );
    assert_eq!(
        map.values().as_ref(),
        &StringArray::from(vec!["another", "report"])
    );
}
2888
#[test]
fn test_read_dict_fixed_size_binary() {
    // Round-trips a UInt8-keyed dictionary of FixedSizeBinary(8) values through
    // ArrowWriter and ParquetRecordBatchReader and expects the identical batch back.
    let dict_type = ArrowDataType::Dictionary(
        Box::new(ArrowDataType::UInt8),
        Box::new(ArrowDataType::FixedSizeBinary(8)),
    );
    let schema = Arc::new(Schema::new(vec![Field::new("a", dict_type, true)]));

    let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
    let dictionary_values = FixedSizeBinaryArray::try_from_iter(
        vec![
            (0u8..8u8).collect::<Vec<u8>>(),
            (24u8..32u8).collect::<Vec<u8>>(),
        ]
        .into_iter(),
    )
    .unwrap();
    let dict_array = UInt8DictionaryArray::new(keys, Arc::new(dictionary_values));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(dict_array)]).unwrap();

    let mut encoded = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut encoded, batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let round_tripped: Vec<RecordBatch> =
        ParquetRecordBatchReader::try_new(Bytes::from(encoded), 3)
            .unwrap()
            .collect::<Result<_, _>>()
            .unwrap();

    assert_eq!(round_tripped.len(), 1);
    assert_eq!(&batch, &round_tripped[0])
}
2923
#[test]
fn test_read_nullable_structs_with_binary_dict_as_first_child_column() {
    // A nullable struct whose first child is a dictionary-encoded Utf8 column and
    // whose fourth slot is a null struct; the batch must survive a write/read round trip.
    let city_field = Field::new(
        "city",
        ArrowDataType::Dictionary(
            Box::new(ArrowDataType::UInt8),
            Box::new(ArrowDataType::Utf8),
        ),
        true,
    );
    let name_field = Field::new("name", ArrowDataType::Utf8, true);
    let struct_fields = Fields::from(vec![city_field, name_field]);
    let schema = Arc::new(Schema::new(vec![Field::new(
        "items",
        ArrowDataType::Struct(struct_fields.clone()),
        true,
    )]));

    let cities = DictionaryArray::new(
        UInt8Array::from_iter_values(vec![0, 1, 1, 0, 2]),
        Arc::new(StringArray::from_iter_values(vec![
            "quebec",
            "fredericton",
            "halifax",
        ])),
    );
    let names = StringArray::from_iter_values(vec!["albert", "terry", "lance", "", "tim"]);
    let validity = NullBuffer::from_iter(vec![true, true, true, false, true]);
    let items = StructArray::new(
        struct_fields,
        vec![Arc::new(cities) as ArrayRef, Arc::new(names) as ArrayRef],
        Some(validity),
    );
    let batch = RecordBatch::try_new(schema, vec![Arc::new(items)]).unwrap();

    let mut encoded = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut encoded, batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let round_tripped: Vec<RecordBatch> =
        ParquetRecordBatchReader::try_new(Bytes::from(encoded), 8)
            .unwrap()
            .collect::<Result<_, _>>()
            .unwrap();

    assert_eq!(round_tripped.len(), 1);
    assert_eq!(&batch, &round_tripped[0])
}
2980
/// Configuration for a single-column write/read round trip driven by
/// `single_column_reader_test`.
#[derive(Clone)]
struct TestOptions {
    /// Number of row groups written.
    num_row_groups: usize,
    /// Number of rows in each row group.
    num_rows: usize,
    /// Batch size requested from the reader.
    record_batch_size: usize,
    /// `Some(p)`: the column is OPTIONAL and each row is null with roughly `p`%
    /// probability. `None`: the column is REQUIRED and contains no nulls.
    null_percent: Option<usize>,
    /// Writer batch size (`WriterProperties::set_write_batch_size`).
    write_batch_size: usize,
    /// Data page size limit (`WriterProperties::set_data_page_size_limit`).
    max_data_page_size: usize,
    /// Dictionary page size limit; only applied when `encoding` is a dictionary encoding.
    max_dict_page_size: usize,
    /// Parquet writer version used for the file.
    writer_version: WriterVersion,
    /// Statistics level written; `Page` additionally makes the reader load the page index.
    enabled_statistics: EnabledStatistics,
    /// Column encoding. `RLE_DICTIONARY`/`PLAIN_DICTIONARY` enable dictionary encoding;
    /// any other value disables it and is set as the column encoding.
    encoding: Encoding,
    /// Row selection to apply, paired with the number of rows it selects.
    row_selections: Option<(RowSelection, usize)>,
    /// Keep/drop mask used by a row-filter predicate, one entry per row that
    /// remains after `row_selections` is applied.
    row_filter: Option<Vec<bool>>,
    /// Maximum number of rows to return (applied after selection, filter and offset).
    limit: Option<usize>,
    /// Rows to skip after selection and filter, before any are returned.
    offset: Option<usize>,
}
3017
3018 impl std::fmt::Debug for TestOptions {
3020 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
3021 f.debug_struct("TestOptions")
3022 .field("num_row_groups", &self.num_row_groups)
3023 .field("num_rows", &self.num_rows)
3024 .field("record_batch_size", &self.record_batch_size)
3025 .field("null_percent", &self.null_percent)
3026 .field("write_batch_size", &self.write_batch_size)
3027 .field("max_data_page_size", &self.max_data_page_size)
3028 .field("max_dict_page_size", &self.max_dict_page_size)
3029 .field("writer_version", &self.writer_version)
3030 .field("enabled_statistics", &self.enabled_statistics)
3031 .field("encoding", &self.encoding)
3032 .field("row_selections", &self.row_selections.is_some())
3033 .field("row_filter", &self.row_filter.is_some())
3034 .field("limit", &self.limit)
3035 .field("offset", &self.offset)
3036 .finish()
3037 }
3038 }
3039
3040 impl Default for TestOptions {
3041 fn default() -> Self {
3042 Self {
3043 num_row_groups: 2,
3044 num_rows: 100,
3045 record_batch_size: 15,
3046 null_percent: None,
3047 write_batch_size: 64,
3048 max_data_page_size: 1024 * 1024,
3049 max_dict_page_size: 1024 * 1024,
3050 writer_version: WriterVersion::PARQUET_1_0,
3051 enabled_statistics: EnabledStatistics::Page,
3052 encoding: Encoding::PLAIN,
3053 row_selections: None,
3054 row_filter: None,
3055 limit: None,
3056 offset: None,
3057 }
3058 }
3059 }
3060
3061 impl TestOptions {
3062 fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
3063 Self {
3064 num_row_groups,
3065 num_rows,
3066 record_batch_size,
3067 ..Default::default()
3068 }
3069 }
3070
3071 fn with_null_percent(self, null_percent: usize) -> Self {
3072 Self {
3073 null_percent: Some(null_percent),
3074 ..self
3075 }
3076 }
3077
3078 fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
3079 Self {
3080 max_data_page_size,
3081 ..self
3082 }
3083 }
3084
3085 fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
3086 Self {
3087 max_dict_page_size,
3088 ..self
3089 }
3090 }
3091
3092 fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
3093 Self {
3094 enabled_statistics,
3095 ..self
3096 }
3097 }
3098
3099 fn with_row_selections(self) -> Self {
3100 assert!(self.row_filter.is_none(), "Must set row selection first");
3101
3102 let mut rng = rng();
3103 let step = rng.random_range(self.record_batch_size..self.num_rows);
3104 let row_selections = create_test_selection(
3105 step,
3106 self.num_row_groups * self.num_rows,
3107 rng.random::<bool>(),
3108 );
3109 Self {
3110 row_selections: Some(row_selections),
3111 ..self
3112 }
3113 }
3114
3115 fn with_row_filter(self) -> Self {
3116 let row_count = match &self.row_selections {
3117 Some((_, count)) => *count,
3118 None => self.num_row_groups * self.num_rows,
3119 };
3120
3121 let mut rng = rng();
3122 Self {
3123 row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
3124 ..self
3125 }
3126 }
3127
3128 fn with_limit(self, limit: usize) -> Self {
3129 Self {
3130 limit: Some(limit),
3131 ..self
3132 }
3133 }
3134
3135 fn with_offset(self, offset: usize) -> Self {
3136 Self {
3137 offset: Some(offset),
3138 ..self
3139 }
3140 }
3141
3142 fn writer_props(&self) -> WriterProperties {
3143 let builder = WriterProperties::builder()
3144 .set_data_page_size_limit(self.max_data_page_size)
3145 .set_write_batch_size(self.write_batch_size)
3146 .set_writer_version(self.writer_version)
3147 .set_statistics_enabled(self.enabled_statistics);
3148
3149 let builder = match self.encoding {
3150 Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
3151 .set_dictionary_enabled(true)
3152 .set_dictionary_page_size_limit(self.max_dict_page_size),
3153 _ => builder
3154 .set_dictionary_enabled(false)
3155 .set_encoding(self.encoding),
3156 };
3157
3158 builder.build()
3159 }
3160 }
3161
3162 fn run_single_column_reader_tests<T, F, G>(
3169 rand_max: i32,
3170 converted_type: ConvertedType,
3171 arrow_type: Option<ArrowDataType>,
3172 converter: F,
3173 encodings: &[Encoding],
3174 ) where
3175 T: DataType,
3176 G: RandGen<T>,
3177 F: Fn(&[Option<T::T>]) -> ArrayRef,
3178 {
3179 let all_options = vec![
3180 TestOptions::new(2, 100, 15),
3183 TestOptions::new(3, 25, 5),
3188 TestOptions::new(4, 100, 25),
3192 TestOptions::new(3, 256, 73).with_max_data_page_size(128),
3194 TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
3196 TestOptions::new(2, 256, 127).with_null_percent(0),
3198 TestOptions::new(2, 256, 93).with_null_percent(25),
3200 TestOptions::new(4, 100, 25).with_limit(0),
3202 TestOptions::new(4, 100, 25).with_limit(50),
3204 TestOptions::new(4, 100, 25).with_limit(10),
3206 TestOptions::new(4, 100, 25).with_limit(101),
3208 TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
3210 TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
3212 TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
3214 TestOptions::new(2, 256, 91)
3216 .with_null_percent(25)
3217 .with_enabled_statistics(EnabledStatistics::Chunk),
3218 TestOptions::new(2, 256, 91)
3220 .with_null_percent(25)
3221 .with_enabled_statistics(EnabledStatistics::None),
3222 TestOptions::new(2, 128, 91)
3224 .with_null_percent(100)
3225 .with_enabled_statistics(EnabledStatistics::None),
3226 TestOptions::new(2, 100, 15).with_row_selections(),
3231 TestOptions::new(3, 25, 5).with_row_selections(),
3236 TestOptions::new(4, 100, 25).with_row_selections(),
3240 TestOptions::new(3, 256, 73)
3242 .with_max_data_page_size(128)
3243 .with_row_selections(),
3244 TestOptions::new(3, 256, 57)
3246 .with_max_dict_page_size(128)
3247 .with_row_selections(),
3248 TestOptions::new(2, 256, 127)
3250 .with_null_percent(0)
3251 .with_row_selections(),
3252 TestOptions::new(2, 256, 93)
3254 .with_null_percent(25)
3255 .with_row_selections(),
3256 TestOptions::new(2, 256, 93)
3258 .with_null_percent(25)
3259 .with_row_selections()
3260 .with_limit(10),
3261 TestOptions::new(2, 256, 93)
3263 .with_null_percent(25)
3264 .with_row_selections()
3265 .with_offset(20)
3266 .with_limit(10),
3267 TestOptions::new(4, 100, 25).with_row_filter(),
3271 TestOptions::new(4, 100, 25)
3273 .with_row_selections()
3274 .with_row_filter(),
3275 TestOptions::new(2, 256, 93)
3277 .with_null_percent(25)
3278 .with_max_data_page_size(10)
3279 .with_row_filter(),
3280 TestOptions::new(2, 256, 93)
3282 .with_null_percent(25)
3283 .with_max_data_page_size(10)
3284 .with_row_selections()
3285 .with_row_filter(),
3286 TestOptions::new(2, 256, 93)
3288 .with_enabled_statistics(EnabledStatistics::None)
3289 .with_max_data_page_size(10)
3290 .with_row_selections(),
3291 ];
3292
3293 all_options.into_iter().for_each(|opts| {
3294 for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3295 for encoding in encodings {
3296 let opts = TestOptions {
3297 writer_version,
3298 encoding: *encoding,
3299 ..opts.clone()
3300 };
3301
3302 single_column_reader_test::<T, _, G>(
3303 opts,
3304 rand_max,
3305 converted_type,
3306 arrow_type.clone(),
3307 &converter,
3308 )
3309 }
3310 }
3311 });
3312 }
3313
/// Writes one column of physical type `T` to a temporary Parquet file configured
/// by `opts`, reads it back with `ParquetRecordBatchReaderBuilder`, and asserts
/// that every returned batch equals `converter` applied to the matching slice of
/// the expected values. The values are random, produced by `G` and bounded by `rand_max`.
///
/// The expected values are computed independently of the reader by replaying
/// `opts`' row selection, row filter, offset and limit, in that order, over the
/// generated data.
///
/// `arrow_type`, when `Some`, is embedded in the file metadata as the Arrow type
/// of the `leaf` column (via `generate_single_column_file_with_data`).
fn single_column_reader_test<T, F, G>(
    opts: TestOptions,
    rand_max: i32,
    converted_type: ConvertedType,
    arrow_type: Option<ArrowDataType>,
    converter: F,
) where
    T: DataType,
    G: RandGen<T>,
    F: Fn(&[Option<T::T>]) -> ArrayRef,
{
    println!(
        "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
        T::get_physical_type(),
        converted_type,
        arrow_type,
        opts
    );

    // With `null_percent`, each row gets definition level 1 (present) when a draw
    // in 0..100 is >= null_percent, otherwise 0 (null). Without it the column is REQUIRED.
    let (repetition, def_levels) = match opts.null_percent.as_ref() {
        Some(null_percent) => {
            let mut rng = rng();

            let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
                .map(|_| {
                    std::iter::from_fn(|| {
                        Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
                    })
                    .take(opts.num_rows)
                    .collect()
                })
                .collect();
            (Repetition::OPTIONAL, Some(def_levels))
        }
        None => (Repetition::REQUIRED, None),
    };

    // Only non-null rows carry a physical value, so each row group receives
    // `num_rows` minus its null count generated values.
    let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
        .map(|idx| {
            let null_count = match def_levels.as_ref() {
                Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
                None => 0,
            };
            G::gen_vec(rand_max, opts.num_rows - null_count)
        })
        .collect();

    // Type length for the schema: FIXED_LEN_BYTE_ARRAY uses `rand_max` as its
    // byte width, INT96 is 12 bytes; -1 otherwise (presumably "unset" — confirm
    // against `primitive_type_builder::with_length`).
    let len = match T::get_physical_type() {
        crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
        crate::basic::Type::INT96 => 12,
        _ => -1,
    };

    let fields = vec![Arc::new(
        Type::primitive_type_builder("leaf", T::get_physical_type())
            .with_repetition(repetition)
            .with_converted_type(converted_type)
            .with_length(len)
            .build()
            .unwrap(),
    )];

    let schema = Arc::new(
        Type::group_type_builder("test_schema")
            .with_fields(fields)
            .build()
            .unwrap(),
    );

    let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));

    let mut file = tempfile::tempfile().unwrap();

    generate_single_column_file_with_data::<T>(
        &values,
        def_levels.as_ref(),
        file.try_clone().unwrap(), schema,
        arrow_field,
        &opts,
    )
    .unwrap();

    file.rewind().unwrap();

    // Only load the page index when page-level statistics were written.
    let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::from(
        opts.enabled_statistics == EnabledStatistics::Page,
    ));

    let mut builder =
        ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();

    let expected_data = match opts.row_selections {
        Some((selections, row_count)) => {
            let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);

            // Replay the selectors over the full expected data: skipped runs are
            // discarded, selected runs are kept, front to back.
            let mut skip_data: Vec<Option<T::T>> = vec![];
            let dequeue: VecDeque<RowSelector> = selections.clone().into();
            for select in dequeue {
                if select.skip {
                    without_skip_data.drain(0..select.row_count);
                } else {
                    skip_data.extend(without_skip_data.drain(0..select.row_count));
                }
            }
            builder = builder.with_row_selection(selections);

            assert_eq!(skip_data.len(), row_count);
            skip_data
        }
        None => {
            let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
            assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
            expected_data
        }
    };

    let mut expected_data = match opts.row_filter {
        Some(filter) => {
            let expected_data = expected_data
                .into_iter()
                .zip(filter.iter())
                .filter_map(|(d, f)| f.then(|| d))
                .collect();

            // The predicate replays `filter` across successive invocations via
            // `filter_offset`. This assumes the reader evaluates it on consecutive,
            // in-order batches covering exactly the rows that survived the
            // selection, which is what `with_row_filter` sized the mask for.
            let mut filter_offset = 0;
            let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
                ProjectionMask::all(),
                move |b| {
                    let array = BooleanArray::from_iter(
                        filter
                            .iter()
                            .skip(filter_offset)
                            .take(b.num_rows())
                            .map(|x| Some(*x)),
                    );
                    filter_offset += b.num_rows();
                    Ok(array)
                },
            ))]);

            builder = builder.with_row_filter(filter);
            expected_data
        }
        None => expected_data,
    };

    // Offset and limit apply after selection and filter; trim the expectation the same way.
    if let Some(offset) = opts.offset {
        builder = builder.with_offset(offset);
        expected_data = expected_data.into_iter().skip(offset).collect();
    }

    if let Some(limit) = opts.limit {
        builder = builder.with_limit(limit);
        expected_data = expected_data.into_iter().take(limit).collect();
    }

    let mut record_reader = builder
        .with_batch_size(opts.record_batch_size)
        .build()
        .unwrap();

    // Every batch holds min(batch size, remaining expected rows); once all expected
    // rows are consumed the reader must be exhausted.
    let mut total_read = 0;
    loop {
        let maybe_batch = record_reader.next();
        if total_read < expected_data.len() {
            let end = min(total_read + opts.record_batch_size, expected_data.len());
            let batch = maybe_batch.unwrap().unwrap();
            assert_eq!(end - total_read, batch.num_rows());

            let a = converter(&expected_data[total_read..end]);
            let b = batch.column(0);

            assert_eq!(a.data_type(), b.data_type());
            assert_eq!(a.to_data(), b.to_data());
            assert_eq!(
                a.as_any().type_id(),
                b.as_any().type_id(),
                "incorrect type ids"
            );

            total_read = end;
        } else {
            assert!(maybe_batch.is_none());
            break;
        }
    }
}
3509
3510 fn gen_expected_data<T: DataType>(
3511 def_levels: Option<&Vec<Vec<i16>>>,
3512 values: &[Vec<T::T>],
3513 ) -> Vec<Option<T::T>> {
3514 let data: Vec<Option<T::T>> = match def_levels {
3515 Some(levels) => {
3516 let mut values_iter = values.iter().flatten();
3517 levels
3518 .iter()
3519 .flatten()
3520 .map(|d| match d {
3521 1 => Some(values_iter.next().cloned().unwrap()),
3522 0 => None,
3523 _ => unreachable!(),
3524 })
3525 .collect()
3526 }
3527 None => values.iter().flatten().cloned().map(Some).collect(),
3528 };
3529 data
3530 }
3531
3532 fn generate_single_column_file_with_data<T: DataType>(
3533 values: &[Vec<T::T>],
3534 def_levels: Option<&Vec<Vec<i16>>>,
3535 file: File,
3536 schema: TypePtr,
3537 field: Option<Field>,
3538 opts: &TestOptions,
3539 ) -> Result<ParquetMetaData> {
3540 let mut writer_props = opts.writer_props();
3541 if let Some(field) = field {
3542 let arrow_schema = Schema::new(vec![field]);
3543 add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
3544 }
3545
3546 let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
3547
3548 for (idx, v) in values.iter().enumerate() {
3549 let def_levels = def_levels.map(|d| d[idx].as_slice());
3550 let mut row_group_writer = writer.next_row_group()?;
3551 {
3552 let mut column_writer = row_group_writer
3553 .next_column()?
3554 .expect("Column writer is none!");
3555
3556 column_writer
3557 .typed::<T>()
3558 .write_batch(v, def_levels, None)?;
3559
3560 column_writer.close()?;
3561 }
3562 row_group_writer.close()?;
3563 }
3564
3565 writer.close()
3566 }
3567
3568 fn get_test_file(file_name: &str) -> File {
3569 let mut path = PathBuf::new();
3570 path.push(arrow::util::test_util::arrow_test_data());
3571 path.push(file_name);
3572
3573 File::open(path.as_path()).expect("File not found!")
3574 }
3575
#[test]
fn test_read_structs() {
    // Reads `nested_structs.rust.parquet` fully, then again projecting leaves
    // 3, 8 and 10, which must yield exactly the two struct columns below.
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/nested_structs.rust.parquet");

    let full_reader =
        ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 60).unwrap();
    full_reader.for_each(|batch| {
        batch.unwrap();
    });

    let builder =
        ParquetRecordBatchReaderBuilder::try_new(File::open(&path).unwrap()).unwrap();
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
    let projected_reader = builder
        .with_projection(mask)
        .with_batch_size(60)
        .build()
        .unwrap();

    let roll_num = Field::new(
        "roll_num",
        ArrowDataType::Struct(Fields::from(vec![Field::new(
            "count",
            ArrowDataType::UInt64,
            false,
        )])),
        false,
    );
    let pc_cur = Field::new(
        "PC_CUR",
        ArrowDataType::Struct(Fields::from(vec![
            Field::new("mean", ArrowDataType::Int64, false),
            Field::new("sum", ArrowDataType::Int64, false),
        ])),
        false,
    );
    let expected_schema = Schema::new(vec![roll_num, pc_cur]);

    assert_eq!(&expected_schema, projected_reader.schema().as_ref());

    for maybe_batch in projected_reader {
        let batch = maybe_batch.unwrap();
        assert_eq!(batch.schema().as_ref(), &expected_schema);
    }
}
3628
#[test]
fn test_read_structs_by_name() {
    // Same projection as selecting leaves by index, but expressed by dotted column
    // path; the resulting schema must match the two expected struct columns.
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/nested_structs.rust.parquet");

    let full_reader =
        ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 60).unwrap();
    full_reader.for_each(|batch| {
        batch.unwrap();
    });

    let builder =
        ParquetRecordBatchReaderBuilder::try_new(File::open(&path).unwrap()).unwrap();
    let wanted_columns = ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"];
    let mask = ProjectionMask::columns(builder.parquet_schema(), wanted_columns);
    let projected_reader = builder
        .with_projection(mask)
        .with_batch_size(60)
        .build()
        .unwrap();

    let roll_num = Field::new(
        "roll_num",
        ArrowDataType::Struct(Fields::from(vec![Field::new(
            "count",
            ArrowDataType::UInt64,
            false,
        )])),
        false,
    );
    let pc_cur = Field::new(
        "PC_CUR",
        ArrowDataType::Struct(Fields::from(vec![
            Field::new("mean", ArrowDataType::Int64, false),
            Field::new("sum", ArrowDataType::Int64, false),
        ])),
        false,
    );
    let expected_schema = Schema::new(vec![roll_num, pc_cur]);

    assert_eq!(&expected_schema, projected_reader.schema().as_ref());

    for maybe_batch in projected_reader {
        let batch = maybe_batch.unwrap();
        assert_eq!(batch.schema().as_ref(), &expected_schema);
    }
}
3681
#[test]
fn test_read_maps() {
    // Smoke test: every batch of `nested_maps.snappy.parquet` must decode without error.
    let path = format!(
        "{}/nested_maps.snappy.parquet",
        arrow::util::test_util::parquet_test_data()
    );
    let reader = ParquetRecordBatchReader::try_new(File::open(path).unwrap(), 60).unwrap();
    reader.for_each(|batch| {
        batch.unwrap();
    });
}
3693
#[test]
fn test_unknown_logical_type() {
    // One OPTIONAL column per physical type, each annotated UNKNOWN and filled with
    // four nulls; every column must read back as a 4-row Arrow `Null` array.
    let message_type = "message uk {
      OPTIONAL INT32 uki32 (UNKNOWN);
      OPTIONAL INT64 uki64 (UNKNOWN);
      OPTIONAL INT96 uki96 (UNKNOWN);
      OPTIONAL BOOLEAN ukbool (UNKNOWN);
      OPTIONAL FLOAT ukfloat (UNKNOWN);
      OPTIONAL DOUBLE ukdbl (UNKNOWN);
      OPTIONAL BYTE_ARRAY ukbytes (UNKNOWN);
      OPTIONAL FIXED_LEN_BYTE_ARRAY(10) ukflba (UNKNOWN);
    }";

    /// Writes the next column of the row group as four nulls (definition level 0).
    fn write_all_null_column<T: DataType>(
        row_group_writer: &mut SerializedRowGroupWriter<'_, File>,
    ) {
        let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
        column_writer
            .typed::<T>()
            .write_batch(&[], Some(&[0, 0, 0, 0]), None)
            .unwrap();
        column_writer.close().unwrap();
    }

    let schema = Arc::new(parse_message_type(message_type).unwrap());
    let file = tempfile::tempfile().unwrap();

    let mut writer =
        SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
            .unwrap();
    let mut row_group_writer = writer.next_row_group().unwrap();

    // Columns must be written in schema order.
    write_all_null_column::<Int32Type>(&mut row_group_writer);
    write_all_null_column::<Int64Type>(&mut row_group_writer);
    write_all_null_column::<Int96Type>(&mut row_group_writer);
    write_all_null_column::<BoolType>(&mut row_group_writer);
    write_all_null_column::<FloatType>(&mut row_group_writer);
    write_all_null_column::<DoubleType>(&mut row_group_writer);
    write_all_null_column::<ByteArrayType>(&mut row_group_writer);
    write_all_null_column::<FixedLenByteArrayType>(&mut row_group_writer);

    row_group_writer.close().unwrap();
    writer.close().unwrap();

    let mut reader = ParquetRecordBatchReader::try_new(file, 4).unwrap();
    let batch = reader.next().unwrap().unwrap();

    for column in batch.columns() {
        assert_eq!(column.len(), 4);
        assert_eq!(column.logical_null_count(), 4);
        assert_eq!(*column.data_type(), ArrowDataType::Null);
    }
}
3764
#[test]
fn test_nested_nullability() {
    // An OPTIONAL group around a REQUIRED leaf: the struct is nullable but its child
    // is not. Definition levels [0, 1, 0, 1] make rows 0 and 2 null structs.
    let message_type = "message nested {
      OPTIONAL Group group {
        REQUIRED INT32 leaf;
      }
    }";

    let file = tempfile::tempfile().unwrap();
    let schema = Arc::new(parse_message_type(message_type).unwrap());

    let mut writer =
        SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
            .unwrap();
    let mut row_group_writer = writer.next_row_group().unwrap();
    let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
    column_writer
        .typed::<Int32Type>()
        .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
        .unwrap();
    column_writer.close().unwrap();
    row_group_writer.close().unwrap();
    writer.close().unwrap();

    let expected_schema = Schema::new(vec![Field::new(
        "group",
        ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
        true,
    )]);

    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    let mut reader = builder.with_projection(mask).build().unwrap();
    let batch = reader.next().unwrap().unwrap();

    assert_eq!(batch.schema().as_ref(), &expected_schema);
    assert_eq!(batch.num_rows(), 4);
    assert_eq!(batch.column(0).null_count(), 2);
}
3814
#[test]
fn test_dictionary_preservation() {
    // Two RLE_DICTIONARY row groups (8 and 9 rows) of an OPTIONAL UTF8 leaf, with
    // the embedded Arrow type Dictionary(Int32, Utf8). They are read in batches of 3
    // to check when consecutive batches share the same dictionary values.
    let leaf = Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
        .with_repetition(Repetition::OPTIONAL)
        .with_converted_type(ConvertedType::UTF8)
        .build()
        .unwrap();
    let schema = Arc::new(
        Type::group_type_builder("test_schema")
            .with_fields(vec![Arc::new(leaf)])
            .build()
            .unwrap(),
    );

    let arrow_field = Field::new(
        "leaf",
        ArrowDataType::Dictionary(
            Box::new(ArrowDataType::Int32),
            Box::new(ArrowDataType::Utf8),
        ),
        true,
    );

    let to_byte_arrays = |items: &[&str]| {
        items
            .iter()
            .map(|item| ByteArray::from(*item))
            .collect::<Vec<_>>()
    };
    // Present values per row group, consumed wherever the definition level is 1.
    let values = vec![
        to_byte_arrays(&["hello", "a", "b", "d"]),
        to_byte_arrays(&["c", "a", "b"]),
    ];
    let def_levels = vec![
        vec![1, 0, 0, 1, 0, 0, 1, 1],
        vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
    ];

    let opts = TestOptions {
        encoding: Encoding::RLE_DICTIONARY,
        ..Default::default()
    };

    let mut file = tempfile::tempfile().unwrap();
    generate_single_column_file_with_data::<ByteArrayType>(
        &values,
        Some(&def_levels),
        file.try_clone().unwrap(),
        schema,
        Some(arrow_field),
        &opts,
    )
    .unwrap();
    file.rewind().unwrap();

    let batches = ParquetRecordBatchReader::try_new(file, 3)
        .unwrap()
        .collect::<Result<Vec<RecordBatch>, _>>()
        .unwrap();

    assert_eq!(batches.len(), 6);
    assert!(batches.iter().all(|batch| batch.num_columns() == 1));

    let shapes: Vec<_> = batches
        .iter()
        .map(|batch| (batch.num_rows(), batch.column(0).null_count()))
        .collect();
    assert_eq!(
        shapes,
        vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
    );

    let dictionary_of =
        |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();

    // Batches 0 and 1 (rows 0..6) lie in row group 0 and share its dictionary.
    assert_eq!(dictionary_of(&batches[0]), dictionary_of(&batches[1]));
    // Batch 2 (rows 6..9) straddles the row-group boundary at row 8.
    assert_ne!(dictionary_of(&batches[1]), dictionary_of(&batches[2]));
    assert_ne!(dictionary_of(&batches[2]), dictionary_of(&batches[3]));
    // Batches 3..6 (rows 9..17) all lie in row group 1.
    assert_eq!(dictionary_of(&batches[3]), dictionary_of(&batches[4]));
    assert_eq!(dictionary_of(&batches[4]), dictionary_of(&batches[5]));
}
3907
#[test]
fn test_read_null_list() {
    // `null_list.parquet` holds one row whose list is valid (non-null) but empty.
    let path = format!(
        "{}/null_list.parquet",
        arrow::util::test_util::parquet_test_data()
    );
    let mut reader = ParquetRecordBatchReader::try_new(File::open(path).unwrap(), 60).unwrap();
    let batch = reader.next().unwrap().unwrap();

    assert_eq!(batch.num_rows(), 1);
    assert_eq!(batch.num_columns(), 1);

    let column = batch.column(0);
    assert_eq!(column.len(), 1);

    let list = column.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(list.len(), 1);
    assert!(list.is_valid(0));
    assert_eq!(list.value(0).len(), 0);
}
3931
#[test]
fn test_null_schema_inference() {
    // With the embedded Arrow schema skipped, the list column's element type is
    // derived from the Parquet schema and comes out as `Null`.
    let path = format!(
        "{}/null_list.parquet",
        arrow::util::test_util::parquet_test_data()
    );
    let file = File::open(path).unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
        file,
        ArrowReaderOptions::new().with_skip_arrow_metadata(true),
    )
    .unwrap();

    let expected_field = Field::new(
        "emptylist",
        ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
        true,
    );

    let inferred = builder.schema();
    assert_eq!(inferred.fields().len(), 1);
    assert_eq!(inferred.field(0), &expected_field);
}
3950
#[test]
fn test_skip_metadata() {
    // For both writer versions, field metadata survives a round trip through the
    // embedded Arrow schema and is absent when the reader skips that schema.
    let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
    let field = Field::new("col", col.data_type().clone(), true);
    let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));

    let metadata = [("key".to_string(), "value".to_string())]
        .into_iter()
        .collect();
    let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
    assert_ne!(schema_with_metadata, schema_without_metadata);

    let batch =
        RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
    let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);

    for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
        let props = WriterProperties::builder()
            .set_writer_version(version)
            .build();
        let file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
                .unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let with_metadata =
            ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
        assert_eq!(with_metadata.schema(), schema_with_metadata);

        let without_metadata =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, skip_options.clone())
                .unwrap()
                .build()
                .unwrap();
        assert_eq!(without_metadata.schema(), schema_without_metadata);
    }
}
4009
4010 fn write_parquet_from_iter<I, F>(value: I) -> File
4011 where
4012 I: IntoIterator<Item = (F, ArrayRef)>,
4013 F: AsRef<str>,
4014 {
4015 let batch = RecordBatch::try_from_iter(value).unwrap();
4016 let file = tempfile().unwrap();
4017 let mut writer =
4018 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
4019 writer.write(&batch).unwrap();
4020 writer.close().unwrap();
4021 file
4022 }
4023
4024 fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
4025 where
4026 I: IntoIterator<Item = (F, ArrayRef)>,
4027 F: AsRef<str>,
4028 {
4029 let file = write_parquet_from_iter(value);
4030 let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
4031 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4032 file.try_clone().unwrap(),
4033 options_with_schema,
4034 );
4035 assert_eq!(builder.err().unwrap().to_string(), expected_error);
4036 }
4037
#[test]
fn test_schema_too_few_columns() {
    // The file has two columns, but the supplied schema lists only one.
    let file_columns = vec![
        ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
        ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
    ];
    let supplied_schema = Arc::new(Schema::new(vec![Field::new(
        "int64",
        ArrowDataType::Int64,
        false,
    )]));

    run_schema_test_with_error(
        file_columns,
        supplied_schema,
        "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
    );
}
4053
#[test]
fn test_schema_too_many_columns() {
    // The file has one column, but the supplied schema lists two.
    let file_columns = vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)];
    let supplied_schema = Arc::new(Schema::new(vec![
        Field::new("int64", ArrowDataType::Int64, false),
        Field::new("int32", ArrowDataType::Int32, false),
    ]));

    run_schema_test_with_error(
        file_columns,
        supplied_schema,
        "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
    );
}
4065
#[test]
fn test_schema_mismatched_column_names() {
    // Same column count and type, but the supplied field name differs from the file's.
    let file_columns = vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)];
    let supplied_schema = Arc::new(Schema::new(vec![Field::new(
        "other",
        ArrowDataType::Int64,
        false,
    )]));

    run_schema_test_with_error(
        file_columns,
        supplied_schema,
        "Arrow: incompatible arrow schema, expected field named int64 got other",
    );
}
4078
/// Every column whose requested type does not match is reported in one combined
/// error message; the compatible column is not mentioned.
#[test]
fn test_schema_incompatible_columns() {
    let col1: ArrayRef = Arc::new(Int64Array::from(vec![0]));
    let col2: ArrayRef = Arc::new(Int32Array::from(vec![0]));
    let col3: ArrayRef = Arc::new(Date64Array::from(vec![0]));
    // Every field is requested as non-nullable Int32.
    let requested = |name: &str| Field::new(name, ArrowDataType::Int32, false);
    run_schema_test_with_error(
        vec![
            ("col1_invalid", col1),
            ("col2_valid", col2),
            ("col3_invalid", col3),
        ],
        Arc::new(Schema::new(vec![
            requested("col1_invalid"),
            requested("col2_valid"),
            requested("col3_invalid"),
        ])),
        "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
    );
}
4104
/// A mismatching child inside a struct is reported against the enclosing struct
/// field, showing both the requested and the found struct types.
#[test]
fn test_one_incompatible_nested_column() {
    let written_children = Fields::from(vec![
        Field::new("nested1_valid", ArrowDataType::Utf8, false),
        Field::new("nested1_invalid", ArrowDataType::Int64, false),
    ]);
    let nested = StructArray::try_new(
        written_children,
        vec![
            Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
            Arc::new(Int64Array::from(vec![0])) as ArrayRef,
        ],
        None,
    )
    .expect("struct array");

    // Same struct shape, but the second child is requested as Int32 instead of Int64.
    let requested_children = Fields::from(vec![
        Field::new("nested1_valid", ArrowDataType::Utf8, false),
        Field::new("nested1_invalid", ArrowDataType::Int32, false),
    ]);
    let requested = Schema::new(vec![
        Field::new("col1", ArrowDataType::Int64, false),
        Field::new("col2", ArrowDataType::Int32, false),
        Field::new("nested", ArrowDataType::Struct(requested_children), false),
    ]);

    let columns = vec![
        ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
        ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
        ("nested", Arc::new(nested) as ArrayRef),
    ];
    run_schema_test_with_error(
        columns,
        Arc::new(requested),
        "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
        requested Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int32) \
        but found Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int64)",
    );
}
4144
4145 fn utf8_parquet() -> Bytes {
4147 let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
4148 let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
4149 let props = None;
4150 let mut parquet_data = vec![];
4152 let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
4153 writer.write(&batch).unwrap();
4154 writer.close().unwrap();
4155 Bytes::from(parquet_data)
4156 }
4157
/// Requesting Int32 for a column stored as Utf8 fails while building the reader.
#[test]
fn test_schema_error_bad_types() {
    let requested = Field::new("column1", arrow::datatypes::DataType::Int32, false);
    let input_schema: SchemaRef = Arc::new(Schema::new(vec![requested]));
    let reader_options = ArrowReaderOptions::new().with_schema(input_schema);

    let err = ParquetRecordBatchReaderBuilder::try_new_with_options(utf8_parquet(), reader_options)
        .unwrap_err();
    assert_eq!(
        err.to_string(),
        "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8"
    )
}
4180
/// Requesting a nullable field for a column the file reports as non-nullable
/// fails while building the reader.
#[test]
fn test_schema_error_bad_nullability() {
    let requested = Field::new("column1", arrow::datatypes::DataType::Utf8, true);
    let input_schema: SchemaRef = Arc::new(Schema::new(vec![requested]));
    let reader_options = ArrowReaderOptions::new().with_schema(input_schema);

    let err = ParquetRecordBatchReaderBuilder::try_new_with_options(utf8_parquet(), reader_options)
        .unwrap_err();
    assert_eq!(
        err.to_string(),
        "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false"
    )
}
4203
/// Binary, LargeBinary and BinaryView columns holding valid UTF-8 can be read
/// back as Utf8, LargeUtf8 and Utf8View through a supplied schema.
#[test]
fn test_read_binary_as_utf8() {
    let raw: [&[u8]; 3] = [b"one", b"two", b"three"];
    let file = write_parquet_from_iter(vec![
        (
            "binary_to_utf8",
            Arc::new(BinaryArray::from(raw.to_vec())) as ArrayRef,
        ),
        (
            "large_binary_to_large_utf8",
            Arc::new(LargeBinaryArray::from(raw.to_vec())) as ArrayRef,
        ),
        (
            "binary_view_to_utf8_view",
            Arc::new(BinaryViewArray::from(raw.to_vec())) as ArrayRef,
        ),
    ]);

    let requested = Schema::new(Fields::from(vec![
        Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
        Field::new("large_binary_to_large_utf8", ArrowDataType::LargeUtf8, false),
        Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
    ]));
    let options = ArrowReaderOptions::new().with_schema(Arc::new(requested));
    let mut arrow_reader =
        ParquetRecordBatchReaderBuilder::try_new_with_options(file.try_clone().unwrap(), options)
            .expect("reader builder with schema")
            .build()
            .expect("reader with schema");

    let batch = arrow_reader.next().unwrap().unwrap();
    assert_eq!(batch.num_columns(), 3);
    assert_eq!(batch.num_rows(), 3);

    let expected = vec![Some("one"), Some("two"), Some("three")];
    let utf8: Vec<_> = batch.column(0).as_string::<i32>().iter().collect();
    assert_eq!(utf8, expected);
    let large_utf8: Vec<_> = batch.column(1).as_string::<i64>().iter().collect();
    assert_eq!(large_utf8, expected);
    let utf8_view: Vec<_> = batch.column(2).as_string_view().iter().collect();
    assert_eq!(utf8_view, expected);
}
4277
/// Reading binary values that are not valid UTF-8 as Utf8 must panic with an
/// "Invalid UTF8 sequence" message.
#[test]
#[should_panic(expected = "Invalid UTF8 sequence at")]
fn test_read_non_utf8_binary_as_utf8() {
    let invalid: [&[u8]; 3] = [b"\xDE\x00\xFF", b"\xDE\x01\xAA", b"\xDE\x02\xFF"];
    let file = write_parquet_from_iter(vec![(
        "non_utf8_binary",
        Arc::new(BinaryArray::from(invalid.to_vec())) as ArrayRef,
    )]);

    let requested = Schema::new(Fields::from(vec![Field::new(
        "non_utf8_binary",
        ArrowDataType::Utf8,
        false,
    )]));
    let options = ArrowReaderOptions::new().with_schema(Arc::new(requested));
    let mut arrow_reader =
        ParquetRecordBatchReaderBuilder::try_new_with_options(file.try_clone().unwrap(), options)
            .expect("reader builder with schema")
            .build()
            .expect("reader with schema");
    arrow_reader.next().unwrap().unwrap_err();
}
4305
/// Reading with a supplied schema converts columns to the requested types
/// (Int32 -> Timestamp(Second), Date32 -> Date64, and inside a struct
/// Utf8 -> Dictionary and Int64 -> Timestamp(Nanosecond)). The reader must
/// report exactly the supplied schema.
#[test]
fn test_with_schema() {
    // Data as written: plain physical-friendly types only.
    let nested = StructArray::try_new(
        Fields::from(vec![
            Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
            Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
        ]),
        vec![
            Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
        ],
        None,
    )
    .unwrap();

    let file = write_parquet_from_iter(vec![
        (
            "int32_to_ts_second",
            Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
        ),
        (
            "date32_to_date64",
            Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
        ),
        ("nested", Arc::new(nested) as ArrayRef),
    ]);

    // Schema to read back with; each field name encodes source -> target type.
    let dict_of_utf8 = ArrowDataType::Dictionary(
        Box::new(ArrowDataType::Int32),
        Box::new(ArrowDataType::Utf8),
    );
    let ts_nanos_plus_10 = ArrowDataType::Timestamp(
        arrow::datatypes::TimeUnit::Nanosecond,
        Some("+10:00".into()),
    );
    let ts_seconds_plus_1 =
        ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into()));
    let supplied_nested_fields = Fields::from(vec![
        Field::new("utf8_to_dict", dict_of_utf8, false),
        Field::new("int64_to_ts_nano", ts_nanos_plus_10, false),
    ]);
    let supplied_schema = Arc::new(Schema::new(vec![
        Field::new("int32_to_ts_second", ts_seconds_plus_1, false),
        Field::new("date32_to_date64", ArrowDataType::Date64, false),
        Field::new(
            "nested",
            ArrowDataType::Struct(supplied_nested_fields),
            false,
        ),
    ]));

    let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
    let mut arrow_reader =
        ParquetRecordBatchReaderBuilder::try_new_with_options(file.try_clone().unwrap(), options)
            .expect("reader builder with schema")
            .build()
            .expect("reader with schema");

    assert_eq!(arrow_reader.schema(), supplied_schema);
    let batch = arrow_reader.next().unwrap().unwrap();
    assert_eq!(batch.num_columns(), 3);
    assert_eq!(batch.num_rows(), 4);

    // Top-level conversions.
    let seconds = batch
        .column(0)
        .as_any()
        .downcast_ref::<TimestampSecondArray>()
        .expect("downcast to timestamp second");
    let first_second = seconds
        .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
        .map(|v| v.to_string())
        .expect("value as datetime");
    assert_eq!(first_second, "1970-01-01 01:00:00 +01:00");

    let dates = batch
        .column(1)
        .as_any()
        .downcast_ref::<Date64Array>()
        .expect("downcast to date64");
    let first_date = dates
        .value_as_date(0)
        .map(|v| v.to_string())
        .expect("value as date");
    assert_eq!(first_date, "1970-01-01");

    // Conversions inside the struct.
    let struct_column = batch
        .column(2)
        .as_any()
        .downcast_ref::<StructArray>()
        .expect("downcast to struct");

    let dictionary = struct_column
        .column(0)
        .as_any()
        .downcast_ref::<Int32DictionaryArray>()
        .expect("downcast to dictionary");
    let dictionary_values: Vec<_> = dictionary
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("downcast to string")
        .iter()
        .collect();
    assert_eq!(dictionary_values, vec![Some("a"), Some("b")]);
    let dictionary_keys: Vec<_> = dictionary.keys().iter().collect();
    assert_eq!(dictionary_keys, vec![Some(0), Some(0), Some(0), Some(1)]);

    let nanos = struct_column
        .column(1)
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .expect("downcast to timestamp nanosecond");
    let first_nano = nanos
        .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
        .map(|v| v.to_string())
        .expect("value as datetime");
    assert_eq!(first_nano, "1970-01-01 10:00:00.000000001 +10:00");
}
4441
/// Projecting zero columns still yields every row of the file, in zero-column
/// batches of at most the configured batch size.
#[test]
fn test_empty_projection() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let file = File::open(format!("{testdata}/alltypes_plain.parquet")).unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let expected_rows = builder.metadata().file_metadata().num_rows() as usize;
    let no_columns = ProjectionMask::leaves(builder.parquet_schema(), []);

    let total_rows: usize = builder
        .with_projection(no_columns)
        .with_batch_size(2)
        .build()
        .unwrap()
        .map(|maybe_batch| {
            let batch = maybe_batch.unwrap();
            assert_eq!(batch.num_columns(), 0);
            assert!(batch.num_rows() <= 2);
            batch.num_rows()
        })
        .sum();

    assert_eq!(total_rows, expected_rows);
}
4469
4470 fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
4471 let schema = Arc::new(Schema::new(vec![Field::new(
4472 "list",
4473 ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
4474 true,
4475 )]));
4476
4477 let mut buf = Vec::with_capacity(1024);
4478
4479 let mut writer = ArrowWriter::try_new(
4480 &mut buf,
4481 schema.clone(),
4482 Some(
4483 WriterProperties::builder()
4484 .set_max_row_group_row_count(Some(row_group_size))
4485 .build(),
4486 ),
4487 )
4488 .unwrap();
4489 for _ in 0..2 {
4490 let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
4491 for _ in 0..(batch_size) {
4492 list_builder.append(true);
4493 }
4494 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
4495 .unwrap();
4496 writer.write(&batch).unwrap();
4497 }
4498 writer.close().unwrap();
4499
4500 let mut record_reader =
4501 ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
4502 assert_eq!(
4503 batch_size,
4504 record_reader.next().unwrap().unwrap().num_rows()
4505 );
4506 assert_eq!(
4507 batch_size,
4508 record_reader.next().unwrap().unwrap().num_rows()
4509 );
4510 }
4511
/// Exercises `test_row_group_batch` with row-group sizes equal to, just above
/// and just below the batch size, around `REPETITION_LEVELS_BATCH_SIZE`.
#[test]
fn test_row_group_exact_multiple() {
    const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
    // (row_group_size, batch_size) pairs, run in order.
    let cases = [
        (8, 8),
        (10, 8),
        (8, 10),
        (BATCH_SIZE, BATCH_SIZE),
        (BATCH_SIZE + 1, BATCH_SIZE),
        (BATCH_SIZE, BATCH_SIZE + 1),
        (BATCH_SIZE, BATCH_SIZE - 1),
        (BATCH_SIZE - 1, BATCH_SIZE),
    ];
    for (row_group_size, batch_size) in cases {
        test_row_group_batch(row_group_size, batch_size);
    }
}
4524
4525 fn get_expected_batches(
4528 column: &RecordBatch,
4529 selection: &RowSelection,
4530 batch_size: usize,
4531 ) -> Vec<RecordBatch> {
4532 let mut expected_batches = vec![];
4533
4534 let mut selection: VecDeque<_> = selection.clone().into();
4535 let mut row_offset = 0;
4536 let mut last_start = None;
4537 while row_offset < column.num_rows() && !selection.is_empty() {
4538 let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
4539 while batch_remaining > 0 && !selection.is_empty() {
4540 let (to_read, skip) = match selection.front_mut() {
4541 Some(selection) if selection.row_count > batch_remaining => {
4542 selection.row_count -= batch_remaining;
4543 (batch_remaining, selection.skip)
4544 }
4545 Some(_) => {
4546 let select = selection.pop_front().unwrap();
4547 (select.row_count, select.skip)
4548 }
4549 None => break,
4550 };
4551
4552 batch_remaining -= to_read;
4553
4554 match skip {
4555 true => {
4556 if let Some(last_start) = last_start.take() {
4557 expected_batches.push(column.slice(last_start, row_offset - last_start))
4558 }
4559 row_offset += to_read
4560 }
4561 false => {
4562 last_start.get_or_insert(row_offset);
4563 row_offset += to_read
4564 }
4565 }
4566 }
4567 }
4568
4569 if let Some(last_start) = last_start.take() {
4570 expected_batches.push(column.slice(last_start, row_offset - last_start))
4571 }
4572
4573 for batch in &expected_batches[..expected_batches.len() - 1] {
4575 assert_eq!(batch.num_rows(), batch_size);
4576 }
4577
4578 expected_batches
4579 }
4580
4581 fn create_test_selection(
4582 step_len: usize,
4583 total_len: usize,
4584 skip_first: bool,
4585 ) -> (RowSelection, usize) {
4586 let mut remaining = total_len;
4587 let mut skip = skip_first;
4588 let mut vec = vec![];
4589 let mut selected_count = 0;
4590 while remaining != 0 {
4591 let step = if remaining > step_len {
4592 step_len
4593 } else {
4594 remaining
4595 };
4596 vec.push(RowSelector {
4597 row_count: step,
4598 skip,
4599 });
4600 remaining -= step;
4601 if !skip {
4602 selected_count += step;
4603 }
4604 skip = !skip;
4605 }
4606 (vec.into(), selected_count)
4607 }
4608
4609 #[test]
4610 fn test_scan_row_with_selection() {
4611 let testdata = arrow::util::test_util::parquet_test_data();
4612 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
4613 let test_file = File::open(&path).unwrap();
4614
4615 let mut serial_reader =
4616 ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
4617 let data = serial_reader.next().unwrap().unwrap();
4618
4619 let do_test = |batch_size: usize, selection_len: usize| {
4620 for skip_first in [false, true] {
4621 let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;
4622
4623 let expected = get_expected_batches(&data, &selections, batch_size);
4624 let skip_reader = create_skip_reader(&test_file, batch_size, selections);
4625 assert_eq!(
4626 skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
4627 expected,
4628 "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
4629 );
4630 }
4631 };
4632
4633 do_test(1000, 1000);
4636
4637 do_test(20, 20);
4639
4640 do_test(20, 5);
4642
4643 do_test(20, 5);
4646
4647 fn create_skip_reader(
4648 test_file: &File,
4649 batch_size: usize,
4650 selections: RowSelection,
4651 ) -> ParquetRecordBatchReader {
4652 let options =
4653 ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
4654 let file = test_file.try_clone().unwrap();
4655 ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
4656 .unwrap()
4657 .with_batch_size(batch_size)
4658 .with_row_selection(selections)
4659 .build()
4660 .unwrap()
4661 }
4662 }
4663
/// When the requested batch size (1024) differs from the file's row count, the
/// built reader's plan uses the file's row count as its batch size.
#[test]
fn test_batch_size_overallocate() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let test_file = File::open(format!("{testdata}/alltypes_plain.parquet")).unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
    let num_rows = builder.metadata.file_metadata().num_rows();
    // Guard: the fixture must not happen to contain exactly the requested size.
    assert_ne!(1024, num_rows);

    let reader = builder
        .with_batch_size(1024)
        .with_projection(ProjectionMask::all())
        .build()
        .unwrap();
    assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
}
4681
/// With `PageIndexPolicy::Required`, a file that has an offset index exposes
/// it. A file without one still opens, reports no offset index, and reads normally.
#[test]
fn test_read_with_page_index_enabled() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let open_with_page_index = |name: &str| {
        let test_file = File::open(format!("{testdata}/{name}")).unwrap();
        ParquetRecordBatchReaderBuilder::try_new_with_options(
            test_file,
            ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
        )
        .unwrap()
    };

    // File with an offset index.
    let builder = open_with_page_index("alltypes_tiny_pages.parquet");
    assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
    let batches = builder
        .build()
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(batches.len(), 8);

    // File without an offset index.
    let builder = open_with_page_index("alltypes_plain.parquet");
    assert!(builder.metadata().offset_index().is_none());
    let batches = builder
        .build()
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(batches.len(), 1);
}
4718
/// Writes a single row directly through the low-level column writers, supplying
/// raw definition/repetition levels for three leaf columns. The leaves are an
/// optional scalar, a repeated scalar, and an optional field inside a repeated
/// group. Then checks that projecting each leaf on its own yields the same array
/// as the corresponding column of an unprojected read.
#[test]
fn test_raw_repetition() {
    const MESSAGE_TYPE: &str = "
        message Log {
          OPTIONAL INT32 eventType;
          REPEATED INT32 category;
          REPEATED group filter {
            OPTIONAL INT32 error;
          }
        }
    ";
    let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
    let props = Default::default();

    let mut buf = Vec::with_capacity(1024);
    let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
    let mut row_group_writer = writer.next_row_group().unwrap();

    // Each `next_column` call hands out the next leaf column; presumably in schema
    // order (eventType, category, filter.error) — the statement order below matters.
    // eventType: one value, definition level 1, no repetition levels.
    let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
    col_writer
        .typed::<Int32Type>()
        .write_batch(&[1], Some(&[1]), None)
        .unwrap();
    col_writer.close().unwrap();
    // category: two values; repetition levels [0, 1] keep both entries in the same row.
    let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
    col_writer
        .typed::<Int32Type>()
        .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
        .unwrap();
    col_writer.close().unwrap();
    // filter.error: a single entry with definition level 1 and repetition level 0.
    let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
    col_writer
        .typed::<Int32Type>()
        .write_batch(&[1], Some(&[1]), Some(&[0]))
        .unwrap();
    col_writer.close().unwrap();

    // All three columns together must form exactly one row.
    let rg_md = row_group_writer.close().unwrap();
    assert_eq!(rg_md.num_rows(), 1);
    writer.close().unwrap();

    let bytes = Bytes::from(buf);

    // Unprojected read: the reference for each column.
    let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
    let full = no_mask.next().unwrap().unwrap();

    assert_eq!(full.num_columns(), 3);

    // Projecting a single leaf must reproduce that leaf's column exactly.
    for idx in 0..3 {
        let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
        let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
        let mut reader = b.with_projection(mask).build().unwrap();
        let projected = reader.next().unwrap().unwrap();

        assert_eq!(projected.num_columns(), 1);
        assert_eq!(full.column(idx), projected.column(0));
    }
}
4780
/// Decodes the LZ4_RAW-compressed fixture and checks every value in its three
/// columns.
#[test]
fn test_read_lz4_raw() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let file = File::open(format!("{testdata}/lz4_raw_compressed.parquet")).unwrap();

    let batches = ParquetRecordBatchReader::try_new(file, 1024)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(batches.len(), 1);
    let batch = &batches[0];
    assert_eq!(batch.num_columns(), 3);
    assert_eq!(batch.num_rows(), 4);

    let ints: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
    assert_eq!(
        ints.values(),
        &[1593604800, 1593604800, 1593604801, 1593604801]
    );

    let binaries: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
    let binaries: Vec<_> = binaries.iter().flatten().collect();
    assert_eq!(binaries, &[b"abc", b"def", b"abc", b"def"]);

    let floats: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
    assert_eq!(floats.values(), &[42.0, 7.7, 42.125, 7.7]);
}
4811
/// Both the Hadoop-framed and the non-Hadoop LZ4 fixtures decode to the same
/// four rows of data.
#[test]
fn test_read_lz4_hadoop_fallback() {
    let fixtures = [
        "hadoop_lz4_compressed.parquet",
        "non_hadoop_lz4_compressed.parquet",
    ];
    for name in fixtures {
        let testdata = arrow::util::test_util::parquet_test_data();
        let file = File::open(format!("{testdata}/{name}")).unwrap();
        let expected_rows = 4;

        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];
        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), expected_rows);

        let ints: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
        assert_eq!(
            ints.values(),
            &[1593604800, 1593604800, 1593604801, 1593604801]
        );

        let binaries: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
        let binaries: Vec<_> = binaries.iter().flatten().collect();
        assert_eq!(binaries, &[b"abc", b"def", b"abc", b"def"]);

        let floats: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
        assert_eq!(floats.values(), &[42.0, 7.7, 42.125, 7.7]);
    }
}
4856
/// Decodes the larger Hadoop-LZ4 fixture in one batch and spot-checks the first
/// two and last two string values.
#[test]
fn test_read_lz4_hadoop_large() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let file = File::open(format!("{testdata}/hadoop_lz4_compressed_larger.parquet")).unwrap();
    let expected_rows = 10000;

    let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(batches.len(), 1);
    let batch = &batches[0];
    assert_eq!(batch.num_columns(), 1);
    assert_eq!(batch.num_rows(), expected_rows);

    let strings: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
    let values: Vec<_> = strings.iter().flatten().collect();
    let spot_checks = [
        (0, "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b"),
        (1, "e8fb9197-cb9f-4118-b67f-fbfa65f61843"),
        (expected_rows - 2, "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c"),
        (expected_rows - 1, "85440778-460a-41ac-aa2e-ac3ee41696bf"),
    ];
    for (index, expected) in spot_checks {
        assert_eq!(values[index], expected);
    }
}
4881
/// Selecting only the middle row of a nested-list file returns the same data as
/// slicing that row out of an unfiltered read.
#[test]
#[cfg(feature = "snap")]
fn test_read_nested_lists() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let file = File::open(format!("{testdata}/nested_lists.snappy.parquet")).unwrap();

    // Unfiltered read provides the expected data.
    let expected = ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 60)
        .unwrap()
        .next()
        .unwrap()
        .unwrap();
    assert_eq!(expected.num_rows(), 3);

    let middle_row_only = RowSelection::from(vec![
        RowSelector::skip(1),
        RowSelector::select(1),
        RowSelector::skip(1),
    ]);
    let actual = ParquetRecordBatchReaderBuilder::try_new(file)
        .unwrap()
        .with_row_selection(middle_row_only)
        .build()
        .unwrap()
        .next()
        .unwrap()
        .unwrap();

    assert_eq!(actual.num_rows(), 1);
    assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
}
4909
/// Decimal128 columns with several precision/scale combinations round-trip
/// through parquet unchanged.
#[test]
fn test_arbitrary_decimal() {
    let values = [1, 2, 3, 4, 5, 6, 7, 8];
    let decimal = |precision: u8, scale: i8| {
        Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(precision, scale)
            .unwrap()
    };

    let written = RecordBatch::try_from_iter([
        ("decimal_values_19_0", Arc::new(decimal(19, 0)) as ArrayRef),
        ("decimal_values_12_0", Arc::new(decimal(12, 0)) as ArrayRef),
        ("decimal_values_17_10", Arc::new(decimal(17, 10)) as ArrayRef),
    ])
    .unwrap();

    let mut buffer = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
    writer.write(&written).unwrap();
    writer.close().unwrap();

    let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();

    assert_eq!(&written.slice(0, 8), &read[0]);
}
4942
/// Skipping the first two rows of a list column, written with tiny data pages,
/// returns exactly the third row.
#[test]
fn test_list_skip() {
    let mut lists = ListBuilder::new(Int32Builder::new());
    for row in [vec![Some(1), Some(2)], vec![Some(3)], vec![Some(4)]] {
        lists.append_value(row);
    }
    let column = lists.finish();
    let batch = RecordBatch::try_from_iter([("l", Arc::new(column) as _)]).unwrap();

    // Aim for one row per data page. The limit is presumably checked per write
    // batch, hence the small write batch size.
    let props = WriterProperties::builder()
        .set_data_page_row_count_limit(1)
        .set_write_batch_size(2)
        .build();

    let mut buffer = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
    let out = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
        .unwrap()
        .with_row_selection(selection.into())
        .build()
        .unwrap()
        .next()
        .unwrap()
        .unwrap();
    assert_eq!(out.num_rows(), 1);
    assert_eq!(out, batch.slice(2, 1));
}
4973
4974 fn test_decimal32_roundtrip() {
4975 let d = |values: Vec<i32>, p: u8| {
4976 let iter = values.into_iter();
4977 PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
4978 .with_precision_and_scale(p, 2)
4979 .unwrap()
4980 };
4981
4982 let d1 = d(vec![1, 2, 3, 4, 5], 9);
4983 let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();
4984
4985 let mut buffer = Vec::with_capacity(1024);
4986 let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4987 writer.write(&batch).unwrap();
4988 writer.close().unwrap();
4989
4990 let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4991 let t1 = builder.parquet_schema().columns()[0].physical_type();
4992 assert_eq!(t1, PhysicalType::INT32);
4993
4994 let mut reader = builder.build().unwrap();
4995 assert_eq!(batch.schema(), reader.schema());
4996
4997 let out = reader.next().unwrap().unwrap();
4998 assert_eq!(batch, out);
4999 }
5000
5001 fn test_decimal64_roundtrip() {
5002 let d = |values: Vec<i64>, p: u8| {
5006 let iter = values.into_iter();
5007 PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
5008 .with_precision_and_scale(p, 2)
5009 .unwrap()
5010 };
5011
5012 let d1 = d(vec![1, 2, 3, 4, 5], 9);
5013 let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
5014 let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
5015
5016 let batch = RecordBatch::try_from_iter([
5017 ("d1", Arc::new(d1) as ArrayRef),
5018 ("d2", Arc::new(d2) as ArrayRef),
5019 ("d3", Arc::new(d3) as ArrayRef),
5020 ])
5021 .unwrap();
5022
5023 let mut buffer = Vec::with_capacity(1024);
5024 let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
5025 writer.write(&batch).unwrap();
5026 writer.close().unwrap();
5027
5028 let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
5029 let t1 = builder.parquet_schema().columns()[0].physical_type();
5030 assert_eq!(t1, PhysicalType::INT32);
5031 let t2 = builder.parquet_schema().columns()[1].physical_type();
5032 assert_eq!(t2, PhysicalType::INT64);
5033 let t3 = builder.parquet_schema().columns()[2].physical_type();
5034 assert_eq!(t3, PhysicalType::INT64);
5035
5036 let mut reader = builder.build().unwrap();
5037 assert_eq!(batch.schema(), reader.schema());
5038
5039 let out = reader.next().unwrap().unwrap();
5040 assert_eq!(batch, out);
5041 }
5042
5043 fn test_decimal_roundtrip<T: DecimalType>() {
5044 let d = |values: Vec<usize>, p: u8| {
5049 let iter = values.into_iter().map(T::Native::usize_as);
5050 PrimitiveArray::<T>::from_iter_values(iter)
5051 .with_precision_and_scale(p, 2)
5052 .unwrap()
5053 };
5054
5055 let d1 = d(vec![1, 2, 3, 4, 5], 9);
5056 let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
5057 let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
5058 let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);
5059
5060 let batch = RecordBatch::try_from_iter([
5061 ("d1", Arc::new(d1) as ArrayRef),
5062 ("d2", Arc::new(d2) as ArrayRef),
5063 ("d3", Arc::new(d3) as ArrayRef),
5064 ("d4", Arc::new(d4) as ArrayRef),
5065 ])
5066 .unwrap();
5067
5068 let mut buffer = Vec::with_capacity(1024);
5069 let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
5070 writer.write(&batch).unwrap();
5071 writer.close().unwrap();
5072
5073 let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
5074 let t1 = builder.parquet_schema().columns()[0].physical_type();
5075 assert_eq!(t1, PhysicalType::INT32);
5076 let t2 = builder.parquet_schema().columns()[1].physical_type();
5077 assert_eq!(t2, PhysicalType::INT64);
5078 let t3 = builder.parquet_schema().columns()[2].physical_type();
5079 assert_eq!(t3, PhysicalType::INT64);
5080 let t4 = builder.parquet_schema().columns()[3].physical_type();
5081 assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);
5082
5083 let mut reader = builder.build().unwrap();
5084 assert_eq!(batch.schema(), reader.schema());
5085
5086 let out = reader.next().unwrap().unwrap();
5087 assert_eq!(batch, out);
5088 }
5089
/// Runs the decimal round-trip checks for every Arrow decimal width, narrowest first.
#[test]
fn test_decimal() {
    let roundtrips: [fn(); 4] = [
        test_decimal32_roundtrip,
        test_decimal64_roundtrip,
        test_decimal_roundtrip::<Decimal128Type>,
        test_decimal_roundtrip::<Decimal256Type>,
    ];
    for roundtrip in roundtrips {
        roundtrip();
    }
}
5097
/// Selecting rows 100..1024 of each of two written batches from a list column
/// yields exactly those single-element lists, in order.
#[test]
fn test_list_selection() {
    let schema = Arc::new(Schema::new(vec![Field::new_list(
        "list",
        Field::new_list_field(ArrowDataType::Utf8, true),
        false,
    )]));

    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
    for batch_idx in 0..2 {
        let mut lists = ListBuilder::new(StringBuilder::new());
        for row_idx in 0..1024 {
            // Each row is a one-element list tagged with its batch and row index.
            lists.values().append_value(format!("{batch_idx} {row_idx}"));
            lists.append(true);
        }
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(lists.finish())]).unwrap();
        writer.write(&batch).unwrap();
    }
    writer.close().unwrap();

    let selection = RowSelection::from(vec![
        RowSelector::skip(100),
        RowSelector::select(924),
        RowSelector::skip(100),
        RowSelector::select(924),
    ]);
    let batches = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buf))
        .unwrap()
        .with_row_selection(selection)
        .build()
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    let batch = concat_batches(&schema, &batches).unwrap();

    assert_eq!(batch.num_rows(), 924 * 2);
    let list = batch.column(0).as_list::<i32>();

    // Every surviving list must still hold exactly one element.
    for bounds in list.value_offsets().windows(2) {
        assert_eq!(bounds[0] + 1, bounds[1])
    }

    let mut values = list.values().as_string::<i32>().iter();
    for batch_idx in 0..2 {
        for row_idx in 100..1024 {
            let expected = format!("{batch_idx} {row_idx}");
            assert_eq!(values.next().unwrap().unwrap(), &expected);
        }
    }
}
5152
/// Fuzz test for row selection on a randomly generated, nullable
/// `List<List<Int32>>` column.
///
/// 2048 rows are written as a single batch. About 20% of entries are null at
/// each nesting level: outer lists, inner lists and integers. Each selection
/// pattern in `cases` is read back with several batch sizes, and every
/// selected run is compared with the matching slice of the written batch.
///
/// The RNG is seeded from a printed random seed, so a failing run can be
/// reproduced by hard-coding that seed.
#[test]
fn test_list_selection_fuzz() {
    let seed: u64 = random();
    println!("test_list_selection_fuzz seed: {seed}");
    let mut rng = StdRng::seed_from_u64(seed);

    let schema = Arc::new(Schema::new(vec![Field::new_list(
        "list",
        Field::new_list(
            Field::LIST_FIELD_DEFAULT_NAME,
            Field::new_list_field(ArrowDataType::Int32, true),
            true,
        ),
        true,
    )]));
    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

    let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));

    for _ in 0..2048 {
        if rng.random_bool(0.2) {
            list_a_builder.append(false);
            continue;
        }

        let list_a_len = rng.random_range(0..10);
        let list_b_builder = list_a_builder.values();

        for _ in 0..list_a_len {
            if rng.random_bool(0.2) {
                list_b_builder.append(false);
                continue;
            }

            let list_b_len = rng.random_range(0..10);
            let int_builder = list_b_builder.values();
            for _ in 0..list_b_len {
                match rng.random_bool(0.2) {
                    true => int_builder.append_null(),
                    false => int_builder.append_value(rng.random()),
                }
            }
            list_b_builder.append(true)
        }
        list_a_builder.append(true);
    }

    let array = Arc::new(list_a_builder.finish());
    let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

    writer.write(&batch).unwrap();
    let _metadata = writer.close().unwrap();

    let buf = Bytes::from(buf);

    // Selections that start/end with skips and selects, with both long runs
    // and single-row runs straddling the 1024-row boundary.
    let cases = [
        vec![
            RowSelector::skip(100),
            RowSelector::select(924),
            RowSelector::skip(100),
            RowSelector::select(924),
        ],
        vec![
            RowSelector::select(924),
            RowSelector::skip(100),
            RowSelector::select(924),
            RowSelector::skip(100),
        ],
        vec![
            RowSelector::skip(1023),
            RowSelector::select(1),
            RowSelector::skip(1023),
            RowSelector::select(1),
        ],
        vec![
            RowSelector::select(1),
            RowSelector::skip(1023),
            RowSelector::select(1),
            RowSelector::skip(1023),
        ],
    ];

    for batch_size in [100, 1024, 2048] {
        for selection in &cases {
            let selection = RowSelection::from(selection.clone());
            let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
                .unwrap()
                .with_row_selection(selection.clone())
                .with_batch_size(batch_size)
                .build()
                .unwrap();

            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
            let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
            assert_eq!(actual.num_rows(), selection.row_count());

            // Walk the selectors, tracking the position in the written batch
            // and in the read-back data separately.
            let mut batch_offset = 0;
            let mut actual_offset = 0;
            for selector in selection.iter() {
                if selector.skip {
                    batch_offset += selector.row_count;
                    continue;
                }

                assert_eq!(
                    batch.slice(batch_offset, selector.row_count),
                    actual.slice(actual_offset, selector.row_count)
                );

                batch_offset += selector.row_count;
                actual_offset += selector.row_count;
            }
        }
    }
}
5266
/// Reads `old_list_structure.parquet` (the name indicates a legacy list
/// layout). Checks that it has one row and one column, and that the row's
/// value is the nested list `[[1, 2], [3, 4]]` whose inner field is a
/// non-nullable `Int32` named `"array"`.
#[test]
fn test_read_old_nested_list() {
    use arrow::datatypes::DataType;
    use arrow::datatypes::ToByteSlice;

    let path = format!(
        "{}/old_list_structure.parquet",
        arrow::util::test_util::parquet_test_data()
    );
    let test_file = File::open(path).unwrap();

    // Build the expected inner list by hand so the field name and
    // nullability match the decoded array exactly.
    let inner_field = Arc::new(Field::new("array", DataType::Int32, false));
    let offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());
    let expected_data = ArrayData::builder(DataType::List(inner_field))
        .len(2)
        .add_buffer(offsets)
        .add_child_data(Int32Array::from(vec![1, 2, 3, 4]).into_data())
        .build()
        .unwrap();
    let expected = ListArray::from(expected_data);

    let mut reader = ParquetRecordBatchReaderBuilder::try_new(test_file)
        .unwrap()
        .build()
        .unwrap();
    let out = reader.next().unwrap().unwrap();
    assert_eq!(out.num_rows(), 1);
    assert_eq!(out.num_columns(), 1);

    let outer = out
        .column(0)
        .as_any()
        .downcast_ref::<ListArray>()
        .unwrap();
    let first_row = outer.value(0);
    let first_row = first_row.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(first_row, &expected);
}
5316
/// Reads `map_no_value.parquet` (3 rows, 3 columns) and checks that columns
/// 1 and 2, both decoded as `List<i32>`-offset arrays, hold identical
/// entries row by row.
#[test]
fn test_map_no_value() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let file = File::open(format!("{testdata}/map_no_value.parquet")).unwrap();

    let out = ParquetRecordBatchReaderBuilder::try_new(file)
        .unwrap()
        .build()
        .unwrap()
        .next()
        .unwrap()
        .unwrap();
    assert_eq!(out.num_rows(), 3);
    assert_eq!(out.num_columns(), 3);

    let left = out.column(1).as_list::<i32>();
    let right = out.column(2).as_list::<i32>();
    assert_eq!(left.len(), right.len());
    for (l, r) in left.iter().zip(right.iter()) {
        assert_eq!(l, r);
    }
}
5355
/// Opens `unknown-logical-type.parquet` and checks the following:
/// - column 0 has the `String` logical type;
/// - column 1 is a `BYTE_ARRAY` whose logical type is reported as
///   `LogicalType::_Unknown { field_id: 2555 }`;
/// - the file still reads as 3 rows x 2 columns.
#[test]
fn test_read_unknown_logical_type() {
    let path = format!(
        "{}/unknown-logical-type.parquet",
        arrow::util::test_util::parquet_test_data()
    );
    let builder = ParquetRecordBatchReaderBuilder::try_new(File::open(path).unwrap())
        .expect("Error creating reader builder");

    let descr = builder.metadata().file_metadata().schema_descr();
    let (known, unknown) = (descr.column(0), descr.column(1));
    assert_eq!(known.logical_type_ref(), Some(&LogicalType::String));
    assert_eq!(
        unknown.logical_type_ref(),
        Some(&LogicalType::_Unknown { field_id: 2555 })
    );
    assert_eq!(unknown.physical_type(), PhysicalType::BYTE_ARRAY);

    let batch = builder.build().unwrap().next().unwrap().unwrap();
    assert_eq!(batch.num_rows(), 3);
    assert_eq!(batch.num_columns(), 2);
}
5381
/// Reads a three-row file using a caller-supplied schema plus a `RowNumber`
/// virtual column. Checks that the virtual column follows the data column
/// and holds the values `0, 1, 2`.
#[test]
fn test_read_row_numbers() {
    let file = write_parquet_from_iter(vec![(
        "value",
        Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
    )]);
    let value_field = Field::new("value", ArrowDataType::Int64, false);
    let row_number_field = Arc::new(
        Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
    );

    let options = ArrowReaderOptions::new()
        .with_schema(Arc::new(Schema::new(Fields::from(vec![
            value_field.clone(),
        ]))))
        .with_virtual_columns(vec![Arc::clone(&row_number_field)])
        .unwrap();
    let mut reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
        file.try_clone().unwrap(),
        options,
    )
    .expect("reader builder with schema")
    .build()
    .expect("reader with schema");
    let batch = reader.next().unwrap().unwrap();

    let expected_schema = Arc::new(Schema::new(vec![value_field, (*row_number_field).clone()]));
    assert_eq!(batch.schema(), expected_schema);
    assert_eq!(batch.num_columns(), 2);
    assert_eq!(batch.num_rows(), 3);

    // Collect one Int64 column (nulls included) for comparison.
    let column_values = |idx: usize| {
        batch
            .column(idx)
            .as_primitive::<types::Int64Type>()
            .iter()
            .collect::<Vec<_>>()
    };
    assert_eq!(column_values(0), vec![Some(1), Some(2), Some(3)]);
    assert_eq!(column_values(1), vec![Some(0), Some(1), Some(2)]);
}
5432
/// Projects away every physical column and checks that a reader configured
/// with only a `RowNumber` virtual column still yields one row per file row,
/// numbered `0, 1, 2`.
#[test]
fn test_read_only_row_numbers() {
    let file = write_parquet_from_iter(vec![(
        "value",
        Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
    )]);
    let row_number_field = Arc::new(
        Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
    );
    let metadata = ArrowReaderMetadata::load(
        &file,
        ArrowReaderOptions::new()
            .with_virtual_columns(vec![row_number_field.clone()])
            .unwrap(),
    )
    .unwrap();

    // Select none of the leaf columns, leaving only the virtual column.
    let leaf_count = metadata
        .metadata
        .file_metadata()
        .schema_descr()
        .num_columns();
    let mask = ProjectionMask::none(leaf_count);

    let batch = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
        .with_projection(mask)
        .build()
        .expect("reader with schema")
        .next()
        .unwrap()
        .unwrap();

    assert_eq!(batch.schema(), Arc::new(Schema::new(vec![row_number_field])));
    assert_eq!(batch.num_columns(), 1);
    assert_eq!(batch.num_rows(), 3);
    let row_numbers: Vec<Option<i64>> = batch
        .column(0)
        .as_primitive::<types::Int64Type>()
        .iter()
        .collect();
    assert_eq!(row_numbers, vec![Some(0), Some(1), Some(2)]);
}
5472
/// Writes 100 rows (values `5000..5100`) with at most 50 rows per row group.
/// Checks that the `RowNumber` virtual column reports each row's file
/// position both when row groups are read in file order and when they are
/// read as `[1, 0]`.
#[test]
fn test_read_row_numbers_row_group_order() -> Result<()> {
    let batch = RecordBatch::try_from_iter([(
        "col",
        Arc::new(Int64Array::from_iter_values(5000..5100)) as ArrayRef,
    )])?;
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(50))
        .build();
    let mut buffer = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props))?;
    // Feed the writer ten 10-row slices.
    for offset in (0..100).step_by(10) {
        writer.write(&batch.slice(offset, 10))?;
    }
    writer.close()?;

    let buffer = Bytes::from(buffer);
    let row_number_field = Arc::new(
        Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
    );
    let options = ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field])?;

    let in_file_order =
        ParquetRecordBatchReaderBuilder::try_new_with_options(buffer.clone(), options.clone())?
            .build()?;
    assert_eq!(
        ValuesAndRowNumbers {
            values: (5000..5100).collect(),
            row_numbers: (0..100).collect(),
        },
        ValuesAndRowNumbers::new_from_reader(in_file_order)
    );

    let reversed = ParquetRecordBatchReaderBuilder::try_new_with_options(buffer, options)?
        .with_row_groups(vec![1, 0])
        .build()?;
    assert_eq!(
        ValuesAndRowNumbers {
            values: (5050..5100).chain(5000..5050).collect(),
            row_numbers: (50..100).chain(0..50).collect(),
        },
        ValuesAndRowNumbers::new_from_reader(reversed)
    );

    Ok(())
}
5526
/// Flattened contents of a `col` data column and a `row_number` virtual
/// column, so that a whole read can be checked with a single `assert_eq!`.
#[derive(Debug, PartialEq)]
struct ValuesAndRowNumbers {
    /// All `col` values, in the order they were read.
    values: Vec<i64>,
    /// All `row_number` values, in the order they were read.
    row_numbers: Vec<i64>,
}
5532 impl ValuesAndRowNumbers {
5533 fn new_from_reader(reader: ParquetRecordBatchReader) -> Self {
5534 let mut values = vec![];
5535 let mut row_numbers = vec![];
5536 for batch in reader {
5537 let batch = batch.expect("Could not read batch");
5538 values.extend(
5539 batch
5540 .column_by_name("col")
5541 .expect("Could not get col column")
5542 .as_primitive::<arrow::datatypes::Int64Type>()
5543 .iter()
5544 .map(|v| v.expect("Could not get value")),
5545 );
5546
5547 row_numbers.extend(
5548 batch
5549 .column_by_name("row_number")
5550 .expect("Could not get row_number column")
5551 .as_primitive::<arrow::datatypes::Int64Type>()
5552 .iter()
5553 .map(|v| v.expect("Could not get row number"))
5554 .collect::<Vec<_>>(),
5555 );
5556 }
5557 Self {
5558 values,
5559 row_numbers,
5560 }
5561 }
5562 }
5563
/// `with_virtual_columns` must refuse a plain field that has no virtual
/// extension type, and say why in the error message.
#[test]
fn test_with_virtual_columns_rejects_non_virtual_fields() {
    let regular_field = Arc::new(Field::new("regular_column", ArrowDataType::Int64, false));
    let err = ArrowReaderOptions::new()
        .with_virtual_columns(vec![regular_field])
        .unwrap_err();
    assert_eq!(
        err.to_string(),
        "Parquet error: Field 'regular_column' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'"
    );
}
5576
/// Runs the randomized row-number check against the synchronous reader with
/// a random row selection and no row filter.
#[test]
fn test_row_numbers_with_multiple_row_groups() {
    test_row_numbers_with_multiple_row_groups_helper(
        false,
        |path, selection, _row_filter, batch_size| {
            let file = File::open(path).unwrap();
            let options = ArrowReaderOptions::new()
                .with_virtual_columns(vec![Arc::new(
                    Field::new("row_number", ArrowDataType::Int64, false)
                        .with_extension_type(RowNumber),
                )])
                .unwrap();
            let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                .unwrap()
                .with_row_selection(selection)
                .with_batch_size(batch_size)
                .build()
                .expect("Could not create reader");
            let batches: Result<Vec<RecordBatch>, _> = reader.collect();
            batches.expect("Could not read")
        },
    );
}
5602
/// Runs the randomized row-number check against the synchronous reader with
/// both a random row selection and a random row filter.
#[test]
fn test_row_numbers_with_multiple_row_groups_and_filter() {
    test_row_numbers_with_multiple_row_groups_helper(
        true,
        |path, selection, row_filter, batch_size| {
            // The helper always supplies a filter when asked to use one.
            let row_filter = row_filter.expect("No filter");
            let file = File::open(path).unwrap();
            let options = ArrowReaderOptions::new()
                .with_virtual_columns(vec![Arc::new(
                    Field::new("row_number", ArrowDataType::Int64, false)
                        .with_extension_type(RowNumber),
                )])
                .unwrap();
            let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                .unwrap()
                .with_row_selection(selection)
                .with_batch_size(batch_size)
                .with_row_filter(row_filter)
                .build()
                .expect("Could not create reader");
            let batches: Result<Vec<RecordBatch>, _> = reader.collect();
            batches.expect("Could not read")
        },
    );
}
5629
/// Writes three 2-row batches with at most 2 rows per row group. Checks that
/// the `RowGroupIndex` virtual column, which follows the data column, reports
/// `0, 0, 1, 1, 2, 2`.
#[test]
fn test_read_row_group_indices() {
    let batches: Vec<RecordBatch> = [[1_i64, 2], [3, 4], [5, 6]]
        .into_iter()
        .map(|pair| {
            RecordBatch::try_from_iter(vec![(
                "value",
                Arc::new(Int64Array::from(pair.to_vec())) as ArrayRef,
            )])
            .unwrap()
        })
        .collect();

    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(2))
        .build();
    let mut buffer = Vec::new();
    let mut writer =
        ArrowWriter::try_new(&mut buffer, batches[0].schema(), Some(props)).unwrap();
    for batch in &batches {
        writer.write(batch).unwrap();
    }
    writer.close().unwrap();

    let row_group_index_field = Arc::new(
        Field::new("row_group_index", ArrowDataType::Int64, false)
            .with_extension_type(RowGroupIndex),
    );
    let options = ArrowReaderOptions::new()
        .with_virtual_columns(vec![row_group_index_field])
        .unwrap();
    let batch =
        ParquetRecordBatchReaderBuilder::try_new_with_options(Bytes::from(buffer), options)
            .expect("reader builder with virtual columns")
            .build()
            .expect("reader with virtual columns")
            .next()
            .unwrap()
            .unwrap();

    assert_eq!(batch.num_columns(), 2);
    assert_eq!(batch.num_rows(), 6);

    let column = |idx: usize| -> Vec<Option<i64>> {
        batch
            .column(idx)
            .as_primitive::<types::Int64Type>()
            .iter()
            .collect()
    };
    assert_eq!(
        column(0),
        vec![Some(1), Some(2), Some(3), Some(4), Some(5), Some(6)]
    );
    assert_eq!(
        column(1),
        vec![Some(0), Some(0), Some(1), Some(1), Some(2), Some(2)]
    );
}
5692
/// Projects away every physical column and checks that a reader configured
/// with only a `RowGroupIndex` virtual column still yields every file row:
/// 3 rows from row group 0, then 2 from row group 1.
#[test]
fn test_read_only_row_group_indices() {
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(3))
        .build();
    let mut buffer = Vec::new();
    {
        let first = RecordBatch::try_from_iter(vec![(
            "value",
            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
        )])
        .unwrap();
        let second = RecordBatch::try_from_iter(vec![(
            "value",
            Arc::new(Int64Array::from(vec![4, 5])) as ArrayRef,
        )])
        .unwrap();
        let mut writer = ArrowWriter::try_new(&mut buffer, first.schema(), Some(props)).unwrap();
        for batch in [&first, &second] {
            writer.write(batch).unwrap();
        }
        writer.close().unwrap();
    }

    let file = Bytes::from(buffer);
    let row_group_index_field = Arc::new(
        Field::new("row_group_index", ArrowDataType::Int64, false)
            .with_extension_type(RowGroupIndex),
    );
    let options = ArrowReaderOptions::new()
        .with_virtual_columns(vec![row_group_index_field.clone()])
        .unwrap();
    let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
    // Select none of the leaf columns, leaving only the virtual column.
    let mask = ProjectionMask::none(
        metadata
            .metadata
            .file_metadata()
            .schema_descr()
            .num_columns(),
    );

    let batch = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
        .with_projection(mask)
        .build()
        .expect("reader with virtual columns only")
        .next()
        .unwrap()
        .unwrap();

    let expected_schema = Arc::new(Schema::new(vec![(*row_group_index_field).clone()]));
    assert_eq!(batch.schema(), expected_schema);
    assert_eq!(batch.num_columns(), 1);
    assert_eq!(batch.num_rows(), 5);

    let indices: Vec<Option<i64>> = batch
        .column(0)
        .as_primitive::<types::Int64Type>()
        .iter()
        .collect();
    assert_eq!(indices, vec![Some(0), Some(0), Some(0), Some(1), Some(1)]);
}
5749
/// Writes three 10-row row groups holding `0..10`, `10..20` and `20..30`,
/// then reads them back in the order `[2, 1, 0]` via `with_row_groups`.
///
/// Every row is checked, not just a few sample positions:
/// - the values must follow the requested row-group order;
/// - the `RowGroupIndex` virtual column must name the row group each row
///   actually came from.
#[test]
fn test_read_row_group_indices_with_selection() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        ArrowDataType::Int64,
        false,
    )]));
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(10))
        .build();
    let mut buffer = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props))?;
    // One 10-row batch per row group.
    for start in (0..30).step_by(10) {
        let array = Int64Array::from_iter_values(start..start + 10);
        let batch = RecordBatch::try_from_iter(vec![("value", Arc::new(array) as ArrayRef)])?;
        writer.write(&batch)?;
    }
    writer.close()?;

    let row_group_index_field = Arc::new(
        Field::new("rg_idx", ArrowDataType::Int64, false).with_extension_type(RowGroupIndex),
    );
    let options =
        ArrowReaderOptions::new().with_virtual_columns(vec![row_group_index_field])?;

    let reader =
        ParquetRecordBatchReaderBuilder::try_new_with_options(Bytes::from(buffer), options)?
            .with_row_groups(vec![2, 1, 0])
            .build()?;
    // Take the schema from the reader so an empty result does not panic on
    // `batches[0]`; it then fails the row-count assertion instead.
    let schema = reader.schema();
    let batches = reader.collect::<Result<Vec<_>, _>>()?;
    let combined = concat_batches(&schema, &batches)?;
    assert_eq!(combined.num_rows(), 30);

    let values: Vec<Option<i64>> = combined
        .column(0)
        .as_primitive::<types::Int64Type>()
        .iter()
        .collect();
    let expected_values: Vec<Option<i64>> = (20..30)
        .chain(10..20)
        .chain(0..10)
        .map(Some)
        .collect();
    assert_eq!(values, expected_values);

    let rg_indices: Vec<Option<i64>> = combined
        .column(1)
        .as_primitive::<types::Int64Type>()
        .iter()
        .collect();
    // Output rows 0..10 come from row group 2, 10..20 from 1, 20..30 from 0.
    let expected_rg_indices: Vec<Option<i64>> = (0..30).map(|i: i64| Some(2 - i / 10)).collect();
    assert_eq!(rg_indices, expected_rg_indices);

    Ok(())
}
5806
5807 pub(crate) fn test_row_numbers_with_multiple_row_groups_helper<F>(
5808 use_filter: bool,
5809 test_case: F,
5810 ) where
5811 F: FnOnce(PathBuf, RowSelection, Option<RowFilter>, usize) -> Vec<RecordBatch>,
5812 {
5813 let seed: u64 = random();
5814 println!("test_row_numbers_with_multiple_row_groups seed: {}", seed);
5815 let mut rng = StdRng::seed_from_u64(seed);
5816
5817 use tempfile::TempDir;
5818 let tempdir = TempDir::new().expect("Could not create temp dir");
5819
5820 let (bytes, metadata) = generate_file_with_row_numbers(&mut rng);
5821
5822 let path = tempdir.path().join("test.parquet");
5823 std::fs::write(&path, bytes).expect("Could not write file");
5824
5825 let mut case = vec![];
5826 let mut remaining = metadata.file_metadata().num_rows();
5827 while remaining > 0 {
5828 let row_count = rng.random_range(1..=remaining);
5829 remaining -= row_count;
5830 case.push(RowSelector {
5831 row_count: row_count as usize,
5832 skip: rng.random_bool(0.5),
5833 });
5834 }
5835
5836 let filter = use_filter.then(|| {
5837 let filter = (0..metadata.file_metadata().num_rows())
5838 .map(|_| rng.random_bool(0.99))
5839 .collect::<Vec<_>>();
5840 let mut filter_offset = 0;
5841 RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
5842 ProjectionMask::all(),
5843 move |b| {
5844 let array = BooleanArray::from_iter(
5845 filter
5846 .iter()
5847 .skip(filter_offset)
5848 .take(b.num_rows())
5849 .map(|x| Some(*x)),
5850 );
5851 filter_offset += b.num_rows();
5852 Ok(array)
5853 },
5854 ))])
5855 });
5856
5857 let selection = RowSelection::from(case);
5858 let batches = test_case(path, selection.clone(), filter, rng.random_range(1..4096));
5859
5860 if selection.skipped_row_count() == metadata.file_metadata().num_rows() as usize {
5861 assert!(batches.into_iter().all(|batch| batch.num_rows() == 0));
5862 return;
5863 }
5864 let actual = concat_batches(batches.first().expect("No batches").schema_ref(), &batches)
5865 .expect("Failed to concatenate");
5866 let values = actual
5868 .column(0)
5869 .as_primitive::<types::Int64Type>()
5870 .iter()
5871 .collect::<Vec<_>>();
5872 let row_numbers = actual
5873 .column(1)
5874 .as_primitive::<types::Int64Type>()
5875 .iter()
5876 .collect::<Vec<_>>();
5877 assert_eq!(
5878 row_numbers
5879 .into_iter()
5880 .map(|number| number.map(|number| number + 1))
5881 .collect::<Vec<_>>(),
5882 values
5883 );
5884 }
5885
5886 fn generate_file_with_row_numbers(rng: &mut impl Rng) -> (Bytes, ParquetMetaData) {
5887 let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
5888 "value",
5889 ArrowDataType::Int64,
5890 false,
5891 )])));
5892
5893 let mut buf = Vec::with_capacity(1024);
5894 let mut writer =
5895 ArrowWriter::try_new(&mut buf, schema.clone(), None).expect("Could not create writer");
5896
5897 let mut values = 1..=rng.random_range(1..4096);
5898 while !values.is_empty() {
5899 let batch_values = values
5900 .by_ref()
5901 .take(rng.random_range(1..4096))
5902 .collect::<Vec<_>>();
5903 let array = Arc::new(Int64Array::from(batch_values)) as ArrayRef;
5904 let batch =
5905 RecordBatch::try_from_iter([("value", array)]).expect("Could not create batch");
5906 writer.write(&batch).expect("Could not write batch");
5907 writer.flush().expect("Could not flush");
5908 }
5909 let metadata = writer.close().expect("Could not close writer");
5910
5911 (Bytes::from(buf), metadata)
5912 }
5913}