use arrow_array::cast::AsArray;
use arrow_array::{Array, RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, FieldRef, Schema, SchemaRef};
use arrow_select::filter::filter_record_batch;
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

pub use crate::arrow::array_reader::RowGroups;
use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
use crate::arrow::schema::{
    ParquetField, parquet_to_arrow_schema_and_fields, virtual_type::is_virtual_column,
};
use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels_with_virtual};
use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
use crate::bloom_filter::{
    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
};
use crate::column::page::{PageIterator, PageReader};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{
    PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
    ParquetStatisticsPolicy, RowGroupMetaData,
};
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;

use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
pub use read_plan::{PredicateOptions, ReadPlan, ReadPlanBuilder};

mod filter;
pub mod metrics;
mod read_plan;
pub(crate) mod selection;
pub mod statistics;

/// The default number of rows per [`RecordBatch`]
pub const DEFAULT_BATCH_SIZE: usize = 1024;

/// Builder for constructing parquet readers that decode into [`RecordBatch`]es.
///
/// Most users will use one of the concrete instantiations such as
/// [`ParquetRecordBatchReaderBuilder`].
pub struct ArrowReaderBuilder<T> {
    pub(crate) input: T,

    pub(crate) metadata: Arc<ParquetMetaData>,

    pub(crate) schema: SchemaRef,

    pub(crate) fields: Option<Arc<ParquetField>>,

    pub(crate) batch_size: usize,

    pub(crate) row_groups: Option<Vec<usize>>,

    pub(crate) projection: ProjectionMask,

    pub(crate) filter: Option<RowFilter>,

    pub(crate) selection: Option<RowSelection>,

    pub(crate) row_selection_policy: RowSelectionPolicy,

    pub(crate) limit: Option<usize>,

    pub(crate) offset: Option<usize>,

    pub(crate) metrics: ArrowReaderMetrics,

    pub(crate) max_predicate_cache_size: usize,
}

impl<T: Debug> Debug for ArrowReaderBuilder<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowReaderBuilder<T>")
            .field("input", &self.input)
            .field("metadata", &self.metadata)
            .field("schema", &self.schema)
            .field("fields", &self.fields)
            .field("batch_size", &self.batch_size)
            .field("row_groups", &self.row_groups)
            .field("projection", &self.projection)
            .field("filter", &self.filter)
            .field("selection", &self.selection)
            .field("row_selection_policy", &self.row_selection_policy)
            .field("limit", &self.limit)
            .field("offset", &self.offset)
            .field("metrics", &self.metrics)
            .finish()
    }
}

impl<T> ArrowReaderBuilder<T> {
    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self {
            input,
            metadata: metadata.metadata,
            schema: metadata.schema,
            fields: metadata.fields,
            batch_size: DEFAULT_BATCH_SIZE,
            row_groups: None,
            projection: ProjectionMask::all(),
            filter: None,
            selection: None,
            row_selection_policy: RowSelectionPolicy::default(),
            limit: None,
            offset: None,
            metrics: ArrowReaderMetrics::Disabled,
            max_predicate_cache_size: 100 * 1024 * 1024,
        }
    }

    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    /// Returns the arrow [`SchemaRef`] for this parquet file
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Set the size of [`RecordBatch`] to produce. Defaults to [`DEFAULT_BATCH_SIZE`].
    /// The value is capped at the number of rows in the file.
    pub fn with_batch_size(self, batch_size: usize) -> Self {
        // Try to avoid allocating batches larger than the file
        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
        Self { batch_size, ..self }
    }

    /// Only read data from the provided row group indexes
    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
        Self {
            row_groups: Some(row_groups),
            ..self
        }
    }

    /// Only read data from the columns in the provided [`ProjectionMask`]
    pub fn with_projection(self, mask: ProjectionMask) -> Self {
        Self {
            projection: mask,
            ..self
        }
    }

    /// Set the [`RowSelectionPolicy`] controlling how row selections are
    /// represented internally
    pub fn with_row_selection_policy(self, policy: RowSelectionPolicy) -> Self {
        Self {
            row_selection_policy: policy,
            ..self
        }
    }

    /// Provide a [`RowSelection`] to restrict which rows are decoded
    pub fn with_row_selection(self, selection: RowSelection) -> Self {
        Self {
            selection: Some(selection),
            ..self
        }
    }

    /// Provide a [`RowFilter`] to skip decoding rows that do not match its predicates
    pub fn with_row_filter(self, filter: RowFilter) -> Self {
        Self {
            filter: Some(filter),
            ..self
        }
    }

    /// Limit the number of rows read, applied after any row selection and filtering
    pub fn with_limit(self, limit: usize) -> Self {
        Self {
            limit: Some(limit),
            ..self
        }
    }

    /// Skip over the given number of rows, applied after any row selection and filtering
    pub fn with_offset(self, offset: usize) -> Self {
        Self {
            offset: Some(offset),
            ..self
        }
    }

    /// Provide an [`ArrowReaderMetrics`] with which to record metrics while reading
    pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
        Self { metrics, ..self }
    }

    /// Set the maximum size, in bytes, of the cache used when evaluating predicates
    pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
        Self {
            max_predicate_cache_size,
            ..self
        }
    }
}

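// A minimal configuration sketch for the builder above. The file name,
// projection indices, and sizes are hypothetical, chosen only to show how the
// `self`-consuming setters chain:
//
//     let file = std::fs::File::open("data.parquet")?;
//     let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
//     let mask = ProjectionMask::leaves(builder.parquet_schema(), [0, 2]);
//     let reader = builder
//         .with_projection(mask)
//         .with_batch_size(8192)
//         .with_limit(1_000)
//         .build()?;
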
/// Options that control how metadata is read and how the arrow schema is
/// determined when creating a reader for a parquet file
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    /// Should the embedded arrow schema metadata be ignored (defaults to `false`)
    skip_arrow_metadata: bool,
    /// If provided, used as a hint when inferring the arrow schema
    supplied_schema: Option<SchemaRef>,

    /// Policy for reading the column index
    pub(crate) column_index: PageIndexPolicy,
    /// Policy for reading the offset index
    pub(crate) offset_index: PageIndexPolicy,

    /// Options controlling how the parquet footer metadata is decoded
    metadata_options: ParquetMetaDataOptions,
    #[cfg(feature = "encryption")]
    pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,

    /// Virtual columns (extension types named `arrow.virtual.*`) to expose
    virtual_columns: Vec<FieldRef>,
}

impl ArrowReaderOptions {
    /// Create a new [`ArrowReaderOptions`] with the default settings
    pub fn new() -> Self {
        Self::default()
    }

    /// Skip decoding the embedded arrow metadata (defaults to `false`)
    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

    /// Provide a schema hint to use when reading the parquet file; this also
    /// implies [`Self::with_skip_arrow_metadata`]
    pub fn with_schema(self, schema: SchemaRef) -> Self {
        Self {
            supplied_schema: Some(schema),
            skip_arrow_metadata: true,
            ..self
        }
    }

    #[deprecated(since = "57.2.0", note = "Use `with_page_index_policy` instead")]
    pub fn with_page_index(self, page_index: bool) -> Self {
        self.with_page_index_policy(PageIndexPolicy::from(page_index))
    }

    /// Set the same [`PageIndexPolicy`] for both the column index and the offset index
    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
        self.with_column_index_policy(policy)
            .with_offset_index_policy(policy)
    }

    /// Set the [`PageIndexPolicy`] for the column index
    pub fn with_column_index_policy(mut self, policy: PageIndexPolicy) -> Self {
        self.column_index = policy;
        self
    }

    /// Set the [`PageIndexPolicy`] for the offset index
    pub fn with_offset_index_policy(mut self, policy: PageIndexPolicy) -> Self {
        self.offset_index = policy;
        self
    }

    /// Provide a previously decoded parquet [`SchemaDescriptor`], avoiding
    /// re-decoding it from the footer
    pub fn with_parquet_schema(mut self, schema: Arc<SchemaDescriptor>) -> Self {
        self.metadata_options.set_schema(schema);
        self
    }

    /// If `true`, decode page encoding stats as a compact mask
    pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
        self.metadata_options.set_encoding_stats_as_mask(val);
        self
    }

    /// Set the [`ParquetStatisticsPolicy`] for page encoding stats
    pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
        self.metadata_options.set_encoding_stats_policy(policy);
        self
    }

    /// Set the [`ParquetStatisticsPolicy`] for column chunk statistics
    pub fn with_column_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
        self.metadata_options.set_column_stats_policy(policy);
        self
    }

    /// Set the [`ParquetStatisticsPolicy`] for size statistics
    pub fn with_size_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
        self.metadata_options.set_size_stats_policy(policy);
        self
    }

    /// Provide the file decryption properties to use when reading encrypted files
    #[cfg(feature = "encryption")]
    pub fn with_file_decryption_properties(
        self,
        file_decryption_properties: Arc<FileDecryptionProperties>,
    ) -> Self {
        Self {
            file_decryption_properties: Some(file_decryption_properties),
            ..self
        }
    }

    /// Set the virtual columns to expose; each field must be a virtual column,
    /// i.e. an extension type whose name starts with `arrow.virtual.`
    pub fn with_virtual_columns(self, virtual_columns: Vec<FieldRef>) -> Result<Self> {
        for field in &virtual_columns {
            if !is_virtual_column(field) {
                return Err(ParquetError::General(format!(
                    "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
                    field.name()
                )));
            }
        }
        Ok(Self {
            virtual_columns,
            ..self
        })
    }

    #[deprecated(
        since = "57.2.0",
        note = "Use `column_index_policy` or `offset_index_policy` instead"
    )]
    pub fn page_index(&self) -> bool {
        self.offset_index != PageIndexPolicy::Skip && self.column_index != PageIndexPolicy::Skip
    }

    /// Returns the [`PageIndexPolicy`] for the offset index
    pub fn offset_index_policy(&self) -> PageIndexPolicy {
        self.offset_index
    }

    /// Returns the [`PageIndexPolicy`] for the column index
    pub fn column_index_policy(&self) -> PageIndexPolicy {
        self.column_index
    }

    /// Returns the metadata decoding options
    pub fn metadata_options(&self) -> &ParquetMetaDataOptions {
        &self.metadata_options
    }

    /// Returns the file decryption properties, if any
    #[cfg(feature = "encryption")]
    pub fn file_decryption_properties(&self) -> Option<&Arc<FileDecryptionProperties>> {
        self.file_decryption_properties.as_ref()
    }
}

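// Sketch of typical usage of the options above (the `PageIndexPolicy` variant
// shown is an assumption; substitute whichever variant fits your use case):
//
//     let options = ArrowReaderOptions::new()
//         .with_page_index_policy(PageIndexPolicy::Optional)
//         .with_skip_arrow_metadata(true);
//     let builder =
//         ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?;
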
/// The combination of parquet metadata and inferred arrow schema needed to
/// construct a reader, decoupled from any particular data source
#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    /// The parquet metadata for the file
    pub(crate) metadata: Arc<ParquetMetaData>,
    /// The arrow schema of the file
    pub(crate) schema: SchemaRef,
    /// The parquet fields converted to arrow, if any
    pub(crate) fields: Option<Arc<ParquetField>>,
}

impl ArrowReaderMetadata {
    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ParquetMetaDataReader::new()
            .with_column_index_policy(options.column_index)
            .with_offset_index_policy(options.offset_index)
            .with_metadata_options(Some(options.metadata_options.clone()));
        #[cfg(feature = "encryption")]
        let metadata = metadata.with_decryption_properties(
            options.file_decryption_properties.as_ref().map(Arc::clone),
        );
        let metadata = metadata.parse_and_finish(reader)?;
        Self::try_new(Arc::new(metadata), options)
    }

    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
        match options.supplied_schema {
            Some(supplied_schema) => Self::with_supplied_schema(
                metadata,
                supplied_schema.clone(),
                &options.virtual_columns,
            ),
            None => {
                let kv_metadata = match options.skip_arrow_metadata {
                    true => None,
                    false => metadata.file_metadata().key_value_metadata(),
                };

                let (schema, fields) = parquet_to_arrow_schema_and_fields(
                    metadata.file_metadata().schema_descr(),
                    ProjectionMask::all(),
                    kv_metadata,
                    &options.virtual_columns,
                )?;

                Ok(Self {
                    metadata,
                    schema: Arc::new(schema),
                    fields: fields.map(Arc::new),
                })
            }
        }
    }

    fn with_supplied_schema(
        metadata: Arc<ParquetMetaData>,
        supplied_schema: SchemaRef,
        virtual_columns: &[FieldRef],
    ) -> Result<Self> {
        let parquet_schema = metadata.file_metadata().schema_descr();
        let field_levels = parquet_to_arrow_field_levels_with_virtual(
            parquet_schema,
            ProjectionMask::all(),
            Some(supplied_schema.fields()),
            virtual_columns,
        )?;
        let fields = field_levels.fields;
        let inferred_len = fields.len();
        let supplied_len = supplied_schema.fields().len() + virtual_columns.len();
        if inferred_len != supplied_len {
            return Err(arrow_err!(format!(
                "Incompatible supplied Arrow schema: expected {} columns received {}",
                inferred_len, supplied_len
            )));
        }

        let mut errors = Vec::new();

        let field_iter = supplied_schema.fields().iter().zip(fields.iter());

        for (field1, field2) in field_iter {
            if field1.data_type() != field2.data_type() {
                errors.push(format!(
                    "data type mismatch for field {}: requested {} but found {}",
                    field1.name(),
                    field1.data_type(),
                    field2.data_type()
                ));
            }
            if field1.is_nullable() != field2.is_nullable() {
                errors.push(format!(
                    "nullability mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.is_nullable(),
                    field2.is_nullable()
                ));
            }
            if field1.metadata() != field2.metadata() {
                errors.push(format!(
                    "metadata mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.metadata(),
                    field2.metadata()
                ));
            }
        }

        if !errors.is_empty() {
            let message = errors.join(", ");
            return Err(ParquetError::ArrowError(format!(
                "Incompatible supplied Arrow schema: {message}",
            )));
        }

        Ok(Self {
            metadata,
            schema: supplied_schema,
            fields: field_levels.levels.map(Arc::new),
        })
    }

    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}

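// Sketch of reusing a single metadata decode across several readers. Since
// `ArrowReaderMetadata` is `Clone` and its fields are reference counted, the
// clone below is cheap; `file.try_clone()` is a hypothetical way of obtaining
// two handles to the same underlying file:
//
//     let metadata = ArrowReaderMetadata::load(&file, ArrowReaderOptions::new())?;
//     let reader_a = ParquetRecordBatchReaderBuilder::new_with_metadata(
//         file.try_clone()?,
//         metadata.clone(),
//     )
//     .build()?;
//     let reader_b =
//         ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata).build()?;
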
#[doc(hidden)]
/// A newtype used within [`ArrowReaderBuilder`] to distinguish sync readers from async
pub struct SyncReader<T: ChunkReader>(T);

impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("SyncReader").field(&self.0).finish()
    }
}

/// A synchronous builder used to construct a [`ParquetRecordBatchReader`] for a file
pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;

impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
    /// Create a new [`ParquetRecordBatchReaderBuilder`] with default options
    pub fn try_new(reader: T) -> Result<Self> {
        Self::try_new_with_options(reader, Default::default())
    }

    /// Create a new [`ParquetRecordBatchReaderBuilder`] with the provided [`ArrowReaderOptions`]
    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load(&reader, options)?;
        Ok(Self::new_with_metadata(reader, metadata))
    }

    /// Create a [`ParquetRecordBatchReaderBuilder`] from previously loaded
    /// [`ArrowReaderMetadata`], allowing the metadata decode to be reused
    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(SyncReader(input), metadata)
    }

    /// Read the bloom filter for the given column in the given row group,
    /// returning `Ok(None)` if no bloom filter is present
    pub fn get_row_group_column_bloom_filter(
        &self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> Result<Option<Sbbf>> {
        let metadata = self.metadata.row_group(row_group_idx);
        let column_metadata = metadata.column(column_idx);

        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => self.input.0.get_bytes(offset, length as usize),
            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
        }?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        match header.algorithm {
            BloomFilterAlgorithm::BLOCK => {
                // this match exists to future-proof the singleton algorithm enum
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED => {
                // this match exists to future-proof the singleton compression enum
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH => {
                // this match exists to future-proof the singleton hash enum
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            Some(_) => buffer.slice(
                (TryInto::<usize>::try_into(bitset_offset).unwrap()
                    - TryInto::<usize>::try_into(offset).unwrap())..,
            ),
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                self.input.0.get_bytes(bitset_offset, bitset_length)?
            }
        };
        Ok(Some(Sbbf::new(&bitset)))
    }

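    // Sketch of probing a bloom filter before deciding to scan a row group
    // (the indices and value are hypothetical). A bloom filter can return
    // false positives but never false negatives, so only a negative result
    // is conclusive:
    //
    //     if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0)? {
    //         if !sbbf.check(&"needle") {
    //             // the value is definitely absent from this row group
    //         }
    //     }
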
    /// Build a [`ParquetRecordBatchReader`].
    ///
    /// Note: any [`RowFilter`] is evaluated eagerly as part of this call
    pub fn build(self) -> Result<ParquetRecordBatchReader> {
        let Self {
            input,
            metadata,
            schema: _,
            fields,
            batch_size,
            row_groups,
            projection,
            mut filter,
            selection,
            row_selection_policy,
            limit,
            offset,
            metrics,
            max_predicate_cache_size: _,
        } = self;

        // Try to avoid allocating batches larger than the file
        let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize);

        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());

        let reader = ReaderRowGroups {
            reader: Arc::new(input.0),
            metadata,
            row_groups,
        };

        let mut plan_builder = ReadPlanBuilder::new(batch_size)
            .with_selection(selection)
            .with_row_selection_policy(row_selection_policy);

        // Update the selection by evaluating each predicate in turn
        if let Some(filter) = filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
                // Break early if all rows have already been ruled out
                if !plan_builder.selects_any() {
                    break;
                }

                let mut cache_projection = predicate.projection().clone();
                cache_projection.intersect(&projection);

                let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
                    .with_batch_size(batch_size)
                    .with_parquet_metadata(&reader.metadata)
                    .build_array_reader(fields.as_deref(), predicate.projection())?;

                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
            }
        }

        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
            .with_batch_size(batch_size)
            .with_parquet_metadata(&reader.metadata)
            .build_array_reader(fields.as_deref(), &projection)?;

        let read_plan = plan_builder
            .limited(reader.num_rows())
            .with_offset(offset)
            .with_limit(limit)
            .build_limited()
            .build();

        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
    }
}

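// End-to-end sketch: the reader returned by `build` implements
// `Iterator<Item = Result<RecordBatch, ArrowError>>`, so a full scan is just:
//
//     for batch in builder.build()? {
//         let batch = batch?;
//         println!("read {} rows", batch.num_rows());
//     }
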
struct ReaderRowGroups<T: ChunkReader> {
    reader: Arc<T>,

    metadata: Arc<ParquetMetaData>,
    row_groups: Vec<usize>,
}

impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
    fn num_rows(&self) -> usize {
        let meta = self.metadata.row_groups();
        self.row_groups
            .iter()
            .map(|x| meta[*x].num_rows() as usize)
            .sum()
    }

    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        Ok(Box::new(ReaderPageIterator {
            column_idx: i,
            reader: self.reader.clone(),
            metadata: self.metadata.clone(),
            row_groups: self.row_groups.clone().into_iter(),
        }))
    }

    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
        Box::new(
            self.row_groups
                .iter()
                .map(move |i| self.metadata.row_group(*i)),
        )
    }

    fn metadata(&self) -> &ParquetMetaData {
        self.metadata.as_ref()
    }
}

struct ReaderPageIterator<T: ChunkReader> {
    reader: Arc<T>,
    column_idx: usize,
    row_groups: std::vec::IntoIter<usize>,
    metadata: Arc<ParquetMetaData>,
}

impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
    /// Return the next [`SerializedPageReader`] for the given row group
    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
        let rg = self.metadata.row_group(rg_idx);
        let column_chunk_metadata = rg.column(self.column_idx);
        let offset_index = self.metadata.offset_index();
        // Only use page locations when the offset index for this row group is non-empty
        let page_locations = offset_index
            .filter(|i| !i[rg_idx].is_empty())
            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
        let total_rows = rg.num_rows() as usize;
        let reader = self.reader.clone();

        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
            .add_crypto_context(
                rg_idx,
                self.column_idx,
                self.metadata.as_ref(),
                column_chunk_metadata,
            )
    }
}

impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        let rg_idx = self.row_groups.next()?;
        let page_reader = self
            .next_page_reader(rg_idx)
            .map(|page_reader| Box::new(page_reader) as _);
        Some(page_reader)
    }
}

impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}

/// An `Iterator<Item = Result<RecordBatch, ArrowError>>` that reads
/// [`RecordBatch`]es from a parquet data source according to a [`ReadPlan`]
pub struct ParquetRecordBatchReader {
    array_reader: Box<dyn ArrayReader>,
    schema: SchemaRef,
    read_plan: ReadPlan,
}

impl Debug for ParquetRecordBatchReader {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ParquetRecordBatchReader")
            .field("array_reader", &"...")
            .field("schema", &self.schema)
            .field("read_plan", &self.read_plan)
            .finish()
    }
}

impl Iterator for ParquetRecordBatchReader {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_inner()
            .map_err(|arrow_err| arrow_err.into())
            .transpose()
    }
}

impl ParquetRecordBatchReader {
    /// Returns the next non-empty batch, or `None` once the reader is exhausted
    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
        let mut read_records = 0;
        let batch_size = self.batch_size();
        if batch_size == 0 {
            return Ok(None);
        }
        match self.read_plan.row_selection_cursor_mut() {
            RowSelectionCursor::Mask(mask_cursor) => {
                // Bitmask-driven selection: decode a chunk of rows, then filter
                // it with the corresponding boolean mask
                while !mask_cursor.is_empty() {
                    let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else {
                        return Ok(None);
                    };

                    if mask_chunk.initial_skip > 0 {
                        let skipped = self.array_reader.skip_records(mask_chunk.initial_skip)?;
                        if skipped != mask_chunk.initial_skip {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                mask_chunk.initial_skip,
                                skipped
                            ));
                        }
                    }

                    if mask_chunk.chunk_rows == 0 {
                        if mask_cursor.is_empty() && mask_chunk.selected_rows == 0 {
                            return Ok(None);
                        }
                        continue;
                    }

                    let mask = mask_cursor.mask_values_for(&mask_chunk)?;

                    let read = self.array_reader.read_records(mask_chunk.chunk_rows)?;
                    if read == 0 {
                        return Err(general_err!(
                            "reached end of column while expecting {} rows",
                            mask_chunk.chunk_rows
                        ));
                    }
                    if read != mask_chunk.chunk_rows {
                        return Err(general_err!(
                            "insufficient rows read from array reader - expected {}, got {}",
                            mask_chunk.chunk_rows,
                            read
                        ));
                    }

                    let array = self.array_reader.consume_batch()?;
                    let struct_array = array.as_struct_opt().ok_or_else(|| {
                        ArrowError::ParquetError(
                            "Struct array reader should return struct array".to_string(),
                        )
                    })?;

                    let filtered_batch =
                        filter_record_batch(&RecordBatch::from(struct_array), &mask)?;

                    if filtered_batch.num_rows() != mask_chunk.selected_rows {
                        return Err(general_err!(
                            "filtered rows mismatch selection - expected {}, got {}",
                            mask_chunk.selected_rows,
                            filtered_batch.num_rows()
                        ));
                    }

                    if filtered_batch.num_rows() == 0 {
                        continue;
                    }

                    return Ok(Some(filtered_batch));
                }
            }
            RowSelectionCursor::Selectors(selectors_cursor) => {
                // Selector-driven selection: alternate between skipping and
                // reading runs of rows until the batch is full
                while read_records < batch_size && !selectors_cursor.is_empty() {
                    let front = selectors_cursor.next_selector();
                    if front.skip {
                        let skipped = self.array_reader.skip_records(front.row_count)?;

                        if skipped != front.row_count {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                front.row_count,
                                skipped
                            ));
                        }
                        continue;
                    }

                    if front.row_count == 0 {
                        continue;
                    }

                    // If the selector spans more rows than the batch still needs,
                    // split it and return the remainder to the cursor
                    let need_read = batch_size - read_records;
                    let to_read = match front.row_count.checked_sub(need_read) {
                        Some(remaining) if remaining != 0 => {
                            selectors_cursor.return_selector(RowSelector::select(remaining));
                            need_read
                        }
                        _ => front.row_count,
                    };
                    match self.array_reader.read_records(to_read)? {
                        0 => break,
                        rec => read_records += rec,
                    };
                }
            }
            RowSelectionCursor::All => {
                // No selection: read the next batch_size rows unconditionally
                self.array_reader.read_records(batch_size)?;
            }
        };

        let array = self.array_reader.consume_batch()?;
        let struct_array = array.as_struct_opt().ok_or_else(|| {
            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
        })?;

        Ok(if struct_array.len() > 0 {
            Some(RecordBatch::from(struct_array))
        } else {
            None
        })
    }
}

impl RecordBatchReader for ParquetRecordBatchReader {
    /// Returns the schema of the [`RecordBatch`]es produced by this reader
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl ParquetRecordBatchReader {
    /// Create a new [`ParquetRecordBatchReader`] from the provided chunk reader,
    /// using the default options
    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
        ParquetRecordBatchReaderBuilder::try_new(reader)?
            .with_batch_size(batch_size)
            .build()
    }

    /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`]
    /// and pre-computed [`FieldLevels`]
    pub fn try_new_with_row_groups(
        levels: &FieldLevels,
        row_groups: &dyn RowGroups,
        batch_size: usize,
        selection: Option<RowSelection>,
    ) -> Result<Self> {
        let metrics = ArrowReaderMetrics::disabled();
        let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
            .with_batch_size(batch_size)
            .with_parquet_metadata(row_groups.metadata())
            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;

        let read_plan = ReadPlanBuilder::new(batch_size)
            .with_selection(selection)
            .build();

        Ok(Self {
            array_reader,
            schema: Arc::new(Schema::new(levels.fields.clone())),
            read_plan,
        })
    }

    /// Create a new [`ParquetRecordBatchReader`] from the provided [`ArrayReader`]
    /// and [`ReadPlan`]
    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
        let schema = match array_reader.get_data_type() {
            ArrowType::Struct(fields) => Schema::new(fields.clone()),
            _ => unreachable!("Struct array reader's data type is not struct!"),
        };

        Self {
            array_reader,
            schema: Arc::new(schema),
            read_plan,
        }
    }

    #[inline(always)]
    pub(crate) fn batch_size(&self) -> usize {
        self.read_plan.batch_size()
    }
}

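// Convenience-path sketch: `try_new` wires up a builder with default options.
// Assuming `bytes` holds an entire parquet file in memory:
//
//     let reader = ParquetRecordBatchReader::try_new(bytes, 1024)?;
//     let batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>()?;
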
#[cfg(test)]
pub(crate) mod tests {
    use std::cmp::min;
    use std::collections::{HashMap, VecDeque};
    use std::fmt::Formatter;
    use std::fs::File;
    use std::io::Seek;
    use std::path::PathBuf;
    use std::sync::Arc;

    use rand::rngs::StdRng;
    use rand::{Rng, RngCore, SeedableRng, random, rng};
    use tempfile::tempfile;

    use crate::arrow::arrow_reader::{
        ArrowPredicateFn, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
    };
    use crate::arrow::schema::{
        add_encoded_arrow_schema_to_metadata,
        virtual_type::{RowGroupIndex, RowNumber},
    };
    use crate::arrow::{ArrowWriter, ProjectionMask};
    use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType};
    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
    use crate::data_type::{
        BoolType, ByteArray, ByteArrayType, DataType, DoubleType, FixedLenByteArray,
        FixedLenByteArrayType, FloatType, Int32Type, Int64Type, Int96, Int96Type,
    };
    use crate::errors::Result;
    use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetStatisticsPolicy};
    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
    use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::{Type, TypePtr};
    use crate::util::test_common::rand_gen::RandGen;
    use arrow_array::builder::*;
    use arrow_array::cast::AsArray;
    use arrow_array::types::{
        Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
        DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
        Time64MicrosecondType,
    };
    use arrow_array::*;
    use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256};
    use arrow_data::{ArrayData, ArrayDataBuilder};
    use arrow_schema::{DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit};
    use arrow_select::concat::concat_batches;
    use bytes::Bytes;
    use half::f16;
    use num_traits::PrimInt;
    #[test]
    fn test_arrow_reader_all_columns() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());
        let reader = builder.build().unwrap();

        assert_eq!(original_schema.fields(), reader.schema().fields());
    }

    #[test]
    fn test_reuse_schema() {
        let file = get_test_file("parquet/alltypes-java.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
        let expected = builder.metadata;
        let schema = expected.file_metadata().schema_descr_ptr();

        let arrow_options = ArrowReaderOptions::new().with_parquet_schema(schema.clone());
        let builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();

        assert_eq!(expected.as_ref(), builder.metadata.as_ref());
    }

    #[test]
    fn test_page_encoding_stats_mask() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let file = File::open(path).unwrap();

        let arrow_options = ArrowReaderOptions::new().with_encoding_stats_as_mask(true);
        let builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();

        let row_group_metadata = builder.metadata.row_group(0);

        let page_encoding_stats = row_group_metadata
            .column(0)
            .page_encoding_stats_mask()
            .unwrap();
        assert!(page_encoding_stats.is_only(Encoding::PLAIN));
        let page_encoding_stats = row_group_metadata
            .column(2)
            .page_encoding_stats_mask()
            .unwrap();
        assert!(page_encoding_stats.is_only(Encoding::PLAIN_DICTIONARY));
    }

    #[test]
    fn test_stats_stats_skipped() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let file = File::open(path).unwrap();

        let arrow_options = ArrowReaderOptions::new()
            .with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll)
            .with_column_stats_policy(ParquetStatisticsPolicy::SkipAll);
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for column in row_group_metadata.columns() {
            assert!(column.page_encoding_stats().is_none());
            assert!(column.page_encoding_stats_mask().is_none());
            assert!(column.statistics().is_none());
        }

        let arrow_options = ArrowReaderOptions::new()
            .with_encoding_stats_as_mask(true)
            .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
            .with_column_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]));
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
            assert!(column.page_encoding_stats().is_none());
            assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
            assert_eq!(column.statistics().is_some(), idx == 0);
        }
    }

    #[test]
    fn test_size_stats_stats_skipped() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/repeated_primitive_no_list.parquet");
        let file = File::open(path).unwrap();

        let arrow_options =
            ArrowReaderOptions::new().with_size_stats_policy(ParquetStatisticsPolicy::SkipAll);
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for column in row_group_metadata.columns() {
            assert!(column.repetition_level_histogram().is_none());
            assert!(column.definition_level_histogram().is_none());
            assert!(column.unencoded_byte_array_data_bytes().is_none());
        }

        let arrow_options = ArrowReaderOptions::new()
            .with_encoding_stats_as_mask(true)
            .with_size_stats_policy(ParquetStatisticsPolicy::skip_except(&[1]));
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
            assert_eq!(column.repetition_level_histogram().is_some(), idx == 1);
            assert_eq!(column.definition_level_histogram().is_some(), idx == 1);
            assert_eq!(column.unencoded_byte_array_data_bytes().is_some(), idx == 1);
        }
    }

    #[test]
    fn test_arrow_reader_single_column() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
        let reader = builder.with_projection(mask).build().unwrap();

        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_arrow_reader_single_column_by_name() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
        let reader = builder.with_projection(mask).build().unwrap();

        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_null_column_reader_test() {
        let mut file = tempfile::tempfile().unwrap();

        let schema = "
            message message {
                OPTIONAL INT32 int32;
            }
        ";
        let schema = Arc::new(parse_message_type(schema).unwrap());

        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
        generate_single_column_file_with_data::<Int32Type>(
            &[vec![], vec![]],
            Some(&def_levels),
            file.try_clone().unwrap(),
            schema,
            Some(Field::new("int32", ArrowDataType::Null, true)),
            &Default::default(),
        )
        .unwrap();

        file.rewind().unwrap();

        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();

        assert_eq!(batches.len(), 4);
        for batch in &batches[0..3] {
            assert_eq!(batch.num_rows(), 2);
            assert_eq!(batch.num_columns(), 1);
            assert_eq!(batch.column(0).null_count(), 2);
        }

        assert_eq!(batches[3].num_rows(), 1);
        assert_eq!(batches[3].num_columns(), 1);
        assert_eq!(batches[3].column(0).null_count(), 1);
    }

    #[test]
    fn test_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<BoolType, _, BoolType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
        );
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<FloatType, _, FloatType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
        );
    }

    #[test]
    fn test_unsigned_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::UINT_32,
            Some(ArrowDataType::UInt32),
            |vals| {
                Arc::new(UInt32Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u32)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::UINT_64,
            Some(ArrowDataType::UInt64),
            |vals| {
                Arc::new(UInt64Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u64)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
    }

    #[test]
    fn test_unsigned_roundtrip() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("uint32", ArrowDataType::UInt32, true),
            Field::new("uint64", ArrowDataType::UInt64, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt32Array::from_iter_values([
                    0,
                    i32::MAX as u32,
                    u32::MAX,
                ])),
                Arc::new(UInt64Array::from_iter_values([
                    0,
                    i64::MAX as u64,
                    u64::MAX,
                ])),
            ],
        )
        .unwrap();

        writer.write(&original).unwrap();
        writer.close().unwrap();

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
        let ret = reader.next().unwrap().unwrap();
        assert_eq!(ret, original);

        ret.column(0)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();

        ret.column(1)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
    }

    #[test]
    fn test_float16_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("float16", ArrowDataType::Float16, false),
            Field::new("float16-nullable", ArrowDataType::Float16, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Float16Array::from_iter_values([
                    f16::EPSILON,
                    f16::MIN,
                    f16::MAX,
                    f16::NAN,
                    f16::INFINITY,
                    f16::NEG_INFINITY,
                    f16::ONE,
                    f16::NEG_ONE,
                    f16::ZERO,
                    f16::NEG_ZERO,
                    f16::E,
                    f16::PI,
                    f16::FRAC_1_PI,
                ])),
                Arc::new(Float16Array::from(vec![
                    None,
                    None,
                    None,
                    Some(f16::NAN),
                    Some(f16::INFINITY),
                    Some(f16::NEG_INFINITY),
                    None,
                    None,
                    None,
                    None,
                    None,
                    None,
                    Some(f16::FRAC_1_PI),
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Float16Type>();
        ret.column(1).as_primitive::<Float16Type>();

        Ok(())
    }

    #[test]
    fn test_time_utc_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "time_millis",
                ArrowDataType::Time32(TimeUnit::Millisecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
            Field::new(
                "time_micros",
                ArrowDataType::Time64(TimeUnit::Microsecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Time32MillisecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399_000),
                    Some(86_400_000),
                    Some(86_401_000),
                    None,
                ])),
                Arc::new(Time64MicrosecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399 * 1_000_000),
                    Some(86_400 * 1_000_000),
                    Some(86_401 * 1_000_000),
                    None,
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Time32MillisecondType>();
        ret.column(1).as_primitive::<Time64MicrosecondType>();

        Ok(())
    }

    #[test]
    fn test_date32_roundtrip() -> Result<()> {
        use arrow_array::Date32Array;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "date32",
            ArrowDataType::Date32,
            false,
        )]));

        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![Arc::new(Date32Array::from(vec![
                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
            ]))],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Date32Type>();

        Ok(())
    }

    #[test]
    fn test_date64_roundtrip() -> Result<()> {
        use arrow_array::Date64Array;

        let schema = Arc::new(Schema::new(vec![
            Field::new("small-date64", ArrowDataType::Date64, false),
            Field::new("big-date64", ArrowDataType::Date64, false),
            Field::new("invalid-date64", ArrowDataType::Date64, false),
        ]));

        let mut default_buf = Vec::with_capacity(1024);
        let mut coerce_buf = Vec::with_capacity(1024);

        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();

        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
        let mut coerce_writer =
            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;

        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000 * NUM_MILLISECONDS_IN_DAY,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                Arc::new(Date64Array::from(vec![
                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1,
                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                ])),
            ],
        )?;

        default_writer.write(&original)?;
        coerce_writer.write(&original)?;

        default_writer.close()?;
        coerce_writer.close()?;

        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;

        let default_ret = default_reader.next().unwrap()?;
        let coerce_ret = coerce_reader.next().unwrap()?;

        assert_eq!(default_ret, original);

        assert_eq!(coerce_ret.column(0), original.column(0));
        assert_ne!(coerce_ret.column(1), original.column(1));
        assert_ne!(coerce_ret.column(2), original.column(2));

        default_ret.column(0).as_primitive::<Date64Type>();
        coerce_ret.column(0).as_primitive::<Date64Type>();

        Ok(())
    }

    struct RandFixedLenGen {}

    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
        fn r#gen(len: i32) -> FixedLenByteArray {
            let mut v = vec![0u8; len as usize];
            rng().fill_bytes(&mut v);
            ByteArray::from(v).into()
        }
    }

    #[test]
    fn test_fixed_length_binary_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            20,
            ConvertedType::NONE,
            None,
            |vals| {
                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
                for val in vals {
                    match val {
                        Some(b) => builder.append_value(b).unwrap(),
                        None => builder.append_null(),
                    }
                }
                Arc::new(builder.finish())
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_interval_day_time_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            12,
            ConvertedType::INTERVAL,
            None,
            |vals| {
                Arc::new(
                    vals.iter()
                        .map(|x| {
                            x.as_ref().map(|b| IntervalDayTime {
                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
                                milliseconds: i32::from_le_bytes(
                                    b.as_ref()[8..12].try_into().unwrap(),
                                ),
                            })
                        })
                        .collect::<IntervalDayTimeArray>(),
                )
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_int96_single_column_reader_test() {
        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];

        type TypeHintAndConversionFunction =
            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);

        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
            (None, |vals: &[Option<Int96>]| {
                Arc::new(TimestampNanosecondArray::from_iter(
                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
                )) as ArrayRef
            }),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampSecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMillisecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_millis())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMicrosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_micros())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampNanosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(
                    TimeUnit::Second,
                    Some(Arc::from("-05:00")),
                )),
                |vals: &[Option<Int96>]| {
                    Arc::new(
                        TimestampSecondArray::from_iter(
                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
                        )
                        .with_timezone("-05:00"),
                    ) as ArrayRef
                },
            ),
        ];

        resolutions.iter().for_each(|(arrow_type, converter)| {
            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
                2,
                ConvertedType::NONE,
                arrow_type.clone(),
                converter,
                encodings,
            );
        })
    }

    #[test]
    fn test_int96_from_spark_file_with_provided_schema() {
        use arrow_schema::DataType::Timestamp;
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            Timestamp(TimeUnit::Microsecond, None),
            true,
        )]));
        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());

        let mut record_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                .unwrap()
                .build()
                .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));

        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456),
            Some(1704070800000000),
            Some(253402225200000000),
            Some(1735599600000000),
            None,
            Some(9089380393200000000),
        ]));

        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }

    #[test]
    fn test_int96_from_spark_file_without_provided_schema() {
        use arrow_schema::DataType::Timestamp;
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));

        // Some values overflow i64 when widened to nanoseconds and wrap negative
        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456000),
            Some(1704070800000000000),
            Some(-4852191831933722624),
            Some(1735599600000000000),
            None,
            Some(-4864435138808946688),
        ]));

        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }

    struct RandUtf8Gen {}

    impl RandGen<ByteArrayType> for RandUtf8Gen {
        fn r#gen(len: i32) -> ByteArray {
            Int32Type::r#gen(len).to_string().as_str().into()
        }
    }

    #[test]
    fn test_utf8_single_column_reader_test() {
        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
            })))
        }

        let encodings = &[
            Encoding::PLAIN,
            Encoding::RLE_DICTIONARY,
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            Encoding::DELTA_BYTE_ARRAY,
        ];

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::NONE,
            None,
            |vals| {
                Arc::new(BinaryArray::from_iter(
                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
                ))
            },
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            None,
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::Utf8),
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::LargeUtf8),
            string_converter::<i64>,
            encodings,
        );

        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
        for key in &small_key_types {
            for encoding in encodings {
                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
                opts.encoding = *encoding;

                let data_type =
                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
                    opts,
                    2,
                    ConvertedType::UTF8,
                    Some(data_type.clone()),
                    move |vals| {
                        let vals = string_converter::<i32>(vals);
                        arrow::compute::cast(&vals, &data_type).unwrap()
                    },
                );
            }
        }

        let key_types = [
            ArrowDataType::Int16,
            ArrowDataType::UInt16,
            ArrowDataType::Int32,
            ArrowDataType::UInt32,
            ArrowDataType::Int64,
            ArrowDataType::UInt64,
        ];

        for key in &key_types {
            let data_type =
                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let vals = string_converter::<i32>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );

            let data_type = ArrowDataType::Dictionary(
                Box::new(key.clone()),
                Box::new(ArrowDataType::LargeUtf8),
            );

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let vals = string_converter::<i64>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );
        }
    }

    #[test]
    fn test_decimal_nullable_struct() {
        let decimals = Decimal256Array::from_iter_values(
            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
        );

        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "decimals",
            decimals.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_int32_nullable_struct() {
        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "int32",
            int32.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![int32.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_decimal_list() {
        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);

        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
            decimals.data_type().clone(),
            false,
        ))))
        .len(7)
        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 1), &read[2]);
    }

2645 #[test]
2646 fn test_read_decimal_file() {
2647 use arrow_array::Decimal128Array;
2648 let testdata = arrow::util::test_util::parquet_test_data();
2649 let file_variants = vec![
2650 ("byte_array", 4),
2651 ("fixed_length", 25),
2652 ("int32", 4),
2653 ("int64", 10),
2654 ];
2655 for (prefix, target_precision) in file_variants {
2656 let path = format!("{testdata}/{prefix}_decimal.parquet");
2657 let file = File::open(path).unwrap();
2658 let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2659
2660 let batch = record_reader.next().unwrap().unwrap();
2661 assert_eq!(batch.num_rows(), 24);
2662 let col = batch
2663 .column(0)
2664 .as_any()
2665 .downcast_ref::<Decimal128Array>()
2666 .unwrap();
2667
2668 let expected = 1..25;
2669
2670 assert_eq!(col.precision(), target_precision);
2671 assert_eq!(col.scale(), 2);
2672
2673 for (i, v) in expected.enumerate() {
2674 assert_eq!(col.value(i), v * 100_i128);
2675 }
2676 }
2677 }
2678
2679 #[test]
2680 fn test_read_float16_nonzeros_file() {
2681 use arrow_array::Float16Array;
2682 let testdata = arrow::util::test_util::parquet_test_data();
2683 let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
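// Expected column contents (per the assertions below):
// [null, 1.0, -2.0, NaN, +0.0, -1.0, -0.0, 2.0]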
2685 let file = File::open(path).unwrap();
2686 let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2687
2688 let batch = record_reader.next().unwrap().unwrap();
2689 assert_eq!(batch.num_rows(), 8);
2690 let col = batch
2691 .column(0)
2692 .as_any()
2693 .downcast_ref::<Float16Array>()
2694 .unwrap();
2695
2696 let f16_two = f16::ONE + f16::ONE;
2697
2698 assert_eq!(col.null_count(), 1);
2699 assert!(col.is_null(0));
2700 assert_eq!(col.value(1), f16::ONE);
2701 assert_eq!(col.value(2), -f16_two);
2702 assert!(col.value(3).is_nan());
2703 assert_eq!(col.value(4), f16::ZERO);
2704 assert!(col.value(4).is_sign_positive());
2705 assert_eq!(col.value(5), f16::NEG_ONE);
2706 assert_eq!(col.value(6), f16::NEG_ZERO);
2707 assert!(col.value(6).is_sign_negative());
2708 assert_eq!(col.value(7), f16_two);
2709 }
2710
2711 #[test]
2712 fn test_read_float16_zeros_file() {
2713 use arrow_array::Float16Array;
2714 let testdata = arrow::util::test_util::parquet_test_data();
2715 let path = format!("{testdata}/float16_zeros_and_nans.parquet");
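// Expected column contents (per the assertions below): [null, +0.0, NaN]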
2717 let file = File::open(path).unwrap();
2718 let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2719
2720 let batch = record_reader.next().unwrap().unwrap();
2721 assert_eq!(batch.num_rows(), 3);
2722 let col = batch
2723 .column(0)
2724 .as_any()
2725 .downcast_ref::<Float16Array>()
2726 .unwrap();
2727
2728 assert_eq!(col.null_count(), 1);
2729 assert!(col.is_null(0));
2730 assert_eq!(col.value(1), f16::ZERO);
2731 assert!(col.value(1).is_sign_positive());
2732 assert!(col.value(2).is_nan());
2733 }
2734
2735 #[test]
2736 fn test_read_float32_float64_byte_stream_split() {
2737 let path = format!(
2738 "{}/byte_stream_split.zstd.parquet",
2739 arrow::util::test_util::parquet_test_data(),
2740 );
2741 let file = File::open(path).unwrap();
2742 let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2743
2744 let mut row_count = 0;
2745 for batch in record_reader {
2746 let batch = batch.unwrap();
2747 row_count += batch.num_rows();
2748 let f32_col = batch.column(0).as_primitive::<Float32Type>();
2749 let f64_col = batch.column(1).as_primitive::<Float64Type>();
2750
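// Both columns should contain only values strictly inside (-10, 10)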
2751 for &x in f32_col.values() {
2753 assert!(x > -10.0);
2754 assert!(x < 10.0);
2755 }
2756 for &x in f64_col.values() {
2757 assert!(x > -10.0);
2758 assert!(x < 10.0);
2759 }
2760 }
2761 assert_eq!(row_count, 300);
2762 }
2763
2764 #[test]
2765 fn test_read_extended_byte_stream_split() {
2766 let path = format!(
2767 "{}/byte_stream_split_extended.gzip.parquet",
2768 arrow::util::test_util::parquet_test_data(),
2769 );
2770 let file = File::open(path).unwrap();
2771 let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2772
2773 let mut row_count = 0;
2774 for batch in record_reader {
2775 let batch = batch.unwrap();
2776 row_count += batch.num_rows();
2777
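// Each adjacent pair of columns holds the same values, written once with
// PLAIN and once with BYTE_STREAM_SPLIT ("_bss"); the pairs must decode identically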
2778 let f16_col = batch.column(0).as_primitive::<Float16Type>();
2780 let f16_bss = batch.column(1).as_primitive::<Float16Type>();
2781 assert_eq!(f16_col.len(), f16_bss.len());
2782 f16_col
2783 .iter()
2784 .zip(f16_bss.iter())
2785 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2786
2787 let f32_col = batch.column(2).as_primitive::<Float32Type>();
2789 let f32_bss = batch.column(3).as_primitive::<Float32Type>();
2790 assert_eq!(f32_col.len(), f32_bss.len());
2791 f32_col
2792 .iter()
2793 .zip(f32_bss.iter())
2794 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2795
2796 let f64_col = batch.column(4).as_primitive::<Float64Type>();
2798 let f64_bss = batch.column(5).as_primitive::<Float64Type>();
2799 assert_eq!(f64_col.len(), f64_bss.len());
2800 f64_col
2801 .iter()
2802 .zip(f64_bss.iter())
2803 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2804
2805 let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
2807 let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
2808 assert_eq!(i32_col.len(), i32_bss.len());
2809 i32_col
2810 .iter()
2811 .zip(i32_bss.iter())
2812 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2813
2814 let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
2816 let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
2817 assert_eq!(i64_col.len(), i64_bss.len());
2818 i64_col
2819 .iter()
2820 .zip(i64_bss.iter())
2821 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2822
2823 let flba_col = batch.column(10).as_fixed_size_binary();
2825 let flba_bss = batch.column(11).as_fixed_size_binary();
2826 assert_eq!(flba_col.len(), flba_bss.len());
2827 flba_col
2828 .iter()
2829 .zip(flba_bss.iter())
2830 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2831
2832 let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
2834 let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
2835 assert_eq!(dec_col.len(), dec_bss.len());
2836 dec_col
2837 .iter()
2838 .zip(dec_bss.iter())
2839 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2840 }
2841 assert_eq!(row_count, 200);
2842 }
2843
2844 #[test]
2845 fn test_read_incorrect_map_schema_file() {
2846 let testdata = arrow::util::test_util::parquet_test_data();
2847 let path = format!("{testdata}/incorrect_map_schema.parquet");
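// The map in this file was written with a non-standard schema; it should
// still read back as a standard Arrow Map (see expected_schema below)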
2849 let file = File::open(path).unwrap();
2850 let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2851
2852 let batch = record_reader.next().unwrap().unwrap();
2853 assert_eq!(batch.num_rows(), 1);
2854
2855 let expected_schema = Schema::new(vec![Field::new(
2856 "my_map",
2857 ArrowDataType::Map(
2858 Arc::new(Field::new(
2859 "key_value",
2860 ArrowDataType::Struct(Fields::from(vec![
2861 Field::new("key", ArrowDataType::Utf8, false),
2862 Field::new("value", ArrowDataType::Utf8, true),
2863 ])),
2864 false,
2865 )),
2866 false,
2867 ),
2868 true,
2869 )]);
2870 assert_eq!(batch.schema().as_ref(), &expected_schema);
2871
2872 assert_eq!(batch.num_rows(), 1);
2873 assert_eq!(batch.column(0).null_count(), 0);
2874 assert_eq!(
2875 batch.column(0).as_map().keys().as_ref(),
2876 &StringArray::from(vec!["parent", "name"])
2877 );
2878 assert_eq!(
2879 batch.column(0).as_map().values().as_ref(),
2880 &StringArray::from(vec!["another", "report"])
2881 );
2882 }
2883
2884 #[test]
2885 fn test_read_dict_fixed_size_binary() {
2886 let schema = Arc::new(Schema::new(vec![Field::new(
2887 "a",
2888 ArrowDataType::Dictionary(
2889 Box::new(ArrowDataType::UInt8),
2890 Box::new(ArrowDataType::FixedSizeBinary(8)),
2891 ),
2892 true,
2893 )]));
2894 let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
2895 let values = FixedSizeBinaryArray::try_from_iter(
2896 vec![
2897 (0u8..8u8).collect::<Vec<u8>>(),
2898 (24u8..32u8).collect::<Vec<u8>>(),
2899 ]
2900 .into_iter(),
2901 )
2902 .unwrap();
2903 let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
2904 let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();
2905
2906 let mut buffer = Vec::with_capacity(1024);
2907 let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2908 writer.write(&batch).unwrap();
2909 writer.close().unwrap();
2910 let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2911 .unwrap()
2912 .collect::<Result<Vec<_>, _>>()
2913 .unwrap();
2914
2915 assert_eq!(read.len(), 1);
2916 assert_eq!(&batch, &read[0])
2917 }
2918
2919 #[test]
2920 fn test_read_nullable_structs_with_binary_dict_as_first_child_column() {
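// Round-trips a nullable struct whose first child column is a binary
// dictionary, ensuring the parent struct's nulls survive the round trip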
2921 let struct_fields = Fields::from(vec![
2928 Field::new(
2929 "city",
2930 ArrowDataType::Dictionary(
2931 Box::new(ArrowDataType::UInt8),
2932 Box::new(ArrowDataType::Utf8),
2933 ),
2934 true,
2935 ),
2936 Field::new("name", ArrowDataType::Utf8, true),
2937 ]);
2938 let schema = Arc::new(Schema::new(vec![Field::new(
2939 "items",
2940 ArrowDataType::Struct(struct_fields.clone()),
2941 true,
2942 )]));
2943
2944 let items_arr = StructArray::new(
2945 struct_fields,
2946 vec![
2947 Arc::new(DictionaryArray::new(
2948 UInt8Array::from_iter_values(vec![0, 1, 1, 0, 2]),
2949 Arc::new(StringArray::from_iter_values(vec![
2950 "quebec",
2951 "fredericton",
2952 "halifax",
2953 ])),
2954 )),
2955 Arc::new(StringArray::from_iter_values(vec![
2956 "albert", "terry", "lance", "", "tim",
2957 ])),
2958 ],
2959 Some(NullBuffer::from_iter(vec![true, true, true, false, true])),
2960 );
2961
2962 let batch = RecordBatch::try_new(schema, vec![Arc::new(items_arr)]).unwrap();
2963 let mut buffer = Vec::with_capacity(1024);
2964 let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2965 writer.write(&batch).unwrap();
2966 writer.close().unwrap();
2967 let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
2968 .unwrap()
2969 .collect::<Result<Vec<_>, _>>()
2970 .unwrap();
2971
2972 assert_eq!(read.len(), 1);
2973 assert_eq!(&batch, &read[0])
2974 }
2975
/// Options for a round-trip single-column test; see single_column_reader_test
2976 #[derive(Clone)]
2978 struct TestOptions {
/// Number of row groups to write
2979 num_row_groups: usize,
/// Number of rows per row group
2982 num_rows: usize,
/// Size of batches to read back
2984 record_batch_size: usize,
/// Percentage of null values in the column, or None for a required column
2986 null_percent: Option<usize>,
/// Writer property: number of rows written per write_batch call
2988 write_batch_size: usize,
/// Maximum data page size in bytes
2993 max_data_page_size: usize,
/// Maximum dictionary page size in bytes
2995 max_dict_page_size: usize,
/// Parquet writer version
2997 writer_version: WriterVersion,
/// Statistics level to write
2999 enabled_statistics: EnabledStatistics,
/// Column encoding to use
3001 encoding: Encoding,
/// Optional row selection to apply, with the number of selected rows
3003 row_selections: Option<(RowSelection, usize)>,
/// Optional per-row filter mask to apply while reading
3005 row_filter: Option<Vec<bool>>,
/// Optional limit on returned rows
3007 limit: Option<usize>,
/// Optional row offset to skip before returning rows
3009 offset: Option<usize>,
3011 }
3012
3013 impl std::fmt::Debug for TestOptions {
3015 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
3016 f.debug_struct("TestOptions")
3017 .field("num_row_groups", &self.num_row_groups)
3018 .field("num_rows", &self.num_rows)
3019 .field("record_batch_size", &self.record_batch_size)
3020 .field("null_percent", &self.null_percent)
3021 .field("write_batch_size", &self.write_batch_size)
3022 .field("max_data_page_size", &self.max_data_page_size)
3023 .field("max_dict_page_size", &self.max_dict_page_size)
3024 .field("writer_version", &self.writer_version)
3025 .field("enabled_statistics", &self.enabled_statistics)
3026 .field("encoding", &self.encoding)
3027 .field("row_selections", &self.row_selections.is_some())
3028 .field("row_filter", &self.row_filter.is_some())
3029 .field("limit", &self.limit)
3030 .field("offset", &self.offset)
3031 .finish()
3032 }
3033 }
3034
3035 impl Default for TestOptions {
3036 fn default() -> Self {
3037 Self {
3038 num_row_groups: 2,
3039 num_rows: 100,
3040 record_batch_size: 15,
3041 null_percent: None,
3042 write_batch_size: 64,
3043 max_data_page_size: 1024 * 1024,
3044 max_dict_page_size: 1024 * 1024,
3045 writer_version: WriterVersion::PARQUET_1_0,
3046 enabled_statistics: EnabledStatistics::Page,
3047 encoding: Encoding::PLAIN,
3048 row_selections: None,
3049 row_filter: None,
3050 limit: None,
3051 offset: None,
3052 }
3053 }
3054 }
3055
3056 impl TestOptions {
3057 fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
3058 Self {
3059 num_row_groups,
3060 num_rows,
3061 record_batch_size,
3062 ..Default::default()
3063 }
3064 }
3065
3066 fn with_null_percent(self, null_percent: usize) -> Self {
3067 Self {
3068 null_percent: Some(null_percent),
3069 ..self
3070 }
3071 }
3072
3073 fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
3074 Self {
3075 max_data_page_size,
3076 ..self
3077 }
3078 }
3079
3080 fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
3081 Self {
3082 max_dict_page_size,
3083 ..self
3084 }
3085 }
3086
3087 fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
3088 Self {
3089 enabled_statistics,
3090 ..self
3091 }
3092 }
3093
3094 fn with_row_selections(self) -> Self {
3095 assert!(
self.row_filter.is_none(),
"row selection must be configured before the row filter"
);
3096
3097 let mut rng = rng();
3098 let step = rng.random_range(self.record_batch_size..self.num_rows);
3099 let row_selections = create_test_selection(
3100 step,
3101 self.num_row_groups * self.num_rows,
3102 rng.random::<bool>(),
3103 );
3104 Self {
3105 row_selections: Some(row_selections),
3106 ..self
3107 }
3108 }
3109
3110 fn with_row_filter(self) -> Self {
3111 let row_count = match &self.row_selections {
3112 Some((_, count)) => *count,
3113 None => self.num_row_groups * self.num_rows,
3114 };
3115
3116 let mut rng = rng();
3117 Self {
3118 row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
3119 ..self
3120 }
3121 }
3122
3123 fn with_limit(self, limit: usize) -> Self {
3124 Self {
3125 limit: Some(limit),
3126 ..self
3127 }
3128 }
3129
3130 fn with_offset(self, offset: usize) -> Self {
3131 Self {
3132 offset: Some(offset),
3133 ..self
3134 }
3135 }
3136
3137 fn writer_props(&self) -> WriterProperties {
3138 let builder = WriterProperties::builder()
3139 .set_data_page_size_limit(self.max_data_page_size)
3140 .set_write_batch_size(self.write_batch_size)
3141 .set_writer_version(self.writer_version)
3142 .set_statistics_enabled(self.enabled_statistics);
3143
3144 let builder = match self.encoding {
3145 Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
3146 .set_dictionary_enabled(true)
3147 .set_dictionary_page_size_limit(self.max_dict_page_size),
3148 _ => builder
3149 .set_dictionary_enabled(false)
3150 .set_encoding(self.encoding),
3151 };
3152
3153 builder.build()
3154 }
3155 }
3156
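/// Writes a file containing a single column of physical type `T` for every
/// combination of writer version, `encodings` entry, and the standard set of
/// `TestOptions` below, then reads it back and compares the decoded batches
/// against arrays produced by `converter`. `rand_max` bounds the randomly
/// generated values.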
3157 fn run_single_column_reader_tests<T, F, G>(
3164 rand_max: i32,
3165 converted_type: ConvertedType,
3166 arrow_type: Option<ArrowDataType>,
3167 converter: F,
3168 encodings: &[Encoding],
3169 ) where
3170 T: DataType,
3171 G: RandGen<T>,
3172 F: Fn(&[Option<T::T>]) -> ArrayRef,
3173 {
3174 let all_options = vec![
3175 TestOptions::new(2, 100, 15),
3178 TestOptions::new(3, 25, 5),
3183 TestOptions::new(4, 100, 25),
3187 TestOptions::new(3, 256, 73).with_max_data_page_size(128),
3189 TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
3191 TestOptions::new(2, 256, 127).with_null_percent(0),
3193 TestOptions::new(2, 256, 93).with_null_percent(25),
3195 TestOptions::new(4, 100, 25).with_limit(0),
3197 TestOptions::new(4, 100, 25).with_limit(50),
3199 TestOptions::new(4, 100, 25).with_limit(10),
3201 TestOptions::new(4, 100, 25).with_limit(101),
3203 TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
3205 TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
3207 TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
3209 TestOptions::new(2, 256, 91)
3211 .with_null_percent(25)
3212 .with_enabled_statistics(EnabledStatistics::Chunk),
3213 TestOptions::new(2, 256, 91)
3215 .with_null_percent(25)
3216 .with_enabled_statistics(EnabledStatistics::None),
3217 TestOptions::new(2, 128, 91)
3219 .with_null_percent(100)
3220 .with_enabled_statistics(EnabledStatistics::None),
3221 TestOptions::new(2, 100, 15).with_row_selections(),
3226 TestOptions::new(3, 25, 5).with_row_selections(),
3231 TestOptions::new(4, 100, 25).with_row_selections(),
3235 TestOptions::new(3, 256, 73)
3237 .with_max_data_page_size(128)
3238 .with_row_selections(),
3239 TestOptions::new(3, 256, 57)
3241 .with_max_dict_page_size(128)
3242 .with_row_selections(),
3243 TestOptions::new(2, 256, 127)
3245 .with_null_percent(0)
3246 .with_row_selections(),
3247 TestOptions::new(2, 256, 93)
3249 .with_null_percent(25)
3250 .with_row_selections(),
3251 TestOptions::new(2, 256, 93)
3253 .with_null_percent(25)
3254 .with_row_selections()
3255 .with_limit(10),
3256 TestOptions::new(2, 256, 93)
3258 .with_null_percent(25)
3259 .with_row_selections()
3260 .with_offset(20)
3261 .with_limit(10),
3262 TestOptions::new(4, 100, 25).with_row_filter(),
3266 TestOptions::new(4, 100, 25)
3268 .with_row_selections()
3269 .with_row_filter(),
3270 TestOptions::new(2, 256, 93)
3272 .with_null_percent(25)
3273 .with_max_data_page_size(10)
3274 .with_row_filter(),
3275 TestOptions::new(2, 256, 93)
3277 .with_null_percent(25)
3278 .with_max_data_page_size(10)
3279 .with_row_selections()
3280 .with_row_filter(),
3281 TestOptions::new(2, 256, 93)
3283 .with_enabled_statistics(EnabledStatistics::None)
3284 .with_max_data_page_size(10)
3285 .with_row_selections(),
3286 ];
3287
3288 all_options.into_iter().for_each(|opts| {
3289 for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3290 for encoding in encodings {
3291 let opts = TestOptions {
3292 writer_version,
3293 encoding: *encoding,
3294 ..opts.clone()
3295 };
3296
3297 single_column_reader_test::<T, _, G>(
3298 opts,
3299 rand_max,
3300 converted_type,
3301 arrow_type.clone(),
3302 &converter,
3303 )
3304 }
3305 }
3306 });
3307 }
3308
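/// Writes a single-column file as described by `opts`, then reads it back and
/// verifies the batches, applying any configured row selection, row filter,
/// offset and limit to the expected data before comparing batch by batch.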
3309 fn single_column_reader_test<T, F, G>(
3313 opts: TestOptions,
3314 rand_max: i32,
3315 converted_type: ConvertedType,
3316 arrow_type: Option<ArrowDataType>,
3317 converter: F,
3318 ) where
3319 T: DataType,
3320 G: RandGen<T>,
3321 F: Fn(&[Option<T::T>]) -> ArrayRef,
3322 {
3323 println!(
3325 "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
3326 T::get_physical_type(),
3327 converted_type,
3328 arrow_type,
3329 opts
3330 );
3331
3332 let (repetition, def_levels) = match opts.null_percent.as_ref() {
3334 Some(null_percent) => {
3335 let mut rng = rng();
3336
3337 let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
3338 .map(|_| {
3339 std::iter::from_fn(|| {
3340 Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
3341 })
3342 .take(opts.num_rows)
3343 .collect()
3344 })
3345 .collect();
3346 (Repetition::OPTIONAL, Some(def_levels))
3347 }
3348 None => (Repetition::REQUIRED, None),
3349 };
3350
3351 let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
3353 .map(|idx| {
3354 let null_count = match def_levels.as_ref() {
3355 Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
3356 None => 0,
3357 };
3358 G::gen_vec(rand_max, opts.num_rows - null_count)
3359 })
3360 .collect();
3361
3362 let len = match T::get_physical_type() {
3363 crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
3364 crate::basic::Type::INT96 => 12,
3365 _ => -1,
3366 };
3367
3368 let fields = vec![Arc::new(
3369 Type::primitive_type_builder("leaf", T::get_physical_type())
3370 .with_repetition(repetition)
3371 .with_converted_type(converted_type)
3372 .with_length(len)
3373 .build()
3374 .unwrap(),
3375 )];
3376
3377 let schema = Arc::new(
3378 Type::group_type_builder("test_schema")
3379 .with_fields(fields)
3380 .build()
3381 .unwrap(),
3382 );
3383
3384 let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
3385
3386 let mut file = tempfile::tempfile().unwrap();
3387
3388 generate_single_column_file_with_data::<T>(
3389 &values,
3390 def_levels.as_ref(),
3391 file.try_clone().unwrap(),
3392 schema,
3393 arrow_field,
3394 &opts,
3395 )
3396 .unwrap();
3397
3398 file.rewind().unwrap();
3399
3400 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::from(
3401 opts.enabled_statistics == EnabledStatistics::Page,
3402 ));
3403
3404 let mut builder =
3405 ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3406
3407 let expected_data = match opts.row_selections {
3408 Some((selections, row_count)) => {
3409 let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
3410
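// Note: despite its name, skip_data accumulates the rows that are
// *selected* (not skipped) by the RowSelection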
3411 let mut skip_data: Vec<Option<T::T>> = vec![];
3412 let dequeue: VecDeque<RowSelector> = selections.clone().into();
3413 for select in dequeue {
3414 if select.skip {
3415 without_skip_data.drain(0..select.row_count);
3416 } else {
3417 skip_data.extend(without_skip_data.drain(0..select.row_count));
3418 }
3419 }
3420 builder = builder.with_row_selection(selections);
3421
3422 assert_eq!(skip_data.len(), row_count);
3423 skip_data
3424 }
3425 None => {
3426 let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
3428 assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
3429 expected_data
3430 }
3431 };
3432
3433 let mut expected_data = match opts.row_filter {
3434 Some(filter) => {
3435 let expected_data = expected_data
3436 .into_iter()
3437 .zip(filter.iter())
3438 .filter_map(|(d, f)| f.then_some(d))
3439 .collect();
3440
3441 let mut filter_offset = 0;
3442 let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
3443 ProjectionMask::all(),
3444 move |b| {
3445 let array = BooleanArray::from_iter(
3446 filter
3447 .iter()
3448 .skip(filter_offset)
3449 .take(b.num_rows())
3450 .map(|x| Some(*x)),
3451 );
3452 filter_offset += b.num_rows();
3453 Ok(array)
3454 },
3455 ))]);
3456
3457 builder = builder.with_row_filter(filter);
3458 expected_data
3459 }
3460 None => expected_data,
3461 };
3462
3463 if let Some(offset) = opts.offset {
3464 builder = builder.with_offset(offset);
3465 expected_data = expected_data.into_iter().skip(offset).collect();
3466 }
3467
3468 if let Some(limit) = opts.limit {
3469 builder = builder.with_limit(limit);
3470 expected_data = expected_data.into_iter().take(limit).collect();
3471 }
3472
3473 let mut record_reader = builder
3474 .with_batch_size(opts.record_batch_size)
3475 .build()
3476 .unwrap();
3477
3478 let mut total_read = 0;
3479 loop {
3480 let maybe_batch = record_reader.next();
3481 if total_read < expected_data.len() {
3482 let end = min(total_read + opts.record_batch_size, expected_data.len());
3483 let batch = maybe_batch.unwrap().unwrap();
3484 assert_eq!(end - total_read, batch.num_rows());
3485
3486 let a = converter(&expected_data[total_read..end]);
3487 let b = batch.column(0);
3488
3489 assert_eq!(a.data_type(), b.data_type());
3490 assert_eq!(a.to_data(), b.to_data());
3491 assert_eq!(
3492 a.as_any().type_id(),
3493 b.as_any().type_id(),
3494 "incorrect type ids"
3495 );
3496
3497 total_read = end;
3498 } else {
3499 assert!(maybe_batch.is_none());
3500 break;
3501 }
3502 }
3503 }
3504
3505 fn gen_expected_data<T: DataType>(
3506 def_levels: Option<&Vec<Vec<i16>>>,
3507 values: &[Vec<T::T>],
3508 ) -> Vec<Option<T::T>> {
3509 let data: Vec<Option<T::T>> = match def_levels {
3510 Some(levels) => {
3511 let mut values_iter = values.iter().flatten();
3512 levels
3513 .iter()
3514 .flatten()
3515 .map(|d| match d {
3516 1 => Some(values_iter.next().cloned().unwrap()),
3517 0 => None,
3518 _ => unreachable!(),
3519 })
3520 .collect()
3521 }
3522 None => values.iter().flatten().cloned().map(Some).collect(),
3523 };
3524 data
3525 }
3526
3527 fn generate_single_column_file_with_data<T: DataType>(
3528 values: &[Vec<T::T>],
3529 def_levels: Option<&Vec<Vec<i16>>>,
3530 file: File,
3531 schema: TypePtr,
3532 field: Option<Field>,
3533 opts: &TestOptions,
3534 ) -> Result<ParquetMetaData> {
3535 let mut writer_props = opts.writer_props();
3536 if let Some(field) = field {
3537 let arrow_schema = Schema::new(vec![field]);
3538 add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
3539 }
3540
3541 let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
3542
3543 for (idx, v) in values.iter().enumerate() {
3544 let def_levels = def_levels.map(|d| d[idx].as_slice());
3545 let mut row_group_writer = writer.next_row_group()?;
3546 {
3547 let mut column_writer = row_group_writer
3548 .next_column()?
3549 .expect("Column writer is none!");
3550
3551 column_writer
3552 .typed::<T>()
3553 .write_batch(v, def_levels, None)?;
3554
3555 column_writer.close()?;
3556 }
3557 row_group_writer.close()?;
3558 }
3559
3560 writer.close()
3561 }
3562
3563 fn get_test_file(file_name: &str) -> File {
3564 let mut path = PathBuf::new();
3565 path.push(arrow::util::test_util::arrow_test_data());
3566 path.push(file_name);
3567
3568 File::open(path.as_path()).expect("File not found!")
3569 }
3570
3571 #[test]
3572 fn test_read_structs() {
3573 let testdata = arrow::util::test_util::parquet_test_data();
3577 let path = format!("{testdata}/nested_structs.rust.parquet");
3578 let file = File::open(&path).unwrap();
3579 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3580
3581 for batch in record_batch_reader {
3582 batch.unwrap();
3583 }
3584
3585 let file = File::open(&path).unwrap();
3586 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3587
3588 let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
3589 let projected_reader = builder
3590 .with_projection(mask)
3591 .with_batch_size(60)
3592 .build()
3593 .unwrap();
3594
3595 let expected_schema = Schema::new(vec![
3596 Field::new(
3597 "roll_num",
3598 ArrowDataType::Struct(Fields::from(vec![Field::new(
3599 "count",
3600 ArrowDataType::UInt64,
3601 false,
3602 )])),
3603 false,
3604 ),
3605 Field::new(
3606 "PC_CUR",
3607 ArrowDataType::Struct(Fields::from(vec![
3608 Field::new("mean", ArrowDataType::Int64, false),
3609 Field::new("sum", ArrowDataType::Int64, false),
3610 ])),
3611 false,
3612 ),
3613 ]);
3614
3615 assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3617
3618 for batch in projected_reader {
3619 let batch = batch.unwrap();
3620 assert_eq!(batch.schema().as_ref(), &expected_schema);
3621 }
3622 }
3623
3624 #[test]
3625 fn test_read_structs_by_name() {
3627 let testdata = arrow::util::test_util::parquet_test_data();
3628 let path = format!("{testdata}/nested_structs.rust.parquet");
3629 let file = File::open(&path).unwrap();
3630 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3631
3632 for batch in record_batch_reader {
3633 batch.unwrap();
3634 }
3635
3636 let file = File::open(&path).unwrap();
3637 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3638
3639 let mask = ProjectionMask::columns(
3640 builder.parquet_schema(),
3641 ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
3642 );
3643 let projected_reader = builder
3644 .with_projection(mask)
3645 .with_batch_size(60)
3646 .build()
3647 .unwrap();
3648
3649 let expected_schema = Schema::new(vec![
3650 Field::new(
3651 "roll_num",
3652 ArrowDataType::Struct(Fields::from(vec![Field::new(
3653 "count",
3654 ArrowDataType::UInt64,
3655 false,
3656 )])),
3657 false,
3658 ),
3659 Field::new(
3660 "PC_CUR",
3661 ArrowDataType::Struct(Fields::from(vec![
3662 Field::new("mean", ArrowDataType::Int64, false),
3663 Field::new("sum", ArrowDataType::Int64, false),
3664 ])),
3665 false,
3666 ),
3667 ]);
3668
3669 assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3670
3671 for batch in projected_reader {
3672 let batch = batch.unwrap();
3673 assert_eq!(batch.schema().as_ref(), &expected_schema);
3674 }
3675 }
3676
3677 #[test]
3678 fn test_read_maps() {
3679 let testdata = arrow::util::test_util::parquet_test_data();
3680 let path = format!("{testdata}/nested_maps.snappy.parquet");
3681 let file = File::open(path).unwrap();
3682 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3683
3684 for batch in record_batch_reader {
3685 batch.unwrap();
3686 }
3687 }
3688
3689 #[test]
3691 fn test_unknown_logical_type() {
3692 let message_type = "message uk {
3693 OPTIONAL INT32 uki32 (UNKNOWN);
3694 OPTIONAL INT64 uki64 (UNKNOWN);
3695 OPTIONAL INT96 uki96 (UNKNOWN);
3696 OPTIONAL BOOLEAN ukbool (UNKNOWN);
3697 OPTIONAL FLOAT ukfloat (UNKNOWN);
3698 OPTIONAL DOUBLE ukdbl (UNKNOWN);
3699 OPTIONAL BYTE_ARRAY ukbytes (UNKNOWN);
3700 OPTIONAL FIXED_LEN_BYTE_ARRAY(10) ukflba (UNKNOWN);
3701 }";
3702
3703 let schema = Arc::new(parse_message_type(message_type).unwrap());
3704 let file = tempfile::tempfile().unwrap();
3705
3706 let mut writer =
3707 SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
3708 .unwrap();
3709
3710 let mut row_group_writer = writer.next_row_group().unwrap();
3711
3712 fn write_nulls<T: DataType>(row_group_writer: &mut SerializedRowGroupWriter<'_, File>) {
3713 let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
3714 column_writer
3716 .typed::<T>()
3717 .write_batch(&[], Some(&[0, 0, 0, 0]), None)
3718 .unwrap();
3719 column_writer.close().unwrap();
3720 }
3721
3722 write_nulls::<Int32Type>(&mut row_group_writer);
3724
3725 write_nulls::<Int64Type>(&mut row_group_writer);
3727
3728 write_nulls::<Int96Type>(&mut row_group_writer);
3730
3731 write_nulls::<BoolType>(&mut row_group_writer);
3733
3734 write_nulls::<FloatType>(&mut row_group_writer);
3736
3737 write_nulls::<DoubleType>(&mut row_group_writer);
3739
3740 write_nulls::<ByteArrayType>(&mut row_group_writer);
3742
3743 write_nulls::<FixedLenByteArrayType>(&mut row_group_writer);
3745
3746 row_group_writer.close().unwrap();
3747
3748 writer.close().unwrap();
3749
3750 let mut reader = ParquetRecordBatchReader::try_new(file, 4).unwrap();
3751 let batch = reader.next().unwrap().unwrap();
3752
3753 for col in batch.columns() {
3754 assert_eq!(col.len(), 4);
3755 assert_eq!(col.logical_null_count(), 4);
3756 assert_eq!(*col.data_type(), ArrowDataType::Null);
3757 }
3758 }
3759
3760 #[test]
3761 fn test_nested_nullability() {
3762 let message_type = "message nested {
3763 OPTIONAL Group group {
3764 REQUIRED INT32 leaf;
3765 }
3766 }";
3767
3768 let file = tempfile::tempfile().unwrap();
3769 let schema = Arc::new(parse_message_type(message_type).unwrap());
3770
3771 {
3772 let mut writer =
3774 SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
3775 .unwrap();
3776
3777 {
3778 let mut row_group_writer = writer.next_row_group().unwrap();
3779 let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
3780
3781 column_writer
3782 .typed::<Int32Type>()
3783 .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
3784 .unwrap();
3785
3786 column_writer.close().unwrap();
3787 row_group_writer.close().unwrap();
3788 }
3789
3790 writer.close().unwrap();
3791 }
3792
3793 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3794 let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
3795
3796 let reader = builder.with_projection(mask).build().unwrap();
3797
3798 let expected_schema = Schema::new(vec![Field::new(
3799 "group",
3800 ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
3801 true,
3802 )]);
3803
3804 let batch = reader.into_iter().next().unwrap().unwrap();
3805 assert_eq!(batch.schema().as_ref(), &expected_schema);
3806 assert_eq!(batch.num_rows(), 4);
3807 assert_eq!(batch.column(0).null_count(), 2);
3808 }
3809
3810 #[test]
3811 fn test_dictionary_preservation() {
3812 let fields = vec![Arc::new(
3813 Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
3814 .with_repetition(Repetition::OPTIONAL)
3815 .with_converted_type(ConvertedType::UTF8)
3816 .build()
3817 .unwrap(),
3818 )];
3819
3820 let schema = Arc::new(
3821 Type::group_type_builder("test_schema")
3822 .with_fields(fields)
3823 .build()
3824 .unwrap(),
3825 );
3826
3827 let dict_type = ArrowDataType::Dictionary(
3828 Box::new(ArrowDataType::Int32),
3829 Box::new(ArrowDataType::Utf8),
3830 );
3831
3832 let arrow_field = Field::new("leaf", dict_type, true);
3833
3834 let mut file = tempfile::tempfile().unwrap();
3835
3836 let values = vec![
3837 vec![
3838 ByteArray::from("hello"),
3839 ByteArray::from("a"),
3840 ByteArray::from("b"),
3841 ByteArray::from("d"),
3842 ],
3843 vec![
3844 ByteArray::from("c"),
3845 ByteArray::from("a"),
3846 ByteArray::from("b"),
3847 ],
3848 ];
3849
3850 let def_levels = vec![
3851 vec![1, 0, 0, 1, 0, 0, 1, 1],
3852 vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
3853 ];
3854
3855 let opts = TestOptions {
3856 encoding: Encoding::RLE_DICTIONARY,
3857 ..Default::default()
3858 };
3859
3860 generate_single_column_file_with_data::<ByteArrayType>(
3861 &values,
3862 Some(&def_levels),
3863 file.try_clone().unwrap(),
3864 schema,
3865 Some(arrow_field),
3866 &opts,
3867 )
3868 .unwrap();
3869
3870 file.rewind().unwrap();
3871
3872 let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();
3873
3874 let batches = record_reader
3875 .collect::<Result<Vec<RecordBatch>, _>>()
3876 .unwrap();
3877
3878 assert_eq!(batches.len(), 6);
3879 assert!(batches.iter().all(|x| x.num_columns() == 1));
3880
3881 let row_counts = batches
3882 .iter()
3883 .map(|x| (x.num_rows(), x.column(0).null_count()))
3884 .collect::<Vec<_>>();
3885
3886 assert_eq!(
3887 row_counts,
3888 vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
3889 );
3890
3891 let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();
3892
// batches 0 and 1 both come from the first row group and share its dictionary
3893 assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
// batch 2 spans the row group boundary, so a fresh dictionary is computed
3895 assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
// batch 3 lies entirely within the second row group, whose dictionary differs
3897 assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
// batches 3, 4 and 5 all come from the second row group
3898 assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
3900 assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
3901 }
3902
3903 #[test]
3904 fn test_read_null_list() {
3905 let testdata = arrow::util::test_util::parquet_test_data();
3906 let path = format!("{testdata}/null_list.parquet");
3907 let file = File::open(path).unwrap();
3908 let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3909
3910 let batch = record_batch_reader.next().unwrap().unwrap();
3911 assert_eq!(batch.num_rows(), 1);
3912 assert_eq!(batch.num_columns(), 1);
3913 assert_eq!(batch.column(0).len(), 1);
3914
3915 let list = batch
3916 .column(0)
3917 .as_any()
3918 .downcast_ref::<ListArray>()
3919 .unwrap();
3920 assert_eq!(list.len(), 1);
3921 assert!(list.is_valid(0));
3922
3923 let val = list.value(0);
3924 assert_eq!(val.len(), 0);
3925 }
3926
3927 #[test]
3928 fn test_null_schema_inference() {
3929 let testdata = arrow::util::test_util::parquet_test_data();
3930 let path = format!("{testdata}/null_list.parquet");
3931 let file = File::open(path).unwrap();
3932
3933 let arrow_field = Field::new(
3934 "emptylist",
3935 ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
3936 true,
3937 );
3938
3939 let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3940 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3941 let schema = builder.schema();
3942 assert_eq!(schema.fields().len(), 1);
3943 assert_eq!(schema.field(0), &arrow_field);
3944 }
3945
3946 #[test]
3947 fn test_skip_metadata() {
3948 let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
3949 let field = Field::new("col", col.data_type().clone(), true);
3950
3951 let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
3952
3953 let metadata = [("key".to_string(), "value".to_string())]
3954 .into_iter()
3955 .collect();
3956
3957 let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
3958
3959 assert_ne!(schema_with_metadata, schema_without_metadata);
3960
3961 let batch =
3962 RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
3963
3964 let file = |version: WriterVersion| {
3965 let props = WriterProperties::builder()
3966 .set_writer_version(version)
3967 .build();
3968
3969 let file = tempfile().unwrap();
3970 let mut writer =
3971 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
3972 .unwrap();
3973 writer.write(&batch).unwrap();
3974 writer.close().unwrap();
3975 file
3976 };
3977
3978 let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3979
3980 let v1_reader = file(WriterVersion::PARQUET_1_0);
3981 let v2_reader = file(WriterVersion::PARQUET_2_0);
3982
3983 let arrow_reader =
3984 ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
3985 assert_eq!(arrow_reader.schema(), schema_with_metadata);
3986
3987 let reader =
3988 ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
3989 .unwrap()
3990 .build()
3991 .unwrap();
3992 assert_eq!(reader.schema(), schema_without_metadata);
3993
3994 let arrow_reader =
3995 ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
3996 assert_eq!(arrow_reader.schema(), schema_with_metadata);
3997
3998 let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
3999 .unwrap()
4000 .build()
4001 .unwrap();
4002 assert_eq!(reader.schema(), schema_without_metadata);
4003 }
4004
4005 fn write_parquet_from_iter<I, F>(value: I) -> File
4006 where
4007 I: IntoIterator<Item = (F, ArrayRef)>,
4008 F: AsRef<str>,
4009 {
4010 let batch = RecordBatch::try_from_iter(value).unwrap();
4011 let file = tempfile().unwrap();
4012 let mut writer =
4013 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
4014 writer.write(&batch).unwrap();
4015 writer.close().unwrap();
4016 file
4017 }
4018
4019 fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
4020 where
4021 I: IntoIterator<Item = (F, ArrayRef)>,
4022 F: AsRef<str>,
4023 {
4024 let file = write_parquet_from_iter(value);
4025 let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
4026 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4027 file.try_clone().unwrap(),
4028 options_with_schema,
4029 );
4030 assert_eq!(builder.err().unwrap().to_string(), expected_error);
4031 }
4032
4033 #[test]
4034 fn test_schema_too_few_columns() {
4035 run_schema_test_with_error(
4036 vec![
4037 ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
4038 ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
4039 ],
4040 Arc::new(Schema::new(vec![Field::new(
4041 "int64",
4042 ArrowDataType::Int64,
4043 false,
4044 )])),
4045 "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
4046 );
4047 }
4048
4049 #[test]
4050 fn test_schema_too_many_columns() {
4051 run_schema_test_with_error(
4052 vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
4053 Arc::new(Schema::new(vec![
4054 Field::new("int64", ArrowDataType::Int64, false),
4055 Field::new("int32", ArrowDataType::Int32, false),
4056 ])),
4057 "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
4058 );
4059 }
4060
4061 #[test]
4062 fn test_schema_mismatched_column_names() {
4063 run_schema_test_with_error(
4064 vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
4065 Arc::new(Schema::new(vec![Field::new(
4066 "other",
4067 ArrowDataType::Int64,
4068 false,
4069 )])),
4070 "Arrow: incompatible arrow schema, expected field named int64 got other",
4071 );
4072 }
4073
4074 #[test]
4075 fn test_schema_incompatible_columns() {
4076 run_schema_test_with_error(
4077 vec![
4078 (
4079 "col1_invalid",
4080 Arc::new(Int64Array::from(vec![0])) as ArrayRef,
4081 ),
4082 (
4083 "col2_valid",
4084 Arc::new(Int32Array::from(vec![0])) as ArrayRef,
4085 ),
4086 (
4087 "col3_invalid",
4088 Arc::new(Date64Array::from(vec![0])) as ArrayRef,
4089 ),
4090 ],
4091 Arc::new(Schema::new(vec![
4092 Field::new("col1_invalid", ArrowDataType::Int32, false),
4093 Field::new("col2_valid", ArrowDataType::Int32, false),
4094 Field::new("col3_invalid", ArrowDataType::Int32, false),
4095 ])),
4096 "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
4097 );
4098 }
4099
4100 #[test]
4101 fn test_one_incompatible_nested_column() {
4102 let nested_fields = Fields::from(vec![
4103 Field::new("nested1_valid", ArrowDataType::Utf8, false),
4104 Field::new("nested1_invalid", ArrowDataType::Int64, false),
4105 ]);
4106 let nested = StructArray::try_new(
4107 nested_fields,
4108 vec![
4109 Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
4110 Arc::new(Int64Array::from(vec![0])) as ArrayRef,
4111 ],
4112 None,
4113 )
4114 .expect("struct array");
4115 let supplied_nested_fields = Fields::from(vec![
4116 Field::new("nested1_valid", ArrowDataType::Utf8, false),
4117 Field::new("nested1_invalid", ArrowDataType::Int32, false),
4118 ]);
4119 run_schema_test_with_error(
4120 vec![
4121 ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
4122 ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
4123 ("nested", Arc::new(nested) as ArrayRef),
4124 ],
4125 Arc::new(Schema::new(vec![
4126 Field::new("col1", ArrowDataType::Int64, false),
4127 Field::new("col2", ArrowDataType::Int32, false),
4128 Field::new(
4129 "nested",
4130 ArrowDataType::Struct(supplied_nested_fields),
4131 false,
4132 ),
4133 ])),
4134 "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
4135 requested Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int32) \
4136 but found Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int64)",
4137 );
4138 }
4139
4140 fn utf8_parquet() -> Bytes {
4142 let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
4143 let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
4144 let props = None;
4145 let mut parquet_data = vec![];
4147 let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
4148 writer.write(&batch).unwrap();
4149 writer.close().unwrap();
4150 Bytes::from(parquet_data)
4151 }
4152
4153 #[test]
4154 fn test_schema_error_bad_types() {
4155 let parquet_data = utf8_parquet();
4157
4158 let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
4160 "column1",
4161 arrow::datatypes::DataType::Int32,
4162 false,
4163 )]));
4164
4165 let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
4167 let err =
4168 ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
4169 .unwrap_err();
4170 assert_eq!(
4171 err.to_string(),
4172 "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8"
4173 )
4174 }
4175
4176 #[test]
4177 fn test_schema_error_bad_nullability() {
4178 let parquet_data = utf8_parquet();
4180
4181 let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
4183 "column1",
4184 arrow::datatypes::DataType::Utf8,
4185 true,
4186 )]));
4187
4188 let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
4190 let err =
4191 ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
4192 .unwrap_err();
4193 assert_eq!(
4194 err.to_string(),
4195 "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false"
4196 )
4197 }
4198
4199 #[test]
4200 fn test_read_binary_as_utf8() {
4201 let file = write_parquet_from_iter(vec![
4202 (
4203 "binary_to_utf8",
4204 Arc::new(BinaryArray::from(vec![
4205 b"one".as_ref(),
4206 b"two".as_ref(),
4207 b"three".as_ref(),
4208 ])) as ArrayRef,
4209 ),
4210 (
4211 "large_binary_to_large_utf8",
4212 Arc::new(LargeBinaryArray::from(vec![
4213 b"one".as_ref(),
4214 b"two".as_ref(),
4215 b"three".as_ref(),
4216 ])) as ArrayRef,
4217 ),
4218 (
4219 "binary_view_to_utf8_view",
4220 Arc::new(BinaryViewArray::from(vec![
4221 b"one".as_ref(),
4222 b"two".as_ref(),
4223 b"three".as_ref(),
4224 ])) as ArrayRef,
4225 ),
4226 ]);
4227 let supplied_fields = Fields::from(vec![
4228 Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
4229 Field::new(
4230 "large_binary_to_large_utf8",
4231 ArrowDataType::LargeUtf8,
4232 false,
4233 ),
4234 Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
4235 ]);
4236
4237 let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
4238 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4239 file.try_clone().unwrap(),
4240 options,
4241 )
4242 .expect("reader builder with schema")
4243 .build()
4244 .expect("reader with schema");
4245
4246 let batch = arrow_reader.next().unwrap().unwrap();
4247 assert_eq!(batch.num_columns(), 3);
4248 assert_eq!(batch.num_rows(), 3);
4249 assert_eq!(
4250 batch
4251 .column(0)
4252 .as_string::<i32>()
4253 .iter()
4254 .collect::<Vec<_>>(),
4255 vec![Some("one"), Some("two"), Some("three")]
4256 );
4257
4258 assert_eq!(
4259 batch
4260 .column(1)
4261 .as_string::<i64>()
4262 .iter()
4263 .collect::<Vec<_>>(),
4264 vec![Some("one"), Some("two"), Some("three")]
4265 );
4266
4267 assert_eq!(
4268 batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
4269 vec![Some("one"), Some("two"), Some("three")]
4270 );
4271 }
4272
4273 #[test]
4274 #[should_panic(expected = "Invalid UTF8 sequence at")]
4275 fn test_read_non_utf8_binary_as_utf8() {
4276 let file = write_parquet_from_iter(vec![(
4277 "non_utf8_binary",
4278 Arc::new(BinaryArray::from(vec![
4279 b"\xDE\x00\xFF".as_ref(),
4280 b"\xDE\x01\xAA".as_ref(),
4281 b"\xDE\x02\xFF".as_ref(),
4282 ])) as ArrayRef,
4283 )]);
4284 let supplied_fields = Fields::from(vec![Field::new(
4285 "non_utf8_binary",
4286 ArrowDataType::Utf8,
4287 false,
4288 )]);
4289
4290 let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
4291 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4292 file.try_clone().unwrap(),
4293 options,
4294 )
4295 .expect("reader builder with schema")
4296 .build()
4297 .expect("reader with schema");
4298 // unwrap (not unwrap_err) so the Err panics with the "Invalid UTF8 sequence at" message expected by should_panic
arrow_reader.next().unwrap().unwrap();
4299 }
4300
4301 #[test]
4302 fn test_with_schema() {
4303 let nested_fields = Fields::from(vec![
4304 Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
4305 Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
4306 ]);
4307
4308 let nested_arrays: Vec<ArrayRef> = vec![
4309 Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
4310 Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
4311 ];
4312
4313 let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();
4314
4315 let file = write_parquet_from_iter(vec![
4316 (
4317 "int32_to_ts_second",
4318 Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
4319 ),
4320 (
4321 "date32_to_date64",
4322 Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
4323 ),
4324 ("nested", Arc::new(nested) as ArrayRef),
4325 ]);
4326
4327 let supplied_nested_fields = Fields::from(vec![
4328 Field::new(
4329 "utf8_to_dict",
4330 ArrowDataType::Dictionary(
4331 Box::new(ArrowDataType::Int32),
4332 Box::new(ArrowDataType::Utf8),
4333 ),
4334 false,
4335 ),
4336 Field::new(
4337 "int64_to_ts_nano",
4338 ArrowDataType::Timestamp(
4339 arrow::datatypes::TimeUnit::Nanosecond,
4340 Some("+10:00".into()),
4341 ),
4342 false,
4343 ),
4344 ]);
4345
4346 let supplied_schema = Arc::new(Schema::new(vec![
4347 Field::new(
4348 "int32_to_ts_second",
4349 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
4350 false,
4351 ),
4352 Field::new("date32_to_date64", ArrowDataType::Date64, false),
4353 Field::new(
4354 "nested",
4355 ArrowDataType::Struct(supplied_nested_fields),
4356 false,
4357 ),
4358 ]));
4359
4360 let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
4361 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4362 file.try_clone().unwrap(),
4363 options,
4364 )
4365 .expect("reader builder with schema")
4366 .build()
4367 .expect("reader with schema");
4368
4369 assert_eq!(arrow_reader.schema(), supplied_schema);
4370 let batch = arrow_reader.next().unwrap().unwrap();
4371 assert_eq!(batch.num_columns(), 3);
4372 assert_eq!(batch.num_rows(), 4);
4373 assert_eq!(
4374 batch
4375 .column(0)
4376 .as_any()
4377 .downcast_ref::<TimestampSecondArray>()
4378 .expect("downcast to timestamp second")
4379 .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
4380 .map(|v| v.to_string())
4381 .expect("value as datetime"),
4382 "1970-01-01 01:00:00 +01:00"
4383 );
4384 assert_eq!(
4385 batch
4386 .column(1)
4387 .as_any()
4388 .downcast_ref::<Date64Array>()
4389 .expect("downcast to date64")
4390 .value_as_date(0)
4391 .map(|v| v.to_string())
4392 .expect("value as date"),
4393 "1970-01-01"
4394 );
4395
4396 let nested = batch
4397 .column(2)
4398 .as_any()
4399 .downcast_ref::<StructArray>()
4400 .expect("downcast to struct");
4401
4402 let nested_dict = nested
4403 .column(0)
4404 .as_any()
4405 .downcast_ref::<Int32DictionaryArray>()
4406 .expect("downcast to dictionary");
4407
4408 assert_eq!(
4409 nested_dict
4410 .values()
4411 .as_any()
4412 .downcast_ref::<StringArray>()
4413 .expect("downcast to string")
4414 .iter()
4415 .collect::<Vec<_>>(),
4416 vec![Some("a"), Some("b")]
4417 );
4418
4419 assert_eq!(
4420 nested_dict.keys().iter().collect::<Vec<_>>(),
4421 vec![Some(0), Some(0), Some(0), Some(1)]
4422 );
4423
4424 assert_eq!(
4425 nested
4426 .column(1)
4427 .as_any()
4428 .downcast_ref::<TimestampNanosecondArray>()
4429 .expect("downcast to timestamp nanosecond")
4430 .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
4431 .map(|v| v.to_string())
4432 .expect("value as datetime"),
4433 "1970-01-01 10:00:00.000000001 +10:00"
4434 );
4435 }
4436
4437 #[test]
4438 fn test_empty_projection() {
4439 let testdata = arrow::util::test_util::parquet_test_data();
4440 let path = format!("{testdata}/alltypes_plain.parquet");
4441 let file = File::open(path).unwrap();
4442
4443 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4444 let file_metadata = builder.metadata().file_metadata();
4445 let expected_rows = file_metadata.num_rows() as usize;
4446
4447 let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
4448 let batch_reader = builder
4449 .with_projection(mask)
4450 .with_batch_size(2)
4451 .build()
4452 .unwrap();
4453
4454 let mut total_rows = 0;
4455 for maybe_batch in batch_reader {
4456 let batch = maybe_batch.unwrap();
4457 total_rows += batch.num_rows();
4458 assert_eq!(batch.num_columns(), 0);
4459 assert!(batch.num_rows() <= 2);
4460 }
4461
4462 assert_eq!(total_rows, expected_rows);
4463 }
4464
4465 fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
4466 let schema = Arc::new(Schema::new(vec![Field::new(
4467 "list",
4468 ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
4469 true,
4470 )]));
4471
4472 let mut buf = Vec::with_capacity(1024);
4473
4474 let mut writer = ArrowWriter::try_new(
4475 &mut buf,
4476 schema.clone(),
4477 Some(
4478 WriterProperties::builder()
4479 .set_max_row_group_row_count(Some(row_group_size))
4480 .build(),
4481 ),
4482 )
4483 .unwrap();
4484 for _ in 0..2 {
4485 let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
4486 for _ in 0..(batch_size) {
4487 list_builder.append(true);
4488 }
4489 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
4490 .unwrap();
4491 writer.write(&batch).unwrap();
4492 }
4493 writer.close().unwrap();
4494
4495 let mut record_reader =
4496 ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
4497 assert_eq!(
4498 batch_size,
4499 record_reader.next().unwrap().unwrap().num_rows()
4500 );
4501 assert_eq!(
4502 batch_size,
4503 record_reader.next().unwrap().unwrap().num_rows()
4504 );
4505 }
4506
4507 #[test]
4508 fn test_row_group_exact_multiple() {
4509 const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
4510 test_row_group_batch(8, 8);
4511 test_row_group_batch(10, 8);
4512 test_row_group_batch(8, 10);
4513 test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
4514 test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
4515 test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
4516 test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
4517 test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
4518 }
4519
4520 fn get_expected_batches(
4523 column: &RecordBatch,
4524 selection: &RowSelection,
4525 batch_size: usize,
4526 ) -> Vec<RecordBatch> {
4527 let mut expected_batches = vec![];
4528
4529 let mut selection: VecDeque<_> = selection.clone().into();
4530 let mut row_offset = 0;
4531 let mut last_start = None;
4532 while row_offset < column.num_rows() && !selection.is_empty() {
4533 let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
4534 while batch_remaining > 0 && !selection.is_empty() {
4535 let (to_read, skip) = match selection.front_mut() {
4536 Some(selection) if selection.row_count > batch_remaining => {
4537 selection.row_count -= batch_remaining;
4538 (batch_remaining, selection.skip)
4539 }
4540 Some(_) => {
4541 let select = selection.pop_front().unwrap();
4542 (select.row_count, select.skip)
4543 }
4544 None => break,
4545 };
4546
4547 batch_remaining -= to_read;
4548
4549 match skip {
4550 true => {
4551 if let Some(last_start) = last_start.take() {
4552 expected_batches.push(column.slice(last_start, row_offset - last_start))
4553 }
4554 row_offset += to_read
4555 }
4556 false => {
4557 last_start.get_or_insert(row_offset);
4558 row_offset += to_read
4559 }
4560 }
4561 }
4562 }
4563
4564 if let Some(last_start) = last_start.take() {
4565 expected_batches.push(column.slice(last_start, row_offset - last_start))
4566 }
4567
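// Sanity check: every batch except the last should contain exactly batch_size rows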
4568 for batch in &expected_batches[..expected_batches.len() - 1] {
4570 assert_eq!(batch.num_rows(), batch_size);
4571 }
4572
4573 expected_batches
4574 }
4575
4576 fn create_test_selection(
4577 step_len: usize,
4578 total_len: usize,
4579 skip_first: bool,
4580 ) -> (RowSelection, usize) {
4581 let mut remaining = total_len;
4582 let mut skip = skip_first;
4583 let mut vec = vec![];
4584 let mut selected_count = 0;
4585 while remaining != 0 {
4586 let step = if remaining > step_len {
4587 step_len
4588 } else {
4589 remaining
4590 };
4591 vec.push(RowSelector {
4592 row_count: step,
4593 skip,
4594 });
4595 remaining -= step;
4596 if !skip {
4597 selected_count += step;
4598 }
4599 skip = !skip;
4600 }
4601 (vec.into(), selected_count)
4602 }
4603
4604 #[test]
4605 fn test_scan_row_with_selection() {
4606 let testdata = arrow::util::test_util::parquet_test_data();
4607 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
4608 let test_file = File::open(&path).unwrap();
4609
4610 let mut serial_reader =
4611 ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
4612 let data = serial_reader.next().unwrap().unwrap();
4613
4614 let do_test = |batch_size: usize, selection_len: usize| {
4615 for skip_first in [false, true] {
4616 // step by selection_len (not batch_size) so the two parameters vary independently
let selections = create_test_selection(selection_len, data.num_rows(), skip_first).0;
4617
4618 let expected = get_expected_batches(&data, &selections, batch_size);
4619 let skip_reader = create_skip_reader(&test_file, batch_size, selections);
4620 assert_eq!(
4621 skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
4622 expected,
4623 "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
4624 );
4625 }
4626 };
4627
4628 do_test(1000, 1000);
4631
4632 do_test(20, 20);
4634
4635 do_test(20, 5);
4637
4638 do_test(20, 5);
4641
4642 fn create_skip_reader(
4643 test_file: &File,
4644 batch_size: usize,
4645 selections: RowSelection,
4646 ) -> ParquetRecordBatchReader {
4647 let options =
4648 ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
4649 let file = test_file.try_clone().unwrap();
4650 ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
4651 .unwrap()
4652 .with_batch_size(batch_size)
4653 .with_row_selection(selections)
4654 .build()
4655 .unwrap()
4656 }
4657 }
4658
4659 #[test]
4660 fn test_batch_size_overallocate() {
4661 let testdata = arrow::util::test_util::parquet_test_data();
4662 let path = format!("{testdata}/alltypes_plain.parquet");
4664 let test_file = File::open(path).unwrap();
4665
4666 let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4667 let num_rows = builder.metadata.file_metadata().num_rows();
4668 let reader = builder
4669 .with_batch_size(1024)
4670 .with_projection(ProjectionMask::all())
4671 .build()
4672 .unwrap();
4673 assert_ne!(1024, num_rows);
4674 assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
4675 }
4676
    #[test]
    fn test_read_with_page_index_enabled() {
        let testdata = arrow::util::test_util::parquet_test_data();

        {
            // `alltypes_tiny_pages.parquet` ships with a page index
            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
            let test_file = File::open(path).unwrap();
            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
                test_file,
                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
            )
            .unwrap();
            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
            let reader = builder.build().unwrap();
            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(batches.len(), 8);
        }

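        // `alltypes_plain.parquet` does not contain a page index, so the builder
        // reports no offset index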
        {
            let path = format!("{testdata}/alltypes_plain.parquet");
            let test_file = File::open(path).unwrap();
            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
                test_file,
                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
            )
            .unwrap();
            assert!(builder.metadata().offset_index().is_none());
            let reader = builder.build().unwrap();
            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(batches.len(), 1);
        }
    }

    #[test]
    fn test_raw_repetition() {
        const MESSAGE_TYPE: &str = "
            message Log {
              OPTIONAL INT32 eventType;
              REPEATED INT32 category;
              REPEATED group filter {
                OPTIONAL INT32 error;
              }
            }
        ";
        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
        let props = Default::default();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();
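        // write one logical row across the three leaf columns in schema order:
        // eventType, category, filter.error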
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1], Some(&[1]), None)
            .unwrap();
        col_writer.close().unwrap();
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
            .unwrap();
        col_writer.close().unwrap();
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1], Some(&[1]), Some(&[0]))
            .unwrap();
        col_writer.close().unwrap();

        let rg_md = row_group_writer.close().unwrap();
        assert_eq!(rg_md.num_rows(), 1);
        writer.close().unwrap();

        let bytes = Bytes::from(buf);

        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
        let full = no_mask.next().unwrap().unwrap();

        assert_eq!(full.num_columns(), 3);

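        // projecting each leaf individually should reproduce the corresponding column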
        for idx in 0..3 {
            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
            let mut reader = b.with_projection(mask).build().unwrap();
            let projected = reader.next().unwrap().unwrap();

            assert_eq!(projected.num_columns(), 1);
            assert_eq!(full.column(idx), projected.column(0));
        }
    }

    #[test]
    fn test_read_lz4_raw() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/lz4_raw_compressed.parquet");
        let file = File::open(path).unwrap();

        let batches = ParquetRecordBatchReader::try_new(file, 1024)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];

        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 4);

        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
        assert_eq!(
            a.values(),
            &[1593604800, 1593604800, 1593604801, 1593604801]
        );

        let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
        let b: Vec<_> = b.iter().flatten().collect();
        assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);

        let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
        assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
    }

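    // Covers both the Hadoop-framed and raw LZ4 block formats; the reader is
    // expected to fall back between the two when decompressing.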
    #[test]
    fn test_read_lz4_hadoop_fallback() {
        for file in [
            "hadoop_lz4_compressed.parquet",
            "non_hadoop_lz4_compressed.parquet",
        ] {
            let testdata = arrow::util::test_util::parquet_test_data();
            let path = format!("{testdata}/{file}");
            let file = File::open(path).unwrap();
            let expected_rows = 4;

            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert_eq!(batches.len(), 1);
            let batch = &batches[0];

            assert_eq!(batch.num_columns(), 3);
            assert_eq!(batch.num_rows(), expected_rows);

            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
            assert_eq!(
                a.values(),
                &[1593604800, 1593604800, 1593604801, 1593604801]
            );

            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
            let b: Vec<_> = b.iter().flatten().collect();
            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);

            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
        }
    }

    #[test]
    fn test_read_lz4_hadoop_large() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
        let file = File::open(path).unwrap();
        let expected_rows = 10000;

        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];

        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.num_rows(), expected_rows);

        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
        let a: Vec<_> = a.iter().flatten().collect();
        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
    }

    #[test]
    #[cfg(feature = "snap")]
    fn test_read_nested_lists() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_lists.snappy.parquet");
        let file = File::open(path).unwrap();

        let f = file.try_clone().unwrap();
        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
        let expected = reader.next().unwrap().unwrap();
        assert_eq!(expected.num_rows(), 3);

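        // select only the middle row of the three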
        let selection = RowSelection::from(vec![
            RowSelector::skip(1),
            RowSelector::select(1),
            RowSelector::skip(1),
        ]);
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .with_row_selection(selection)
            .build()
            .unwrap();

        let actual = reader.next().unwrap().unwrap();
        assert_eq!(actual.num_rows(), 1);
        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
    }

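    // Round-trip Decimal128 columns at several precision/scale combinations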
    #[test]
    fn test_arbitrary_decimal() {
        let values = [1, 2, 3, 4, 5, 6, 7, 8];
        let decimals_19_0 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(19, 0)
            .unwrap();
        let decimals_12_0 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(12, 0)
            .unwrap();
        let decimals_17_10 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(17, 10)
            .unwrap();

        let written = RecordBatch::try_from_iter([
            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 8), &read[0]);
    }

    #[test]
    fn test_list_skip() {
        let mut list = ListBuilder::new(Int32Builder::new());
        list.append_value([Some(1), Some(2)]);
        list.append_value([Some(3)]);
        list.append_value([Some(4)]);
        let list = list.finish();
        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();

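        // force one row per data page so the skip below spans page boundaries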
        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(1)
            .set_write_batch_size(2)
            .build();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
            .unwrap()
            .with_row_selection(selection.into())
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out, batch.slice(2, 1));
    }

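    // Decimal32 with precision <= 9 is stored as INT32; driven by `test_decimal`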
    fn test_decimal32_roundtrip() {
        let d = |values: Vec<i32>, p: u8| {
            let iter = values.into_iter();
            PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

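    // Decimal64: precision <= 9 is stored as INT32 and precision <= 18 as INT64;
    // driven by `test_decimal`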
    fn test_decimal64_roundtrip() {
        let d = |values: Vec<i64>, p: u8| {
            let iter = values.into_iter();
            PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

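    // Generic decimal round-trip covering the full physical-type mapping:
    // precision <= 9 -> INT32, <= 18 -> INT64, > 18 -> FIXED_LEN_BYTE_ARRAY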
    fn test_decimal_roundtrip<T: DecimalType>() {
        let d = |values: Vec<usize>, p: u8| {
            let iter = values.into_iter().map(T::Native::usize_as);
            PrimitiveArray::<T>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
            ("d4", Arc::new(d4) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);
        let t4 = builder.parquet_schema().columns()[3].physical_type();
        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    #[test]
    fn test_decimal() {
        test_decimal32_roundtrip();
        test_decimal64_roundtrip();
        test_decimal_roundtrip::<Decimal128Type>();
        test_decimal_roundtrip::<Decimal256Type>();
    }

    #[test]
    fn test_list_selection() {
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list_field(ArrowDataType::Utf8, true),
            false,
        )]));
        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        for i in 0..2 {
            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
            for j in 0..1024 {
                list_a_builder.values().append_value(format!("{i} {j}"));
                list_a_builder.append(true);
            }
            let batch =
                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
                    .unwrap();
            writer.write(&batch).unwrap();
        }
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);
        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
            .unwrap()
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ]))
            .build()
            .unwrap();

        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        let batch = concat_batches(&schema, &batches).unwrap();

        assert_eq!(batch.num_rows(), 924 * 2);
        let list = batch.column(0).as_list::<i32>();

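        // every list should contain exactly one element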
        for w in list.value_offsets().windows(2) {
            assert_eq!(w[0] + 1, w[1])
        }
        let mut values = list.values().as_string::<i32>().iter();

        for i in 0..2 {
            for j in 100..1024 {
                let expected = format!("{i} {j}");
                assert_eq!(values.next().unwrap().unwrap(), &expected);
            }
        }
    }

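    // Fuzz nested-list reads: random null patterns checked against a fixed set of
    // selections and batch sizes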
    #[test]
    fn test_list_selection_fuzz() {
        let mut rng = rng();
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list(
                Field::LIST_FIELD_DEFAULT_NAME,
                Field::new_list_field(ArrowDataType::Int32, true),
                true,
            ),
            true,
        )]));
        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));

        for _ in 0..2048 {
            if rng.random_bool(0.2) {
                list_a_builder.append(false);
                continue;
            }

            let list_a_len = rng.random_range(0..10);
            let list_b_builder = list_a_builder.values();

            for _ in 0..list_a_len {
                if rng.random_bool(0.2) {
                    list_b_builder.append(false);
                    continue;
                }

                let list_b_len = rng.random_range(0..10);
                let int_builder = list_b_builder.values();
                for _ in 0..list_b_len {
                    match rng.random_bool(0.2) {
                        true => int_builder.append_null(),
                        false => int_builder.append_value(rng.random()),
                    }
                }
                list_b_builder.append(true)
            }
            list_a_builder.append(true);
        }

        let array = Arc::new(list_a_builder.finish());
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        writer.write(&batch).unwrap();
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);

        let cases = [
            vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ],
            vec![
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
            ],
            vec![
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
            ],
            vec![
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
            ],
        ];

        for batch_size in [100, 1024, 2048] {
            for selection in &cases {
                let selection = RowSelection::from(selection.clone());
                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
                    .unwrap()
                    .with_row_selection(selection.clone())
                    .with_batch_size(batch_size)
                    .build()
                    .unwrap();

                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
                assert_eq!(actual.num_rows(), selection.row_count());

                let mut batch_offset = 0;
                let mut actual_offset = 0;
                for selector in selection.iter() {
                    if selector.skip {
                        batch_offset += selector.row_count;
                        continue;
                    }

                    assert_eq!(
                        batch.slice(batch_offset, selector.row_count),
                        actual.slice(actual_offset, selector.row_count)
                    );

                    batch_offset += selector.row_count;
                    actual_offset += selector.row_count;
                }
            }
        }
    }

    #[test]
    fn test_read_old_nested_list() {
        use arrow::datatypes::DataType;
        use arrow::datatypes::ToByteSlice;

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/old_list_structure.parquet");
        let test_file = File::open(path).unwrap();

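        // the first (and only) row's nested value should decode to [[1, 2], [3, 4]],
        // built here with the legacy "array" child field name of the old list layout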
        let a_values = Int32Array::from(vec![1, 2, 3, 4]);
        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());
        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
            "array",
            DataType::Int32,
            false,
        ))))
        .len(2)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
        let mut reader = builder.build().unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out.num_columns(), 1);
        let c0 = out.column(0);
        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
        let r0 = c0arr.value(0);
        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
        assert_eq!(r0arr, &a);
    }

    #[test]
    fn test_map_no_value() {
        let testdata = arrow::util::test_util::parquet_test_data();
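        // `map_no_value.parquet` stores the same entries as a map without a value
        // field and as plain list columns; the two list-typed columns should
        // decode identically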
        let path = format!("{testdata}/map_no_value.parquet");
        let file = File::open(path).unwrap();

        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 3);
        assert_eq!(out.num_columns(), 3);
        let c0 = out.column(1).as_list::<i32>();
        let c1 = out.column(2).as_list::<i32>();
        assert_eq!(c0.len(), c1.len());
        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
    }

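    // A file carrying an unrecognized logical type should still be readable via
    // its physical type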
    #[test]
    fn test_read_unknown_logical_type() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/unknown-logical-type.parquet");
        let test_file = File::open(path).unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file)
            .expect("Error creating reader builder");

        let schema = builder.metadata().file_metadata().schema_descr();
        assert_eq!(
            schema.column(0).logical_type_ref(),
            Some(&LogicalType::String)
        );
        assert_eq!(
            schema.column(1).logical_type_ref(),
            Some(&LogicalType::_Unknown { field_id: 2555 })
        );
        assert_eq!(schema.column(1).physical_type(), PhysicalType::BYTE_ARRAY);

        let mut reader = builder.build().unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 3);
        assert_eq!(out.num_columns(), 2);
    }

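    // Reading with a virtual row-number column appends it after the file's
    // physical columns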
    #[test]
    fn test_read_row_numbers() {
        let file = write_parquet_from_iter(vec![(
            "value",
            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
        )]);
        let supplied_fields = Fields::from(vec![Field::new("value", ArrowDataType::Int64, false)]);

        let row_number_field = Arc::new(
            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
        );

        let options = ArrowReaderOptions::new()
            .with_schema(Arc::new(Schema::new(supplied_fields)))
            .with_virtual_columns(vec![row_number_field.clone()])
            .unwrap();
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");

        let batch = arrow_reader.next().unwrap().unwrap();
        let schema = Arc::new(Schema::new(vec![
            Field::new("value", ArrowDataType::Int64, false),
            (*row_number_field).clone(),
        ]));

        assert_eq!(batch.schema(), schema);
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(1), Some(2), Some(3)]
        );
        assert_eq!(
            batch
                .column(1)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(0), Some(1), Some(2)]
        );
    }

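    // Projecting no file columns should still yield the virtual row-number column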
    #[test]
    fn test_read_only_row_numbers() {
        let file = write_parquet_from_iter(vec![(
            "value",
            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
        )]);
        let row_number_field = Arc::new(
            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
        );
        let options = ArrowReaderOptions::new()
            .with_virtual_columns(vec![row_number_field.clone()])
            .unwrap();
        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
        let num_columns = metadata
            .metadata
            .file_metadata()
            .schema_descr()
            .num_columns();

        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
            .with_projection(ProjectionMask::none(num_columns))
            .build()
            .expect("reader with schema");

        let batch = arrow_reader.next().unwrap().unwrap();
        let schema = Arc::new(Schema::new(vec![row_number_field]));

        assert_eq!(batch.schema(), schema);
        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(0), Some(1), Some(2)]
        );
    }

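    // Row numbers must follow the requested row-group order, not file order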
    #[test]
    fn test_read_row_numbers_row_group_order() -> Result<()> {
        let array = Int64Array::from_iter_values(5000..5100);
        let batch = RecordBatch::try_from_iter([("col", Arc::new(array) as ArrayRef)])?;
        let mut buffer = Vec::new();
        let options = WriterProperties::builder()
            .set_max_row_group_row_count(Some(50))
            .build();
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema().clone(), Some(options))?;
        // write ten 10-row batches, producing two 50-row row groups
        for batch_chunk in (0..10).map(|i| batch.slice(i * 10, 10)) {
            writer.write(&batch_chunk)?;
        }
        writer.close()?;

        let row_number_field = Arc::new(
            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
        );

        let buffer = Bytes::from(buffer);

        let options =
            ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field.clone()])?;

        let arrow_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(buffer.clone(), options.clone())?
                .build()?;

        assert_eq!(
            ValuesAndRowNumbers {
                values: (5000..5100).collect(),
                row_numbers: (0..100).collect()
            },
            ValuesAndRowNumbers::new_from_reader(arrow_reader)
        );

        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(buffer, options)?
            .with_row_groups(vec![1, 0])
            .build()?;

        assert_eq!(
            ValuesAndRowNumbers {
                values: (5050..5100).chain(5000..5050).collect(),
                row_numbers: (50..100).chain(0..50).collect(),
            },
            ValuesAndRowNumbers::new_from_reader(arrow_reader)
        );

        Ok(())
    }

    #[derive(Debug, PartialEq)]
    struct ValuesAndRowNumbers {
        values: Vec<i64>,
        row_numbers: Vec<i64>,
    }

    impl ValuesAndRowNumbers {
        fn new_from_reader(reader: ParquetRecordBatchReader) -> Self {
            let mut values = vec![];
            let mut row_numbers = vec![];
            for batch in reader {
                let batch = batch.expect("Could not read batch");
                values.extend(
                    batch
                        .column_by_name("col")
                        .expect("Could not get col column")
                        .as_primitive::<arrow::datatypes::Int64Type>()
                        .iter()
                        .map(|v| v.expect("Could not get value")),
                );

                row_numbers.extend(
                    batch
                        .column_by_name("row_number")
                        .expect("Could not get row_number column")
                        .as_primitive::<arrow::datatypes::Int64Type>()
                        .iter()
                        .map(|v| v.expect("Could not get row number")),
                );
            }
            Self {
                values,
                row_numbers,
            }
        }
    }

    #[test]
    fn test_with_virtual_columns_rejects_non_virtual_fields() {
        let regular_field = Arc::new(Field::new("regular_column", ArrowDataType::Int64, false));
        assert_eq!(
            ArrowReaderOptions::new()
                .with_virtual_columns(vec![regular_field])
                .unwrap_err()
                .to_string(),
            "Parquet error: Field 'regular_column' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'"
        );
    }

    #[test]
    fn test_row_numbers_with_multiple_row_groups() {
        test_row_numbers_with_multiple_row_groups_helper(
            false,
            |path, selection, _row_filter, batch_size| {
                let file = File::open(path).unwrap();
                let row_number_field = Arc::new(
                    Field::new("row_number", ArrowDataType::Int64, false)
                        .with_extension_type(RowNumber),
                );
                let options = ArrowReaderOptions::new()
                    .with_virtual_columns(vec![row_number_field])
                    .unwrap();
                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                    .unwrap()
                    .with_row_selection(selection)
                    .with_batch_size(batch_size)
                    .build()
                    .expect("Could not create reader");
                reader
                    .collect::<Result<Vec<_>, _>>()
                    .expect("Could not read")
            },
        );
    }

    #[test]
    fn test_row_numbers_with_multiple_row_groups_and_filter() {
        test_row_numbers_with_multiple_row_groups_helper(
            true,
            |path, selection, row_filter, batch_size| {
                let file = File::open(path).unwrap();
                let row_number_field = Arc::new(
                    Field::new("row_number", ArrowDataType::Int64, false)
                        .with_extension_type(RowNumber),
                );
                let options = ArrowReaderOptions::new()
                    .with_virtual_columns(vec![row_number_field])
                    .unwrap();
                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                    .unwrap()
                    .with_row_selection(selection)
                    .with_batch_size(batch_size)
                    .with_row_filter(row_filter.expect("No filter"))
                    .build()
                    .expect("Could not create reader");
                reader
                    .collect::<Result<Vec<_>, _>>()
                    .expect("Could not read")
            },
        );
    }

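    // The virtual row-group-index column repeats each group's ordinal for every
    // row in that group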
    #[test]
    fn test_read_row_group_indices() {
        let array1 = Int64Array::from(vec![1, 2]);
        let array2 = Int64Array::from(vec![3, 4]);
        let array3 = Int64Array::from(vec![5, 6]);

        let batch1 =
            RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
        let batch2 =
            RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
        let batch3 =
            RecordBatch::try_from_iter(vec![("value", Arc::new(array3) as ArrayRef)]).unwrap();

        let mut buffer = Vec::new();
        let options = WriterProperties::builder()
            .set_max_row_group_row_count(Some(2))
            .build();
        let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
        writer.write(&batch1).unwrap();
        writer.write(&batch2).unwrap();
        writer.write(&batch3).unwrap();
        writer.close().unwrap();

        let file = Bytes::from(buffer);
        let row_group_index_field = Arc::new(
            Field::new("row_group_index", ArrowDataType::Int64, false)
                .with_extension_type(RowGroupIndex),
        );

        let options = ArrowReaderOptions::new()
            .with_virtual_columns(vec![row_group_index_field.clone()])
            .unwrap();
        let mut arrow_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options)
                .expect("reader builder with virtual columns")
                .build()
                .expect("reader with virtual columns");

        let batch = arrow_reader.next().unwrap().unwrap();

        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 6);

        assert_eq!(
            batch
                .column(0)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(1), Some(2), Some(3), Some(4), Some(5), Some(6)]
        );

        assert_eq!(
            batch
                .column(1)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(0), Some(0), Some(1), Some(1), Some(2), Some(2)]
        );
    }

    #[test]
    fn test_read_only_row_group_indices() {
        let array1 = Int64Array::from(vec![1, 2, 3]);
        let array2 = Int64Array::from(vec![4, 5]);

        let batch1 =
            RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
        let batch2 =
            RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();

        let mut buffer = Vec::new();
        let options = WriterProperties::builder()
            .set_max_row_group_row_count(Some(3))
            .build();
        let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
        writer.write(&batch1).unwrap();
        writer.write(&batch2).unwrap();
        writer.close().unwrap();

        let file = Bytes::from(buffer);
        let row_group_index_field = Arc::new(
            Field::new("row_group_index", ArrowDataType::Int64, false)
                .with_extension_type(RowGroupIndex),
        );

        let options = ArrowReaderOptions::new()
            .with_virtual_columns(vec![row_group_index_field.clone()])
            .unwrap();
        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
        let num_columns = metadata
            .metadata
            .file_metadata()
            .schema_descr()
            .num_columns();

        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
            .with_projection(ProjectionMask::none(num_columns))
            .build()
            .expect("reader with virtual columns only");

        let batch = arrow_reader.next().unwrap().unwrap();
        let schema = Arc::new(Schema::new(vec![(*row_group_index_field).clone()]));

        assert_eq!(batch.schema(), schema);
        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.num_rows(), 5);

        assert_eq!(
            batch
                .column(0)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(0), Some(0), Some(0), Some(1), Some(1)]
        );
    }

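    // With row groups requested as [2, 1, 0], values and the virtual row-group
    // indices must both come back in that order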
    #[test]
    fn test_read_row_group_indices_with_selection() -> Result<()> {
        let mut buffer = Vec::new();
        let options = WriterProperties::builder()
            .set_max_row_group_row_count(Some(10))
            .build();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            ArrowDataType::Int64,
            false,
        )]));

        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(options))?;

        // write three row groups of 10 rows each: 0..10, 10..20, 20..30
        for i in 0..3 {
            let start = i * 10;
            let array = Int64Array::from_iter_values(start..start + 10);
            let batch = RecordBatch::try_from_iter(vec![("value", Arc::new(array) as ArrayRef)])?;
            writer.write(&batch)?;
        }
        writer.close()?;

        let file = Bytes::from(buffer);
        let row_group_index_field = Arc::new(
            Field::new("rg_idx", ArrowDataType::Int64, false).with_extension_type(RowGroupIndex),
        );

        let options =
            ArrowReaderOptions::new().with_virtual_columns(vec![row_group_index_field])?;

        let arrow_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options.clone())?
                .with_row_groups(vec![2, 1, 0])
                .build()?;

        let batches: Vec<_> = arrow_reader.collect::<Result<Vec<_>, _>>()?;
        let combined = concat_batches(&batches[0].schema(), &batches)?;

        let values = combined.column(0).as_primitive::<types::Int64Type>();
        let first_val = values.value(0);
        let last_val = values.value(combined.num_rows() - 1);
        // the first row comes from row group 2 (values 20..30), the last from group 0
        assert_eq!(first_val, 20);
        assert_eq!(last_val, 9);

        let rg_indices = combined.column(1).as_primitive::<types::Int64Type>();
        assert_eq!(rg_indices.value(0), 2);
        assert_eq!(rg_indices.value(10), 1);
        assert_eq!(rg_indices.value(20), 0);

        Ok(())
    }

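    // Shared driver: writes a randomly sized file, builds a random selection and
    // (optionally) a random row filter, then delegates reading to `test_case`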
    pub(crate) fn test_row_numbers_with_multiple_row_groups_helper<F>(
        use_filter: bool,
        test_case: F,
    ) where
        F: FnOnce(PathBuf, RowSelection, Option<RowFilter>, usize) -> Vec<RecordBatch>,
    {
        let seed: u64 = random();
        println!("test_row_numbers_with_multiple_row_groups seed: {seed}");
        let mut rng = StdRng::seed_from_u64(seed);

        use tempfile::TempDir;
        let tempdir = TempDir::new().expect("Could not create temp dir");

        let (bytes, metadata) = generate_file_with_row_numbers(&mut rng);

        let path = tempdir.path().join("test.parquet");
        std::fs::write(&path, bytes).expect("Could not write file");

        let mut case = vec![];
        let mut remaining = metadata.file_metadata().num_rows();
        while remaining > 0 {
            let row_count = rng.random_range(1..=remaining);
            remaining -= row_count;
            case.push(RowSelector {
                row_count: row_count as usize,
                skip: rng.random_bool(0.5),
            });
        }

        let filter = use_filter.then(|| {
            let filter = (0..metadata.file_metadata().num_rows())
                .map(|_| rng.random_bool(0.99))
                .collect::<Vec<_>>();
            let mut filter_offset = 0;
            RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
                ProjectionMask::all(),
                move |b| {
                    let array = BooleanArray::from_iter(
                        filter
                            .iter()
                            .skip(filter_offset)
                            .take(b.num_rows())
                            .map(|x| Some(*x)),
                    );
                    filter_offset += b.num_rows();
                    Ok(array)
                },
            ))])
        });

        let selection = RowSelection::from(case);
        let batches = test_case(path, selection.clone(), filter, rng.random_range(1..4096));

        if selection.skipped_row_count() == metadata.file_metadata().num_rows() as usize {
            assert!(batches.into_iter().all(|batch| batch.num_rows() == 0));
            return;
        }
        let actual = concat_batches(batches.first().expect("No batches").schema_ref(), &batches)
            .expect("Failed to concatenate");
        let values = actual
            .column(0)
            .as_primitive::<types::Int64Type>()
            .iter()
            .collect::<Vec<_>>();
        let row_numbers = actual
            .column(1)
            .as_primitive::<types::Int64Type>()
            .iter()
            .collect::<Vec<_>>();
        // values are 1-based while row numbers are 0-based, so they differ by one
        assert_eq!(
            row_numbers
                .into_iter()
                .map(|number| number.map(|number| number + 1))
                .collect::<Vec<_>>(),
            values
        );
    }

    // Writes a single-column file whose `value` column contains 1..=N for a random
    // N, flushed in random-sized batches so the file spans multiple row groups
    fn generate_file_with_row_numbers(rng: &mut impl Rng) -> (Bytes, ParquetMetaData) {
        let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
            "value",
            ArrowDataType::Int64,
            false,
        )])));

        let mut buf = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new(&mut buf, schema.clone(), None).expect("Could not create writer");

        let mut values = 1..=rng.random_range(1..4096);
        while !values.is_empty() {
            let batch_values = values
                .by_ref()
                .take(rng.random_range(1..4096))
                .collect::<Vec<_>>();
            let array = Arc::new(Int64Array::from(batch_values)) as ArrayRef;
            let batch =
                RecordBatch::try_from_iter([("value", array)]).expect("Could not create batch");
            writer.write(&batch).expect("Could not write batch");
            writer.flush().expect("Could not flush");
        }
        let metadata = writer.close().expect("Could not close writer");

        (Bytes::from(buf), metadata)
    }
}