1use arrow_array::cast::AsArray;
21use arrow_array::{Array, RecordBatch, RecordBatchReader};
22use arrow_schema::{ArrowError, DataType as ArrowType, FieldRef, Schema, SchemaRef};
23use arrow_select::filter::filter_record_batch;
24pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
25pub use selection::{RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector};
26use std::fmt::{Debug, Formatter};
27use std::sync::Arc;
28
29pub use crate::arrow::array_reader::RowGroups;
30use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
31use crate::arrow::schema::{
32 ParquetField, parquet_to_arrow_schema_and_fields, virtual_type::is_virtual_column,
33};
34use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels_with_virtual};
35use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
36use crate::bloom_filter::{
37 SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
38};
39use crate::column::page::{PageIterator, PageReader};
40#[cfg(feature = "encryption")]
41use crate::encryption::decrypt::FileDecryptionProperties;
42use crate::errors::{ParquetError, Result};
43use crate::file::metadata::{
44 PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
45 ParquetStatisticsPolicy, RowGroupMetaData,
46};
47use crate::file::reader::{ChunkReader, SerializedPageReader};
48use crate::schema::types::SchemaDescriptor;
49
50use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
51pub use read_plan::{PredicateOptions, ReadPlan, ReadPlanBuilder};
53
54mod filter;
55pub mod metrics;
56mod read_plan;
57pub(crate) mod selection;
58pub mod statistics;
59
60pub const DEFAULT_BATCH_SIZE: usize = 1024;
62
63pub struct ArrowReaderBuilder<T> {
111 pub(crate) input: T,
120
121 pub(crate) metadata: Arc<ParquetMetaData>,
122
123 pub(crate) schema: SchemaRef,
124
125 pub(crate) fields: Option<Arc<ParquetField>>,
126
127 pub(crate) batch_size: usize,
128
129 pub(crate) row_groups: Option<Vec<usize>>,
130
131 pub(crate) projection: ProjectionMask,
132
133 pub(crate) filter: Option<RowFilter>,
134
135 pub(crate) selection: Option<RowSelection>,
136
137 pub(crate) row_selection_policy: RowSelectionPolicy,
138
139 pub(crate) limit: Option<usize>,
140
141 pub(crate) offset: Option<usize>,
142
143 pub(crate) metrics: ArrowReaderMetrics,
144
145 pub(crate) max_predicate_cache_size: usize,
146}
147
148impl<T: Debug> Debug for ArrowReaderBuilder<T> {
149 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
150 f.debug_struct("ArrowReaderBuilder<T>")
151 .field("input", &self.input)
152 .field("metadata", &self.metadata)
153 .field("schema", &self.schema)
154 .field("fields", &self.fields)
155 .field("batch_size", &self.batch_size)
156 .field("row_groups", &self.row_groups)
157 .field("projection", &self.projection)
158 .field("filter", &self.filter)
159 .field("selection", &self.selection)
160 .field("row_selection_policy", &self.row_selection_policy)
161 .field("limit", &self.limit)
162 .field("offset", &self.offset)
163 .field("metrics", &self.metrics)
164 .finish()
165 }
166}
167
168impl<T> ArrowReaderBuilder<T> {
169 pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
170 Self {
171 input,
172 metadata: metadata.metadata,
173 schema: metadata.schema,
174 fields: metadata.fields,
175 batch_size: DEFAULT_BATCH_SIZE,
176 row_groups: None,
177 projection: ProjectionMask::all(),
178 filter: None,
179 selection: None,
180 row_selection_policy: RowSelectionPolicy::default(),
181 limit: None,
182 offset: None,
183 metrics: ArrowReaderMetrics::Disabled,
184 max_predicate_cache_size: 100 * 1024 * 1024, }
186 }
187
188 pub fn metadata(&self) -> &Arc<ParquetMetaData> {
190 &self.metadata
191 }
192
193 pub fn parquet_schema(&self) -> &SchemaDescriptor {
195 self.metadata.file_metadata().schema_descr()
196 }
197
198 pub fn schema(&self) -> &SchemaRef {
200 &self.schema
201 }
202
203 pub fn with_batch_size(self, batch_size: usize) -> Self {
206 let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
208 Self { batch_size, ..self }
209 }
210
211 pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
215 Self {
216 row_groups: Some(row_groups),
217 ..self
218 }
219 }
220
221 pub fn with_projection(self, mask: ProjectionMask) -> Self {
223 Self {
224 projection: mask,
225 ..self
226 }
227 }
228
229 pub fn with_row_selection_policy(self, policy: RowSelectionPolicy) -> Self {
233 Self {
234 row_selection_policy: policy,
235 ..self
236 }
237 }
238
239 pub fn with_row_selection(self, selection: RowSelection) -> Self {
299 Self {
300 selection: Some(selection),
301 ..self
302 }
303 }
304
305 pub fn with_row_filter(self, filter: RowFilter) -> Self {
345 Self {
346 filter: Some(filter),
347 ..self
348 }
349 }
350
351 pub fn with_limit(self, limit: usize) -> Self {
359 Self {
360 limit: Some(limit),
361 ..self
362 }
363 }
364
365 pub fn with_offset(self, offset: usize) -> Self {
373 Self {
374 offset: Some(offset),
375 ..self
376 }
377 }
378
379 pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
414 Self { metrics, ..self }
415 }
416
417 pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
432 Self {
433 max_predicate_cache_size,
434 ..self
435 }
436 }
437}
438
439#[derive(Debug, Clone, Default)]
454pub struct ArrowReaderOptions {
455 skip_arrow_metadata: bool,
457 supplied_schema: Option<SchemaRef>,
462
463 pub(crate) column_index: PageIndexPolicy,
464 pub(crate) offset_index: PageIndexPolicy,
465
466 metadata_options: ParquetMetaDataOptions,
468 #[cfg(feature = "encryption")]
470 pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
471
472 virtual_columns: Vec<FieldRef>,
473}
474
475impl ArrowReaderOptions {
476 pub fn new() -> Self {
478 Self::default()
479 }
480
481 pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
488 Self {
489 skip_arrow_metadata,
490 ..self
491 }
492 }
493
494 pub fn with_schema(self, schema: SchemaRef) -> Self {
603 Self {
604 supplied_schema: Some(schema),
605 skip_arrow_metadata: true,
606 ..self
607 }
608 }
609
610 #[deprecated(since = "57.2.0", note = "Use `with_page_index_policy` instead")]
611 pub fn with_page_index(self, page_index: bool) -> Self {
624 self.with_page_index_policy(PageIndexPolicy::from(page_index))
625 }
626
627 pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
635 self.with_column_index_policy(policy)
636 .with_offset_index_policy(policy)
637 }
638
639 pub fn with_column_index_policy(mut self, policy: PageIndexPolicy) -> Self {
646 self.column_index = policy;
647 self
648 }
649
650 pub fn with_offset_index_policy(mut self, policy: PageIndexPolicy) -> Self {
657 self.offset_index = policy;
658 self
659 }
660
661 pub fn with_parquet_schema(mut self, schema: Arc<SchemaDescriptor>) -> Self {
667 self.metadata_options.set_schema(schema);
668 self
669 }
670
671 pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
682 self.metadata_options.set_encoding_stats_as_mask(val);
683 self
684 }
685
686 pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
691 self.metadata_options.set_encoding_stats_policy(policy);
692 self
693 }
694
695 pub fn with_column_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
700 self.metadata_options.set_column_stats_policy(policy);
701 self
702 }
703
704 pub fn with_size_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
709 self.metadata_options.set_size_stats_policy(policy);
710 self
711 }
712
713 #[cfg(feature = "encryption")]
717 pub fn with_file_decryption_properties(
718 self,
719 file_decryption_properties: Arc<FileDecryptionProperties>,
720 ) -> Self {
721 Self {
722 file_decryption_properties: Some(file_decryption_properties),
723 ..self
724 }
725 }
726
727 pub fn with_virtual_columns(self, virtual_columns: Vec<FieldRef>) -> Result<Self> {
780 for field in &virtual_columns {
782 if !is_virtual_column(field) {
783 return Err(ParquetError::General(format!(
784 "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
785 field.name()
786 )));
787 }
788 }
789 Ok(Self {
790 virtual_columns,
791 ..self
792 })
793 }
794
795 #[deprecated(
796 since = "57.2.0",
797 note = "Use `column_index_policy` or `offset_index_policy` instead"
798 )]
799 pub fn page_index(&self) -> bool {
806 self.offset_index != PageIndexPolicy::Skip && self.column_index != PageIndexPolicy::Skip
807 }
808
809 pub fn offset_index_policy(&self) -> PageIndexPolicy {
814 self.offset_index
815 }
816
817 pub fn column_index_policy(&self) -> PageIndexPolicy {
822 self.column_index
823 }
824
825 pub fn metadata_options(&self) -> &ParquetMetaDataOptions {
827 &self.metadata_options
828 }
829
830 #[cfg(feature = "encryption")]
835 pub fn file_decryption_properties(&self) -> Option<&Arc<FileDecryptionProperties>> {
836 self.file_decryption_properties.as_ref()
837 }
838}
839
840#[derive(Debug, Clone)]
855pub struct ArrowReaderMetadata {
856 pub(crate) metadata: Arc<ParquetMetaData>,
858 pub(crate) schema: SchemaRef,
860 pub(crate) fields: Option<Arc<ParquetField>>,
862}
863
864impl ArrowReaderMetadata {
865 pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
877 let metadata = ParquetMetaDataReader::new()
878 .with_column_index_policy(options.column_index)
879 .with_offset_index_policy(options.offset_index)
880 .with_metadata_options(Some(options.metadata_options.clone()));
881 #[cfg(feature = "encryption")]
882 let metadata = metadata.with_decryption_properties(
883 options.file_decryption_properties.as_ref().map(Arc::clone),
884 );
885 let metadata = metadata.parse_and_finish(reader)?;
886 Self::try_new(Arc::new(metadata), options)
887 }
888
889 pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
897 match options.supplied_schema {
898 Some(supplied_schema) => Self::with_supplied_schema(
899 metadata,
900 supplied_schema.clone(),
901 &options.virtual_columns,
902 ),
903 None => {
904 let kv_metadata = match options.skip_arrow_metadata {
905 true => None,
906 false => metadata.file_metadata().key_value_metadata(),
907 };
908
909 let (schema, fields) = parquet_to_arrow_schema_and_fields(
910 metadata.file_metadata().schema_descr(),
911 ProjectionMask::all(),
912 kv_metadata,
913 &options.virtual_columns,
914 )?;
915
916 Ok(Self {
917 metadata,
918 schema: Arc::new(schema),
919 fields: fields.map(Arc::new),
920 })
921 }
922 }
923 }
924
925 fn with_supplied_schema(
926 metadata: Arc<ParquetMetaData>,
927 supplied_schema: SchemaRef,
928 virtual_columns: &[FieldRef],
929 ) -> Result<Self> {
930 let parquet_schema = metadata.file_metadata().schema_descr();
931 let field_levels = parquet_to_arrow_field_levels_with_virtual(
932 parquet_schema,
933 ProjectionMask::all(),
934 Some(supplied_schema.fields()),
935 virtual_columns,
936 )?;
937 let fields = field_levels.fields;
938 let inferred_len = fields.len();
939 let supplied_len = supplied_schema.fields().len() + virtual_columns.len();
940 if inferred_len != supplied_len {
944 return Err(arrow_err!(format!(
945 "Incompatible supplied Arrow schema: expected {} columns received {}",
946 inferred_len, supplied_len
947 )));
948 }
949
950 let mut errors = Vec::new();
951
952 let field_iter = supplied_schema.fields().iter().zip(fields.iter());
953
954 for (field1, field2) in field_iter {
955 if field1.data_type() != field2.data_type() {
956 errors.push(format!(
957 "data type mismatch for field {}: requested {} but found {}",
958 field1.name(),
959 field1.data_type(),
960 field2.data_type()
961 ));
962 }
963 if field1.is_nullable() != field2.is_nullable() {
964 errors.push(format!(
965 "nullability mismatch for field {}: expected {:?} but found {:?}",
966 field1.name(),
967 field1.is_nullable(),
968 field2.is_nullable()
969 ));
970 }
971 if field1.metadata() != field2.metadata() {
972 errors.push(format!(
973 "metadata mismatch for field {}: expected {:?} but found {:?}",
974 field1.name(),
975 field1.metadata(),
976 field2.metadata()
977 ));
978 }
979 }
980
981 if !errors.is_empty() {
982 let message = errors.join(", ");
983 return Err(ParquetError::ArrowError(format!(
984 "Incompatible supplied Arrow schema: {message}",
985 )));
986 }
987
988 Ok(Self {
989 metadata,
990 schema: supplied_schema,
991 fields: field_levels.levels.map(Arc::new),
992 })
993 }
994
995 pub fn metadata(&self) -> &Arc<ParquetMetaData> {
997 &self.metadata
998 }
999
1000 pub fn parquet_schema(&self) -> &SchemaDescriptor {
1002 self.metadata.file_metadata().schema_descr()
1003 }
1004
1005 pub fn schema(&self) -> &SchemaRef {
1007 &self.schema
1008 }
1009}
1010
1011#[doc(hidden)]
1012pub struct SyncReader<T: ChunkReader>(T);
1014
1015impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
1016 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1017 f.debug_tuple("SyncReader").field(&self.0).finish()
1018 }
1019}
1020
1021pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;
1028
1029impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
1030 pub fn try_new(reader: T) -> Result<Self> {
1061 Self::try_new_with_options(reader, Default::default())
1062 }
1063
1064 pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
1069 let metadata = ArrowReaderMetadata::load(&reader, options)?;
1070 Ok(Self::new_with_metadata(reader, metadata))
1071 }
1072
1073 pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
1112 Self::new_builder(SyncReader(input), metadata)
1113 }
1114
1115 pub fn get_row_group_column_bloom_filter(
1121 &self,
1122 row_group_idx: usize,
1123 column_idx: usize,
1124 ) -> Result<Option<Sbbf>> {
1125 let metadata = self.metadata.row_group(row_group_idx);
1126 let column_metadata = metadata.column(column_idx);
1127
1128 let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
1129 offset
1130 .try_into()
1131 .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
1132 } else {
1133 return Ok(None);
1134 };
1135
1136 let buffer = match column_metadata.bloom_filter_length() {
1137 Some(length) => self.input.0.get_bytes(offset, length as usize),
1138 None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
1139 }?;
1140
1141 let (header, bitset_offset) =
1142 chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
1143
1144 match header.algorithm {
1145 BloomFilterAlgorithm::BLOCK => {
1146 }
1148 }
1149 match header.compression {
1150 BloomFilterCompression::UNCOMPRESSED => {
1151 }
1153 }
1154 match header.hash {
1155 BloomFilterHash::XXHASH => {
1156 }
1158 }
1159
1160 let bitset = match column_metadata.bloom_filter_length() {
1161 Some(_) => buffer.slice(
1162 (TryInto::<usize>::try_into(bitset_offset).unwrap()
1163 - TryInto::<usize>::try_into(offset).unwrap())..,
1164 ),
1165 None => {
1166 let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
1167 ParquetError::General("Bloom filter length is invalid".to_string())
1168 })?;
1169 self.input.0.get_bytes(bitset_offset, bitset_length)?
1170 }
1171 };
1172 Ok(Some(Sbbf::new(&bitset)))
1173 }
1174
1175 pub fn build(self) -> Result<ParquetRecordBatchReader> {
1179 let Self {
1180 input,
1181 metadata,
1182 schema: _,
1183 fields,
1184 batch_size,
1185 row_groups,
1186 projection,
1187 mut filter,
1188 selection,
1189 row_selection_policy,
1190 limit,
1191 offset,
1192 metrics,
1193 max_predicate_cache_size: _,
1195 } = self;
1196
1197 let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize);
1199
1200 let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());
1201
1202 let reader = ReaderRowGroups {
1203 reader: Arc::new(input.0),
1204 metadata,
1205 row_groups,
1206 };
1207
1208 let mut plan_builder = ReadPlanBuilder::new(batch_size)
1209 .with_selection(selection)
1210 .with_row_selection_policy(row_selection_policy);
1211
1212 if let Some(filter) = filter.as_mut() {
1214 for predicate in filter.predicates.iter_mut() {
1215 if !plan_builder.selects_any() {
1217 break;
1218 }
1219
1220 let mut cache_projection = predicate.projection().clone();
1221 cache_projection.intersect(&projection);
1222
1223 let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
1224 .with_batch_size(batch_size)
1225 .with_parquet_metadata(&reader.metadata)
1226 .build_array_reader(fields.as_deref(), predicate.projection())?;
1227
1228 plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
1229 }
1230 }
1231
1232 let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
1233 .with_batch_size(batch_size)
1234 .with_parquet_metadata(&reader.metadata)
1235 .build_array_reader(fields.as_deref(), &projection)?;
1236
1237 let read_plan = plan_builder
1238 .limited(reader.num_rows())
1239 .with_offset(offset)
1240 .with_limit(limit)
1241 .build_limited()
1242 .build();
1243
1244 Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
1245 }
1246}
1247
1248struct ReaderRowGroups<T: ChunkReader> {
1249 reader: Arc<T>,
1250
1251 metadata: Arc<ParquetMetaData>,
1252 row_groups: Vec<usize>,
1254}
1255
1256impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
1257 fn num_rows(&self) -> usize {
1258 let meta = self.metadata.row_groups();
1259 self.row_groups
1260 .iter()
1261 .map(|x| meta[*x].num_rows() as usize)
1262 .sum()
1263 }
1264
1265 fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
1266 Ok(Box::new(ReaderPageIterator {
1267 column_idx: i,
1268 reader: self.reader.clone(),
1269 metadata: self.metadata.clone(),
1270 row_groups: self.row_groups.clone().into_iter(),
1271 }))
1272 }
1273
1274 fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
1275 Box::new(
1276 self.row_groups
1277 .iter()
1278 .map(move |i| self.metadata.row_group(*i)),
1279 )
1280 }
1281
1282 fn metadata(&self) -> &ParquetMetaData {
1283 self.metadata.as_ref()
1284 }
1285}
1286
1287struct ReaderPageIterator<T: ChunkReader> {
1288 reader: Arc<T>,
1289 column_idx: usize,
1290 row_groups: std::vec::IntoIter<usize>,
1291 metadata: Arc<ParquetMetaData>,
1292}
1293
1294impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
1295 fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
1297 let rg = self.metadata.row_group(rg_idx);
1298 let column_chunk_metadata = rg.column(self.column_idx);
1299 let offset_index = self.metadata.offset_index();
1300 let page_locations = offset_index
1303 .filter(|i| !i[rg_idx].is_empty())
1304 .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
1305 let total_rows = rg.num_rows() as usize;
1306 let reader = self.reader.clone();
1307
1308 SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
1309 .add_crypto_context(
1310 rg_idx,
1311 self.column_idx,
1312 self.metadata.as_ref(),
1313 column_chunk_metadata,
1314 )
1315 }
1316}
1317
1318impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
1319 type Item = Result<Box<dyn PageReader>>;
1320
1321 fn next(&mut self) -> Option<Self::Item> {
1322 let rg_idx = self.row_groups.next()?;
1323 let page_reader = self
1324 .next_page_reader(rg_idx)
1325 .map(|page_reader| Box::new(page_reader) as _);
1326 Some(page_reader)
1327 }
1328}
1329
1330impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
1331
1332pub struct ParquetRecordBatchReader {
1343 array_reader: Box<dyn ArrayReader>,
1344 schema: SchemaRef,
1345 read_plan: ReadPlan,
1346}
1347
1348impl Debug for ParquetRecordBatchReader {
1349 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1350 f.debug_struct("ParquetRecordBatchReader")
1351 .field("array_reader", &"...")
1352 .field("schema", &self.schema)
1353 .field("read_plan", &self.read_plan)
1354 .finish()
1355 }
1356}
1357
1358impl Iterator for ParquetRecordBatchReader {
1359 type Item = Result<RecordBatch, ArrowError>;
1360
1361 fn next(&mut self) -> Option<Self::Item> {
1362 self.next_inner()
1363 .map_err(|arrow_err| arrow_err.into())
1364 .transpose()
1365 }
1366}
1367
1368impl ParquetRecordBatchReader {
1369 fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
1375 let mut read_records = 0;
1376 let batch_size = self.batch_size();
1377 if batch_size == 0 {
1378 return Ok(None);
1379 }
1380 match self.read_plan.row_selection_cursor_mut() {
1381 RowSelectionCursor::Mask(mask_cursor) => {
1382 while !mask_cursor.is_empty() {
1385 let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else {
1386 return Ok(None);
1387 };
1388
1389 if mask_chunk.initial_skip > 0 {
1390 let skipped = self.array_reader.skip_records(mask_chunk.initial_skip)?;
1391 if skipped != mask_chunk.initial_skip {
1392 return Err(general_err!(
1393 "failed to skip rows, expected {}, got {}",
1394 mask_chunk.initial_skip,
1395 skipped
1396 ));
1397 }
1398 }
1399
1400 if mask_chunk.chunk_rows == 0 {
1401 if mask_cursor.is_empty() && mask_chunk.selected_rows == 0 {
1402 return Ok(None);
1403 }
1404 continue;
1405 }
1406
1407 let mask = mask_cursor.mask_values_for(&mask_chunk)?;
1408
1409 let read = self.array_reader.read_records(mask_chunk.chunk_rows)?;
1410 if read == 0 {
1411 return Err(general_err!(
1412 "reached end of column while expecting {} rows",
1413 mask_chunk.chunk_rows
1414 ));
1415 }
1416 if read != mask_chunk.chunk_rows {
1417 return Err(general_err!(
1418 "insufficient rows read from array reader - expected {}, got {}",
1419 mask_chunk.chunk_rows,
1420 read
1421 ));
1422 }
1423
1424 let array = self.array_reader.consume_batch()?;
1425 let struct_array = array.as_struct_opt().ok_or_else(|| {
1428 ArrowError::ParquetError(
1429 "Struct array reader should return struct array".to_string(),
1430 )
1431 })?;
1432
1433 let filtered_batch =
1434 filter_record_batch(&RecordBatch::from(struct_array), &mask)?;
1435
1436 if filtered_batch.num_rows() != mask_chunk.selected_rows {
1437 return Err(general_err!(
1438 "filtered rows mismatch selection - expected {}, got {}",
1439 mask_chunk.selected_rows,
1440 filtered_batch.num_rows()
1441 ));
1442 }
1443
1444 if filtered_batch.num_rows() == 0 {
1445 continue;
1446 }
1447
1448 return Ok(Some(filtered_batch));
1449 }
1450 }
1451 RowSelectionCursor::Selectors(selectors_cursor) => {
1452 while read_records < batch_size && !selectors_cursor.is_empty() {
1453 let front = selectors_cursor.next_selector();
1454 if front.skip {
1455 let skipped = self.array_reader.skip_records(front.row_count)?;
1456
1457 if skipped != front.row_count {
1458 return Err(general_err!(
1459 "failed to skip rows, expected {}, got {}",
1460 front.row_count,
1461 skipped
1462 ));
1463 }
1464 continue;
1465 }
1466
1467 if front.row_count == 0 {
1470 continue;
1471 }
1472
1473 let need_read = batch_size - read_records;
1475 let to_read = match front.row_count.checked_sub(need_read) {
1476 Some(remaining) if remaining != 0 => {
1477 selectors_cursor.return_selector(RowSelector::select(remaining));
1480 need_read
1481 }
1482 _ => front.row_count,
1483 };
1484 match self.array_reader.read_records(to_read)? {
1485 0 => break,
1486 rec => read_records += rec,
1487 };
1488 }
1489 }
1490 RowSelectionCursor::All => {
1491 self.array_reader.read_records(batch_size)?;
1492 }
1493 };
1494
1495 let array = self.array_reader.consume_batch()?;
1496 let struct_array = array.as_struct_opt().ok_or_else(|| {
1497 ArrowError::ParquetError("Struct array reader should return struct array".to_string())
1498 })?;
1499
1500 Ok(if struct_array.len() > 0 {
1501 Some(RecordBatch::from(struct_array))
1502 } else {
1503 None
1504 })
1505 }
1506}
1507
1508impl RecordBatchReader for ParquetRecordBatchReader {
1509 fn schema(&self) -> SchemaRef {
1514 self.schema.clone()
1515 }
1516}
1517
1518impl ParquetRecordBatchReader {
1519 pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
1523 ParquetRecordBatchReaderBuilder::try_new(reader)?
1524 .with_batch_size(batch_size)
1525 .build()
1526 }
1527
1528 pub fn try_new_with_row_groups(
1533 levels: &FieldLevels,
1534 row_groups: &dyn RowGroups,
1535 batch_size: usize,
1536 selection: Option<RowSelection>,
1537 ) -> Result<Self> {
1538 let metrics = ArrowReaderMetrics::disabled();
1540 let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
1541 .with_batch_size(batch_size)
1542 .with_parquet_metadata(row_groups.metadata())
1543 .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;
1544
1545 let read_plan = ReadPlanBuilder::new(batch_size)
1546 .with_selection(selection)
1547 .build();
1548
1549 Ok(Self {
1550 array_reader,
1551 schema: Arc::new(Schema::new(levels.fields.clone())),
1552 read_plan,
1553 })
1554 }
1555
1556 pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
1560 let schema = match array_reader.get_data_type() {
1561 ArrowType::Struct(fields) => Schema::new(fields.clone()),
1562 _ => unreachable!("Struct array reader's data type is not struct!"),
1563 };
1564
1565 Self {
1566 array_reader,
1567 schema: Arc::new(schema),
1568 read_plan,
1569 }
1570 }
1571
1572 #[inline(always)]
1573 pub(crate) fn batch_size(&self) -> usize {
1574 self.read_plan.batch_size()
1575 }
1576}
1577
1578#[cfg(test)]
1579pub(crate) mod tests {
1580 use std::cmp::min;
1581 use std::collections::{HashMap, VecDeque};
1582 use std::fmt::Formatter;
1583 use std::fs::File;
1584 use std::io::Seek;
1585 use std::path::PathBuf;
1586 use std::sync::Arc;
1587
1588 use rand::rngs::StdRng;
1589 use rand::{Rng, RngCore, SeedableRng, random, rng};
1590 use tempfile::tempfile;
1591
1592 use crate::arrow::arrow_reader::{
1593 ArrowPredicateFn, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
1594 ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
1595 };
1596 use crate::arrow::schema::{
1597 add_encoded_arrow_schema_to_metadata,
1598 virtual_type::{RowGroupIndex, RowNumber},
1599 };
1600 use crate::arrow::{ArrowWriter, ProjectionMask};
1601 use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType};
1602 use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
1603 use crate::data_type::{
1604 BoolType, ByteArray, ByteArrayType, DataType, DoubleType, FixedLenByteArray,
1605 FixedLenByteArrayType, FloatType, Int32Type, Int64Type, Int96, Int96Type,
1606 };
1607 use crate::errors::Result;
1608 use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetStatisticsPolicy};
1609 use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
1610 use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
1611 use crate::schema::parser::parse_message_type;
1612 use crate::schema::types::{Type, TypePtr};
1613 use crate::util::test_common::rand_gen::RandGen;
1614 use arrow_array::builder::*;
1615 use arrow_array::cast::AsArray;
1616 use arrow_array::types::{
1617 Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
1618 DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
1619 Time64MicrosecondType,
1620 };
1621 use arrow_array::*;
1622 use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256};
1623 use arrow_data::{ArrayData, ArrayDataBuilder};
1624 use arrow_schema::{DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit};
1625 use arrow_select::concat::concat_batches;
1626 use bytes::Bytes;
1627 use half::f16;
1628 use num_traits::PrimInt;
1629
1630 #[test]
1631 fn test_arrow_reader_all_columns() {
1632 let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1633
1634 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1635 let original_schema = Arc::clone(builder.schema());
1636 let reader = builder.build().unwrap();
1637
1638 assert_eq!(original_schema.fields(), reader.schema().fields());
1640 }
1641
1642 #[test]
1643 fn test_reuse_schema() {
1644 let file = get_test_file("parquet/alltypes-java.parquet");
1645
1646 let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
1647 let expected = builder.metadata;
1648 let schema = expected.file_metadata().schema_descr_ptr();
1649
1650 let arrow_options = ArrowReaderOptions::new().with_parquet_schema(schema.clone());
1651 let builder =
1652 ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();
1653
1654 assert_eq!(expected.as_ref(), builder.metadata.as_ref());
1656 }
1657
1658 #[test]
1659 fn test_page_encoding_stats_mask() {
1660 let testdata = arrow::util::test_util::parquet_test_data();
1661 let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1662 let file = File::open(path).unwrap();
1663
1664 let arrow_options = ArrowReaderOptions::new().with_encoding_stats_as_mask(true);
1665 let builder =
1666 ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();
1667
1668 let row_group_metadata = builder.metadata.row_group(0);
1669
1670 let page_encoding_stats = row_group_metadata
1672 .column(0)
1673 .page_encoding_stats_mask()
1674 .unwrap();
1675 assert!(page_encoding_stats.is_only(Encoding::PLAIN));
1676 let page_encoding_stats = row_group_metadata
1677 .column(2)
1678 .page_encoding_stats_mask()
1679 .unwrap();
1680 assert!(page_encoding_stats.is_only(Encoding::PLAIN_DICTIONARY));
1681 }
1682
1683 #[test]
1684 fn test_stats_stats_skipped() {
1685 let testdata = arrow::util::test_util::parquet_test_data();
1686 let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1687 let file = File::open(path).unwrap();
1688
1689 let arrow_options = ArrowReaderOptions::new()
1691 .with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll)
1692 .with_column_stats_policy(ParquetStatisticsPolicy::SkipAll);
1693 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1694 file.try_clone().unwrap(),
1695 arrow_options,
1696 )
1697 .unwrap();
1698
1699 let row_group_metadata = builder.metadata.row_group(0);
1700 for column in row_group_metadata.columns() {
1701 assert!(column.page_encoding_stats().is_none());
1702 assert!(column.page_encoding_stats_mask().is_none());
1703 assert!(column.statistics().is_none());
1704 }
1705
1706 let arrow_options = ArrowReaderOptions::new()
1708 .with_encoding_stats_as_mask(true)
1709 .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
1710 .with_column_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]));
1711 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1712 file.try_clone().unwrap(),
1713 arrow_options,
1714 )
1715 .unwrap();
1716
1717 let row_group_metadata = builder.metadata.row_group(0);
1718 for (idx, column) in row_group_metadata.columns().iter().enumerate() {
1719 assert!(column.page_encoding_stats().is_none());
1720 assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
1721 assert_eq!(column.statistics().is_some(), idx == 0);
1722 }
1723 }
1724
1725 #[test]
1726 fn test_size_stats_stats_skipped() {
1727 let testdata = arrow::util::test_util::parquet_test_data();
1728 let path = format!("{testdata}/repeated_primitive_no_list.parquet");
1729 let file = File::open(path).unwrap();
1730
1731 let arrow_options =
1733 ArrowReaderOptions::new().with_size_stats_policy(ParquetStatisticsPolicy::SkipAll);
1734 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1735 file.try_clone().unwrap(),
1736 arrow_options,
1737 )
1738 .unwrap();
1739
1740 let row_group_metadata = builder.metadata.row_group(0);
1741 for column in row_group_metadata.columns() {
1742 assert!(column.repetition_level_histogram().is_none());
1743 assert!(column.definition_level_histogram().is_none());
1744 assert!(column.unencoded_byte_array_data_bytes().is_none());
1745 }
1746
1747 let arrow_options = ArrowReaderOptions::new()
1749 .with_encoding_stats_as_mask(true)
1750 .with_size_stats_policy(ParquetStatisticsPolicy::skip_except(&[1]));
1751 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1752 file.try_clone().unwrap(),
1753 arrow_options,
1754 )
1755 .unwrap();
1756
1757 let row_group_metadata = builder.metadata.row_group(0);
1758 for (idx, column) in row_group_metadata.columns().iter().enumerate() {
1759 assert_eq!(column.repetition_level_histogram().is_some(), idx == 1);
1760 assert_eq!(column.definition_level_histogram().is_some(), idx == 1);
1761 assert_eq!(column.unencoded_byte_array_data_bytes().is_some(), idx == 1);
1762 }
1763 }
1764
1765 #[test]
1766 fn test_arrow_reader_single_column() {
1767 let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1768
1769 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1770 let original_schema = Arc::clone(builder.schema());
1771
1772 let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
1773 let reader = builder.with_projection(mask).build().unwrap();
1774
1775 assert_eq!(1, reader.schema().fields().len());
1777 assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1778 }
1779
1780 #[test]
1781 fn test_arrow_reader_single_column_by_name() {
1782 let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1783
1784 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1785 let original_schema = Arc::clone(builder.schema());
1786
1787 let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
1788 let reader = builder.with_projection(mask).build().unwrap();
1789
1790 assert_eq!(1, reader.schema().fields().len());
1792 assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1793 }
1794
1795 #[test]
1796 fn test_null_column_reader_test() {
1797 let mut file = tempfile::tempfile().unwrap();
1798
1799 let schema = "
1800 message message {
1801 OPTIONAL INT32 int32;
1802 }
1803 ";
1804 let schema = Arc::new(parse_message_type(schema).unwrap());
1805
1806 let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
1807 generate_single_column_file_with_data::<Int32Type>(
1808 &[vec![], vec![]],
1809 Some(&def_levels),
1810 file.try_clone().unwrap(), schema,
1812 Some(Field::new("int32", ArrowDataType::Null, true)),
1813 &Default::default(),
1814 )
1815 .unwrap();
1816
1817 file.rewind().unwrap();
1818
1819 let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
1820 let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();
1821
1822 assert_eq!(batches.len(), 4);
1823 for batch in &batches[0..3] {
1824 assert_eq!(batch.num_rows(), 2);
1825 assert_eq!(batch.num_columns(), 1);
1826 assert_eq!(batch.column(0).null_count(), 2);
1827 }
1828
1829 assert_eq!(batches[3].num_rows(), 1);
1830 assert_eq!(batches[3].num_columns(), 1);
1831 assert_eq!(batches[3].column(0).null_count(), 1);
1832 }
1833
1834 #[test]
1835 fn test_primitive_single_column_reader_test() {
1836 run_single_column_reader_tests::<BoolType, _, BoolType>(
1837 2,
1838 ConvertedType::NONE,
1839 None,
1840 |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
1841 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1842 );
1843 run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1844 2,
1845 ConvertedType::NONE,
1846 None,
1847 |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
1848 &[
1849 Encoding::PLAIN,
1850 Encoding::RLE_DICTIONARY,
1851 Encoding::DELTA_BINARY_PACKED,
1852 Encoding::BYTE_STREAM_SPLIT,
1853 ],
1854 );
1855 run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1856 2,
1857 ConvertedType::NONE,
1858 None,
1859 |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
1860 &[
1861 Encoding::PLAIN,
1862 Encoding::RLE_DICTIONARY,
1863 Encoding::DELTA_BINARY_PACKED,
1864 Encoding::BYTE_STREAM_SPLIT,
1865 ],
1866 );
1867 run_single_column_reader_tests::<FloatType, _, FloatType>(
1868 2,
1869 ConvertedType::NONE,
1870 None,
1871 |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
1872 &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
1873 );
1874 }
1875
1876 #[test]
1877 fn test_unsigned_primitive_single_column_reader_test() {
1878 run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1879 2,
1880 ConvertedType::UINT_32,
1881 Some(ArrowDataType::UInt32),
1882 |vals| {
1883 Arc::new(UInt32Array::from_iter(
1884 vals.iter().map(|x| x.map(|x| x as u32)),
1885 ))
1886 },
1887 &[
1888 Encoding::PLAIN,
1889 Encoding::RLE_DICTIONARY,
1890 Encoding::DELTA_BINARY_PACKED,
1891 ],
1892 );
1893 run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1894 2,
1895 ConvertedType::UINT_64,
1896 Some(ArrowDataType::UInt64),
1897 |vals| {
1898 Arc::new(UInt64Array::from_iter(
1899 vals.iter().map(|x| x.map(|x| x as u64)),
1900 ))
1901 },
1902 &[
1903 Encoding::PLAIN,
1904 Encoding::RLE_DICTIONARY,
1905 Encoding::DELTA_BINARY_PACKED,
1906 ],
1907 );
1908 }
1909
1910 #[test]
1911 fn test_unsigned_roundtrip() {
1912 let schema = Arc::new(Schema::new(vec![
1913 Field::new("uint32", ArrowDataType::UInt32, true),
1914 Field::new("uint64", ArrowDataType::UInt64, true),
1915 ]));
1916
1917 let mut buf = Vec::with_capacity(1024);
1918 let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
1919
1920 let original = RecordBatch::try_new(
1921 schema,
1922 vec![
1923 Arc::new(UInt32Array::from_iter_values([
1924 0,
1925 i32::MAX as u32,
1926 u32::MAX,
1927 ])),
1928 Arc::new(UInt64Array::from_iter_values([
1929 0,
1930 i64::MAX as u64,
1931 u64::MAX,
1932 ])),
1933 ],
1934 )
1935 .unwrap();
1936
1937 writer.write(&original).unwrap();
1938 writer.close().unwrap();
1939
1940 let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
1941 let ret = reader.next().unwrap().unwrap();
1942 assert_eq!(ret, original);
1943
1944 ret.column(0)
1946 .as_any()
1947 .downcast_ref::<UInt32Array>()
1948 .unwrap();
1949
1950 ret.column(1)
1951 .as_any()
1952 .downcast_ref::<UInt64Array>()
1953 .unwrap();
1954 }
1955
1956 #[test]
1957 fn test_float16_roundtrip() -> Result<()> {
1958 let schema = Arc::new(Schema::new(vec![
1959 Field::new("float16", ArrowDataType::Float16, false),
1960 Field::new("float16-nullable", ArrowDataType::Float16, true),
1961 ]));
1962
1963 let mut buf = Vec::with_capacity(1024);
1964 let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1965
1966 let original = RecordBatch::try_new(
1967 schema,
1968 vec![
1969 Arc::new(Float16Array::from_iter_values([
1970 f16::EPSILON,
1971 f16::MIN,
1972 f16::MAX,
1973 f16::NAN,
1974 f16::INFINITY,
1975 f16::NEG_INFINITY,
1976 f16::ONE,
1977 f16::NEG_ONE,
1978 f16::ZERO,
1979 f16::NEG_ZERO,
1980 f16::E,
1981 f16::PI,
1982 f16::FRAC_1_PI,
1983 ])),
1984 Arc::new(Float16Array::from(vec![
1985 None,
1986 None,
1987 None,
1988 Some(f16::NAN),
1989 Some(f16::INFINITY),
1990 Some(f16::NEG_INFINITY),
1991 None,
1992 None,
1993 None,
1994 None,
1995 None,
1996 None,
1997 Some(f16::FRAC_1_PI),
1998 ])),
1999 ],
2000 )?;
2001
2002 writer.write(&original)?;
2003 writer.close()?;
2004
2005 let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
2006 let ret = reader.next().unwrap()?;
2007 assert_eq!(ret, original);
2008
2009 ret.column(0).as_primitive::<Float16Type>();
2011 ret.column(1).as_primitive::<Float16Type>();
2012
2013 Ok(())
2014 }
2015
2016 #[test]
2017 fn test_time_utc_roundtrip() -> Result<()> {
2018 let schema = Arc::new(Schema::new(vec![
2019 Field::new(
2020 "time_millis",
2021 ArrowDataType::Time32(TimeUnit::Millisecond),
2022 true,
2023 )
2024 .with_metadata(HashMap::from_iter(vec![(
2025 "adjusted_to_utc".to_string(),
2026 "".to_string(),
2027 )])),
2028 Field::new(
2029 "time_micros",
2030 ArrowDataType::Time64(TimeUnit::Microsecond),
2031 true,
2032 )
2033 .with_metadata(HashMap::from_iter(vec![(
2034 "adjusted_to_utc".to_string(),
2035 "".to_string(),
2036 )])),
2037 ]));
2038
2039 let mut buf = Vec::with_capacity(1024);
2040 let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
2041
2042 let original = RecordBatch::try_new(
2043 schema,
2044 vec![
2045 Arc::new(Time32MillisecondArray::from(vec![
2046 Some(-1),
2047 Some(0),
2048 Some(86_399_000),
2049 Some(86_400_000),
2050 Some(86_401_000),
2051 None,
2052 ])),
2053 Arc::new(Time64MicrosecondArray::from(vec![
2054 Some(-1),
2055 Some(0),
2056 Some(86_399 * 1_000_000),
2057 Some(86_400 * 1_000_000),
2058 Some(86_401 * 1_000_000),
2059 None,
2060 ])),
2061 ],
2062 )?;
2063
2064 writer.write(&original)?;
2065 writer.close()?;
2066
2067 let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
2068 let ret = reader.next().unwrap()?;
2069 assert_eq!(ret, original);
2070
2071 ret.column(0).as_primitive::<Time32MillisecondType>();
2073 ret.column(1).as_primitive::<Time64MicrosecondType>();
2074
2075 Ok(())
2076 }
2077
2078 #[test]
2079 fn test_date32_roundtrip() -> Result<()> {
2080 use arrow_array::Date32Array;
2081
2082 let schema = Arc::new(Schema::new(vec![Field::new(
2083 "date32",
2084 ArrowDataType::Date32,
2085 false,
2086 )]));
2087
2088 let mut buf = Vec::with_capacity(1024);
2089
2090 let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
2091
2092 let original = RecordBatch::try_new(
2093 schema,
2094 vec![Arc::new(Date32Array::from(vec![
2095 -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
2096 ]))],
2097 )?;
2098
2099 writer.write(&original)?;
2100 writer.close()?;
2101
2102 let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
2103 let ret = reader.next().unwrap()?;
2104 assert_eq!(ret, original);
2105
2106 ret.column(0).as_primitive::<Date32Type>();
2108
2109 Ok(())
2110 }
2111
2112 #[test]
2113 fn test_date64_roundtrip() -> Result<()> {
2114 use arrow_array::Date64Array;
2115
2116 let schema = Arc::new(Schema::new(vec![
2117 Field::new("small-date64", ArrowDataType::Date64, false),
2118 Field::new("big-date64", ArrowDataType::Date64, false),
2119 Field::new("invalid-date64", ArrowDataType::Date64, false),
2120 ]));
2121
2122 let mut default_buf = Vec::with_capacity(1024);
2123 let mut coerce_buf = Vec::with_capacity(1024);
2124
2125 let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
2126
2127 let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
2128 let mut coerce_writer =
2129 ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;
2130
2131 static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;
2132
2133 let original = RecordBatch::try_new(
2134 schema,
2135 vec![
2136 Arc::new(Date64Array::from(vec![
2138 -1_000_000 * NUM_MILLISECONDS_IN_DAY,
2139 -1_000 * NUM_MILLISECONDS_IN_DAY,
2140 0,
2141 1_000 * NUM_MILLISECONDS_IN_DAY,
2142 1_000_000 * NUM_MILLISECONDS_IN_DAY,
2143 ])),
2144 Arc::new(Date64Array::from(vec![
2146 -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
2147 -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
2148 0,
2149 1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
2150 10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
2151 ])),
2152 Arc::new(Date64Array::from(vec![
2154 -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
2155 -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
2156 1,
2157 1_000 * NUM_MILLISECONDS_IN_DAY + 1,
2158 1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
2159 ])),
2160 ],
2161 )?;
2162
2163 default_writer.write(&original)?;
2164 coerce_writer.write(&original)?;
2165
2166 default_writer.close()?;
2167 coerce_writer.close()?;
2168
2169 let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
2170 let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
2171
2172 let default_ret = default_reader.next().unwrap()?;
2173 let coerce_ret = coerce_reader.next().unwrap()?;
2174
2175 assert_eq!(default_ret, original);
2177
2178 assert_eq!(coerce_ret.column(0), original.column(0));
2180 assert_ne!(coerce_ret.column(1), original.column(1));
2181 assert_ne!(coerce_ret.column(2), original.column(2));
2182
2183 default_ret.column(0).as_primitive::<Date64Type>();
2185 coerce_ret.column(0).as_primitive::<Date64Type>();
2186
2187 Ok(())
2188 }
2189 struct RandFixedLenGen {}
2190
2191 impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
2192 fn r#gen(len: i32) -> FixedLenByteArray {
2193 let mut v = vec![0u8; len as usize];
2194 rng().fill_bytes(&mut v);
2195 ByteArray::from(v).into()
2196 }
2197 }
2198
2199 #[test]
2200 fn test_fixed_length_binary_column_reader() {
2201 run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
2202 20,
2203 ConvertedType::NONE,
2204 None,
2205 |vals| {
2206 let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
2207 for val in vals {
2208 match val {
2209 Some(b) => builder.append_value(b).unwrap(),
2210 None => builder.append_null(),
2211 }
2212 }
2213 Arc::new(builder.finish())
2214 },
2215 &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
2216 );
2217 }
2218
2219 #[test]
2220 fn test_interval_day_time_column_reader() {
2221 run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
2222 12,
2223 ConvertedType::INTERVAL,
2224 None,
2225 |vals| {
2226 Arc::new(
2227 vals.iter()
2228 .map(|x| {
2229 x.as_ref().map(|b| IntervalDayTime {
2230 days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
2231 milliseconds: i32::from_le_bytes(
2232 b.as_ref()[8..12].try_into().unwrap(),
2233 ),
2234 })
2235 })
2236 .collect::<IntervalDayTimeArray>(),
2237 )
2238 },
2239 &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
2240 );
2241 }
2242
2243 #[test]
2244 fn test_int96_single_column_reader_test() {
2245 let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
2246
2247 type TypeHintAndConversionFunction =
2248 (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);
2249
2250 let resolutions: Vec<TypeHintAndConversionFunction> = vec![
2251 (None, |vals: &[Option<Int96>]| {
2253 Arc::new(TimestampNanosecondArray::from_iter(
2254 vals.iter().map(|x| x.map(|x| x.to_nanos())),
2255 )) as ArrayRef
2256 }),
2257 (
2259 Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
2260 |vals: &[Option<Int96>]| {
2261 Arc::new(TimestampSecondArray::from_iter(
2262 vals.iter().map(|x| x.map(|x| x.to_seconds())),
2263 )) as ArrayRef
2264 },
2265 ),
2266 (
2267 Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
2268 |vals: &[Option<Int96>]| {
2269 Arc::new(TimestampMillisecondArray::from_iter(
2270 vals.iter().map(|x| x.map(|x| x.to_millis())),
2271 )) as ArrayRef
2272 },
2273 ),
2274 (
2275 Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
2276 |vals: &[Option<Int96>]| {
2277 Arc::new(TimestampMicrosecondArray::from_iter(
2278 vals.iter().map(|x| x.map(|x| x.to_micros())),
2279 )) as ArrayRef
2280 },
2281 ),
2282 (
2283 Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
2284 |vals: &[Option<Int96>]| {
2285 Arc::new(TimestampNanosecondArray::from_iter(
2286 vals.iter().map(|x| x.map(|x| x.to_nanos())),
2287 )) as ArrayRef
2288 },
2289 ),
2290 (
2292 Some(ArrowDataType::Timestamp(
2293 TimeUnit::Second,
2294 Some(Arc::from("-05:00")),
2295 )),
2296 |vals: &[Option<Int96>]| {
2297 Arc::new(
2298 TimestampSecondArray::from_iter(
2299 vals.iter().map(|x| x.map(|x| x.to_seconds())),
2300 )
2301 .with_timezone("-05:00"),
2302 ) as ArrayRef
2303 },
2304 ),
2305 ];
2306
2307 resolutions.iter().for_each(|(arrow_type, converter)| {
2308 run_single_column_reader_tests::<Int96Type, _, Int96Type>(
2309 2,
2310 ConvertedType::NONE,
2311 arrow_type.clone(),
2312 converter,
2313 encodings,
2314 );
2315 })
2316 }
2317
2318 #[test]
2319 fn test_int96_from_spark_file_with_provided_schema() {
2320 use arrow_schema::DataType::Timestamp;
2324 let test_data = arrow::util::test_util::parquet_test_data();
2325 let path = format!("{test_data}/int96_from_spark.parquet");
2326 let file = File::open(path).unwrap();
2327
2328 let supplied_schema = Arc::new(Schema::new(vec![Field::new(
2329 "a",
2330 Timestamp(TimeUnit::Microsecond, None),
2331 true,
2332 )]));
2333 let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
2334
2335 let mut record_reader =
2336 ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
2337 .unwrap()
2338 .build()
2339 .unwrap();
2340
2341 let batch = record_reader.next().unwrap().unwrap();
2342 assert_eq!(batch.num_columns(), 1);
2343 let column = batch.column(0);
2344 assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));
2345
2346 let expected = Arc::new(Int64Array::from(vec![
2347 Some(1704141296123456),
2348 Some(1704070800000000),
2349 Some(253402225200000000),
2350 Some(1735599600000000),
2351 None,
2352 Some(9089380393200000000),
2353 ]));
2354
2355 let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
2360 let casted_timestamps = binding.as_primitive::<types::Int64Type>();
2361
2362 assert_eq!(casted_timestamps.len(), expected.len());
2363
2364 casted_timestamps
2365 .iter()
2366 .zip(expected.iter())
2367 .for_each(|(lhs, rhs)| {
2368 assert_eq!(lhs, rhs);
2369 });
2370 }
2371
2372 #[test]
2373 fn test_int96_from_spark_file_without_provided_schema() {
2374 use arrow_schema::DataType::Timestamp;
2378 let test_data = arrow::util::test_util::parquet_test_data();
2379 let path = format!("{test_data}/int96_from_spark.parquet");
2380 let file = File::open(path).unwrap();
2381
2382 let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
2383 .unwrap()
2384 .build()
2385 .unwrap();
2386
2387 let batch = record_reader.next().unwrap().unwrap();
2388 assert_eq!(batch.num_columns(), 1);
2389 let column = batch.column(0);
2390 assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));
2391
2392 let expected = Arc::new(Int64Array::from(vec![
2393 Some(1704141296123456000), Some(1704070800000000000), Some(-4852191831933722624), Some(1735599600000000000), None,
2398 Some(-4864435138808946688), ]));
2400
2401 let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
2406 let casted_timestamps = binding.as_primitive::<types::Int64Type>();
2407
2408 assert_eq!(casted_timestamps.len(), expected.len());
2409
2410 casted_timestamps
2411 .iter()
2412 .zip(expected.iter())
2413 .for_each(|(lhs, rhs)| {
2414 assert_eq!(lhs, rhs);
2415 });
2416 }
2417
2418 struct RandUtf8Gen {}
2419
2420 impl RandGen<ByteArrayType> for RandUtf8Gen {
2421 fn r#gen(len: i32) -> ByteArray {
2422 Int32Type::r#gen(len).to_string().as_str().into()
2423 }
2424 }
2425
2426 #[test]
2427 fn test_utf8_single_column_reader_test() {
2428 fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
2429 Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
2430 x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
2431 })))
2432 }
2433
2434 let encodings = &[
2435 Encoding::PLAIN,
2436 Encoding::RLE_DICTIONARY,
2437 Encoding::DELTA_LENGTH_BYTE_ARRAY,
2438 Encoding::DELTA_BYTE_ARRAY,
2439 ];
2440
2441 run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2442 2,
2443 ConvertedType::NONE,
2444 None,
2445 |vals| {
2446 Arc::new(BinaryArray::from_iter(
2447 vals.iter().map(|x| x.as_ref().map(|x| x.data())),
2448 ))
2449 },
2450 encodings,
2451 );
2452
2453 run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2454 2,
2455 ConvertedType::UTF8,
2456 None,
2457 string_converter::<i32>,
2458 encodings,
2459 );
2460
2461 run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2462 2,
2463 ConvertedType::UTF8,
2464 Some(ArrowDataType::Utf8),
2465 string_converter::<i32>,
2466 encodings,
2467 );
2468
2469 run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2470 2,
2471 ConvertedType::UTF8,
2472 Some(ArrowDataType::LargeUtf8),
2473 string_converter::<i64>,
2474 encodings,
2475 );
2476
2477 let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
2478 for key in &small_key_types {
2479 for encoding in encodings {
2480 let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
2481 opts.encoding = *encoding;
2482
2483 let data_type =
2484 ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
2485
2486 single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
2488 opts,
2489 2,
2490 ConvertedType::UTF8,
2491 Some(data_type.clone()),
2492 move |vals| {
2493 let vals = string_converter::<i32>(vals);
2494 arrow::compute::cast(&vals, &data_type).unwrap()
2495 },
2496 );
2497 }
2498 }
2499
2500 let key_types = [
2501 ArrowDataType::Int16,
2502 ArrowDataType::UInt16,
2503 ArrowDataType::Int32,
2504 ArrowDataType::UInt32,
2505 ArrowDataType::Int64,
2506 ArrowDataType::UInt64,
2507 ];
2508
2509 for key in &key_types {
2510 let data_type =
2511 ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
2512
2513 run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2514 2,
2515 ConvertedType::UTF8,
2516 Some(data_type.clone()),
2517 move |vals| {
2518 let vals = string_converter::<i32>(vals);
2519 arrow::compute::cast(&vals, &data_type).unwrap()
2520 },
2521 encodings,
2522 );
2523
2524 let data_type = ArrowDataType::Dictionary(
2525 Box::new(key.clone()),
2526 Box::new(ArrowDataType::LargeUtf8),
2527 );
2528
2529 run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
2530 2,
2531 ConvertedType::UTF8,
2532 Some(data_type.clone()),
2533 move |vals| {
2534 let vals = string_converter::<i64>(vals);
2535 arrow::compute::cast(&vals, &data_type).unwrap()
2536 },
2537 encodings,
2538 );
2539 }
2540 }
2541
2542 #[test]
2543 fn test_decimal_nullable_struct() {
2544 let decimals = Decimal256Array::from_iter_values(
2545 [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
2546 );
2547
2548 let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
2549 "decimals",
2550 decimals.data_type().clone(),
2551 false,
2552 )])))
2553 .len(8)
2554 .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
2555 .child_data(vec![decimals.into_data()])
2556 .build()
2557 .unwrap();
2558
2559 let written =
2560 RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
2561 .unwrap();
2562
2563 let mut buffer = Vec::with_capacity(1024);
2564 let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2565 writer.write(&written).unwrap();
2566 writer.close().unwrap();
2567
2568 let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2569 .unwrap()
2570 .collect::<Result<Vec<_>, _>>()
2571 .unwrap();
2572
2573 assert_eq!(&written.slice(0, 3), &read[0]);
2574 assert_eq!(&written.slice(3, 3), &read[1]);
2575 assert_eq!(&written.slice(6, 2), &read[2]);
2576 }
2577
2578 #[test]
2579 fn test_int32_nullable_struct() {
2580 let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2581 let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
2582 "int32",
2583 int32.data_type().clone(),
2584 false,
2585 )])))
2586 .len(8)
2587 .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
2588 .child_data(vec![int32.into_data()])
2589 .build()
2590 .unwrap();
2591
2592 let written =
2593 RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
2594 .unwrap();
2595
2596 let mut buffer = Vec::with_capacity(1024);
2597 let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2598 writer.write(&written).unwrap();
2599 writer.close().unwrap();
2600
2601 let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2602 .unwrap()
2603 .collect::<Result<Vec<_>, _>>()
2604 .unwrap();
2605
2606 assert_eq!(&written.slice(0, 3), &read[0]);
2607 assert_eq!(&written.slice(3, 3), &read[1]);
2608 assert_eq!(&written.slice(6, 2), &read[2]);
2609 }
2610
2611 #[test]
2612 fn test_decimal_list() {
2613 let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2614
2615 let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
2617 decimals.data_type().clone(),
2618 false,
2619 ))))
2620 .len(7)
2621 .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
2622 .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
2623 .child_data(vec![decimals.into_data()])
2624 .build()
2625 .unwrap();
2626
2627 let written =
2628 RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
2629 .unwrap();
2630
2631 let mut buffer = Vec::with_capacity(1024);
2632 let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2633 writer.write(&written).unwrap();
2634 writer.close().unwrap();
2635
2636 let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2637 .unwrap()
2638 .collect::<Result<Vec<_>, _>>()
2639 .unwrap();
2640
2641 assert_eq!(&written.slice(0, 3), &read[0]);
2642 assert_eq!(&written.slice(3, 3), &read[1]);
2643 assert_eq!(&written.slice(6, 1), &read[2]);
2644 }
2645
2646 #[test]
2647 fn test_read_decimal_file() {
2648 use arrow_array::Decimal128Array;
2649 let testdata = arrow::util::test_util::parquet_test_data();
2650 let file_variants = vec![
2651 ("byte_array", 4),
2652 ("fixed_length", 25),
2653 ("int32", 4),
2654 ("int64", 10),
2655 ];
2656 for (prefix, target_precision) in file_variants {
2657 let path = format!("{testdata}/{prefix}_decimal.parquet");
2658 let file = File::open(path).unwrap();
2659 let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2660
2661 let batch = record_reader.next().unwrap().unwrap();
2662 assert_eq!(batch.num_rows(), 24);
2663 let col = batch
2664 .column(0)
2665 .as_any()
2666 .downcast_ref::<Decimal128Array>()
2667 .unwrap();
2668
2669 let expected = 1..25;
2670
2671 assert_eq!(col.precision(), target_precision);
2672 assert_eq!(col.scale(), 2);
2673
2674 for (i, v) in expected.enumerate() {
2675 assert_eq!(col.value(i), v * 100_i128);
2676 }
2677 }
2678 }
2679
2680 #[test]
2681 fn test_read_float16_nonzeros_file() {
2682 use arrow_array::Float16Array;
2683 let testdata = arrow::util::test_util::parquet_test_data();
2684 let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
2686 let file = File::open(path).unwrap();
2687 let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2688
2689 let batch = record_reader.next().unwrap().unwrap();
2690 assert_eq!(batch.num_rows(), 8);
2691 let col = batch
2692 .column(0)
2693 .as_any()
2694 .downcast_ref::<Float16Array>()
2695 .unwrap();
2696
2697 let f16_two = f16::ONE + f16::ONE;
2698
2699 assert_eq!(col.null_count(), 1);
2700 assert!(col.is_null(0));
2701 assert_eq!(col.value(1), f16::ONE);
2702 assert_eq!(col.value(2), -f16_two);
2703 assert!(col.value(3).is_nan());
2704 assert_eq!(col.value(4), f16::ZERO);
2705 assert!(col.value(4).is_sign_positive());
2706 assert_eq!(col.value(5), f16::NEG_ONE);
2707 assert_eq!(col.value(6), f16::NEG_ZERO);
2708 assert!(col.value(6).is_sign_negative());
2709 assert_eq!(col.value(7), f16_two);
2710 }
2711
2712 #[test]
2713 fn test_read_float16_zeros_file() {
2714 use arrow_array::Float16Array;
2715 let testdata = arrow::util::test_util::parquet_test_data();
2716 let path = format!("{testdata}/float16_zeros_and_nans.parquet");
2718 let file = File::open(path).unwrap();
2719 let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2720
2721 let batch = record_reader.next().unwrap().unwrap();
2722 assert_eq!(batch.num_rows(), 3);
2723 let col = batch
2724 .column(0)
2725 .as_any()
2726 .downcast_ref::<Float16Array>()
2727 .unwrap();
2728
2729 assert_eq!(col.null_count(), 1);
2730 assert!(col.is_null(0));
2731 assert_eq!(col.value(1), f16::ZERO);
2732 assert!(col.value(1).is_sign_positive());
2733 assert!(col.value(2).is_nan());
2734 }
2735
2736 #[test]
2737 fn test_read_float32_float64_byte_stream_split() {
2738 let path = format!(
2739 "{}/byte_stream_split.zstd.parquet",
2740 arrow::util::test_util::parquet_test_data(),
2741 );
2742 let file = File::open(path).unwrap();
2743 let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2744
2745 let mut row_count = 0;
2746 for batch in record_reader {
2747 let batch = batch.unwrap();
2748 row_count += batch.num_rows();
2749 let f32_col = batch.column(0).as_primitive::<Float32Type>();
2750 let f64_col = batch.column(1).as_primitive::<Float64Type>();
2751
2752 for &x in f32_col.values() {
2754 assert!(x > -10.0);
2755 assert!(x < 10.0);
2756 }
2757 for &x in f64_col.values() {
2758 assert!(x > -10.0);
2759 assert!(x < 10.0);
2760 }
2761 }
2762 assert_eq!(row_count, 300);
2763 }
2764
2765 #[test]
2766 fn test_read_extended_byte_stream_split() {
2767 let path = format!(
2768 "{}/byte_stream_split_extended.gzip.parquet",
2769 arrow::util::test_util::parquet_test_data(),
2770 );
2771 let file = File::open(path).unwrap();
2772 let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2773
2774 let mut row_count = 0;
2775 for batch in record_reader {
2776 let batch = batch.unwrap();
2777 row_count += batch.num_rows();
2778
2779 let f16_col = batch.column(0).as_primitive::<Float16Type>();
2781 let f16_bss = batch.column(1).as_primitive::<Float16Type>();
2782 assert_eq!(f16_col.len(), f16_bss.len());
2783 f16_col
2784 .iter()
2785 .zip(f16_bss.iter())
2786 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2787
2788 let f32_col = batch.column(2).as_primitive::<Float32Type>();
2790 let f32_bss = batch.column(3).as_primitive::<Float32Type>();
2791 assert_eq!(f32_col.len(), f32_bss.len());
2792 f32_col
2793 .iter()
2794 .zip(f32_bss.iter())
2795 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2796
2797 let f64_col = batch.column(4).as_primitive::<Float64Type>();
2799 let f64_bss = batch.column(5).as_primitive::<Float64Type>();
2800 assert_eq!(f64_col.len(), f64_bss.len());
2801 f64_col
2802 .iter()
2803 .zip(f64_bss.iter())
2804 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2805
2806 let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
2808 let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
2809 assert_eq!(i32_col.len(), i32_bss.len());
2810 i32_col
2811 .iter()
2812 .zip(i32_bss.iter())
2813 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2814
2815 let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
2817 let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
2818 assert_eq!(i64_col.len(), i64_bss.len());
2819 i64_col
2820 .iter()
2821 .zip(i64_bss.iter())
2822 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2823
2824 let flba_col = batch.column(10).as_fixed_size_binary();
2826 let flba_bss = batch.column(11).as_fixed_size_binary();
2827 assert_eq!(flba_col.len(), flba_bss.len());
2828 flba_col
2829 .iter()
2830 .zip(flba_bss.iter())
2831 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2832
2833 let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
2835 let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
2836 assert_eq!(dec_col.len(), dec_bss.len());
2837 dec_col
2838 .iter()
2839 .zip(dec_bss.iter())
2840 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2841 }
2842 assert_eq!(row_count, 200);
2843 }
2844
2845 #[test]
2846 fn test_read_incorrect_map_schema_file() {
2847 let testdata = arrow::util::test_util::parquet_test_data();
2848 let path = format!("{testdata}/incorrect_map_schema.parquet");
2850 let file = File::open(path).unwrap();
2851 let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2852
2853 let batch = record_reader.next().unwrap().unwrap();
2854 assert_eq!(batch.num_rows(), 1);
2855
2856 let expected_schema = Schema::new(vec![Field::new(
2857 "my_map",
2858 ArrowDataType::Map(
2859 Arc::new(Field::new(
2860 "key_value",
2861 ArrowDataType::Struct(Fields::from(vec![
2862 Field::new("key", ArrowDataType::Utf8, false),
2863 Field::new("value", ArrowDataType::Utf8, true),
2864 ])),
2865 false,
2866 )),
2867 false,
2868 ),
2869 true,
2870 )]);
2871 assert_eq!(batch.schema().as_ref(), &expected_schema);
2872
2873 assert_eq!(batch.num_rows(), 1);
2874 assert_eq!(batch.column(0).null_count(), 0);
2875 assert_eq!(
2876 batch.column(0).as_map().keys().as_ref(),
2877 &StringArray::from(vec!["parent", "name"])
2878 );
2879 assert_eq!(
2880 batch.column(0).as_map().values().as_ref(),
2881 &StringArray::from(vec!["another", "report"])
2882 );
2883 }
2884
2885 #[test]
2886 fn test_read_dict_fixed_size_binary() {
2887 let schema = Arc::new(Schema::new(vec![Field::new(
2888 "a",
2889 ArrowDataType::Dictionary(
2890 Box::new(ArrowDataType::UInt8),
2891 Box::new(ArrowDataType::FixedSizeBinary(8)),
2892 ),
2893 true,
2894 )]));
2895 let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
2896 let values = FixedSizeBinaryArray::try_from_iter(
2897 vec![
2898 (0u8..8u8).collect::<Vec<u8>>(),
2899 (24u8..32u8).collect::<Vec<u8>>(),
2900 ]
2901 .into_iter(),
2902 )
2903 .unwrap();
2904 let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
2905 let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();
2906
2907 let mut buffer = Vec::with_capacity(1024);
2908 let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2909 writer.write(&batch).unwrap();
2910 writer.close().unwrap();
2911 let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2912 .unwrap()
2913 .collect::<Result<Vec<_>, _>>()
2914 .unwrap();
2915
2916 assert_eq!(read.len(), 1);
2917 assert_eq!(&batch, &read[0])
2918 }
2919
2920 #[test]
2921 fn test_read_nullable_structs_with_binary_dict_as_first_child_column() {
2922 let struct_fields = Fields::from(vec![
2929 Field::new(
2930 "city",
2931 ArrowDataType::Dictionary(
2932 Box::new(ArrowDataType::UInt8),
2933 Box::new(ArrowDataType::Utf8),
2934 ),
2935 true,
2936 ),
2937 Field::new("name", ArrowDataType::Utf8, true),
2938 ]);
2939 let schema = Arc::new(Schema::new(vec![Field::new(
2940 "items",
2941 ArrowDataType::Struct(struct_fields.clone()),
2942 true,
2943 )]));
2944
2945 let items_arr = StructArray::new(
2946 struct_fields,
2947 vec![
2948 Arc::new(DictionaryArray::new(
2949 UInt8Array::from_iter_values(vec![0, 1, 1, 0, 2]),
2950 Arc::new(StringArray::from_iter_values(vec![
2951 "quebec",
2952 "fredericton",
2953 "halifax",
2954 ])),
2955 )),
2956 Arc::new(StringArray::from_iter_values(vec![
2957 "albert", "terry", "lance", "", "tim",
2958 ])),
2959 ],
2960 Some(NullBuffer::from_iter(vec![true, true, true, false, true])),
2961 );
2962
2963 let batch = RecordBatch::try_new(schema, vec![Arc::new(items_arr)]).unwrap();
2964 let mut buffer = Vec::with_capacity(1024);
2965 let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2966 writer.write(&batch).unwrap();
2967 writer.close().unwrap();
2968 let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
2969 .unwrap()
2970 .collect::<Result<Vec<_>, _>>()
2971 .unwrap();
2972
2973 assert_eq!(read.len(), 1);
2974 assert_eq!(&batch, &read[0])
2975 }
2976
2977 #[derive(Clone)]
2979 struct TestOptions {
2980 num_row_groups: usize,
2983 num_rows: usize,
2985 record_batch_size: usize,
2987 null_percent: Option<usize>,
2989 write_batch_size: usize,
2994 max_data_page_size: usize,
2996 max_dict_page_size: usize,
2998 writer_version: WriterVersion,
3000 enabled_statistics: EnabledStatistics,
3002 encoding: Encoding,
3004 row_selections: Option<(RowSelection, usize)>,
3006 row_filter: Option<Vec<bool>>,
3008 limit: Option<usize>,
3010 offset: Option<usize>,
3012 }
3013
3014 impl std::fmt::Debug for TestOptions {
3016 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
3017 f.debug_struct("TestOptions")
3018 .field("num_row_groups", &self.num_row_groups)
3019 .field("num_rows", &self.num_rows)
3020 .field("record_batch_size", &self.record_batch_size)
3021 .field("null_percent", &self.null_percent)
3022 .field("write_batch_size", &self.write_batch_size)
3023 .field("max_data_page_size", &self.max_data_page_size)
3024 .field("max_dict_page_size", &self.max_dict_page_size)
3025 .field("writer_version", &self.writer_version)
3026 .field("enabled_statistics", &self.enabled_statistics)
3027 .field("encoding", &self.encoding)
3028 .field("row_selections", &self.row_selections.is_some())
3029 .field("row_filter", &self.row_filter.is_some())
3030 .field("limit", &self.limit)
3031 .field("offset", &self.offset)
3032 .finish()
3033 }
3034 }
3035
3036 impl Default for TestOptions {
3037 fn default() -> Self {
3038 Self {
3039 num_row_groups: 2,
3040 num_rows: 100,
3041 record_batch_size: 15,
3042 null_percent: None,
3043 write_batch_size: 64,
3044 max_data_page_size: 1024 * 1024,
3045 max_dict_page_size: 1024 * 1024,
3046 writer_version: WriterVersion::PARQUET_1_0,
3047 enabled_statistics: EnabledStatistics::Page,
3048 encoding: Encoding::PLAIN,
3049 row_selections: None,
3050 row_filter: None,
3051 limit: None,
3052 offset: None,
3053 }
3054 }
3055 }
3056
3057 impl TestOptions {
3058 fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
3059 Self {
3060 num_row_groups,
3061 num_rows,
3062 record_batch_size,
3063 ..Default::default()
3064 }
3065 }
3066
3067 fn with_null_percent(self, null_percent: usize) -> Self {
3068 Self {
3069 null_percent: Some(null_percent),
3070 ..self
3071 }
3072 }
3073
3074 fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
3075 Self {
3076 max_data_page_size,
3077 ..self
3078 }
3079 }
3080
3081 fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
3082 Self {
3083 max_dict_page_size,
3084 ..self
3085 }
3086 }
3087
3088 fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
3089 Self {
3090 enabled_statistics,
3091 ..self
3092 }
3093 }
3094
3095 fn with_row_selections(self) -> Self {
3096 assert!(self.row_filter.is_none(), "Must set row selection first");
3097
3098 let mut rng = rng();
3099 let step = rng.random_range(self.record_batch_size..self.num_rows);
3100 let row_selections = create_test_selection(
3101 step,
3102 self.num_row_groups * self.num_rows,
3103 rng.random::<bool>(),
3104 );
3105 Self {
3106 row_selections: Some(row_selections),
3107 ..self
3108 }
3109 }
3110
3111 fn with_row_filter(self) -> Self {
3112 let row_count = match &self.row_selections {
3113 Some((_, count)) => *count,
3114 None => self.num_row_groups * self.num_rows,
3115 };
3116
3117 let mut rng = rng();
3118 Self {
3119 row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
3120 ..self
3121 }
3122 }
3123
3124 fn with_limit(self, limit: usize) -> Self {
3125 Self {
3126 limit: Some(limit),
3127 ..self
3128 }
3129 }
3130
3131 fn with_offset(self, offset: usize) -> Self {
3132 Self {
3133 offset: Some(offset),
3134 ..self
3135 }
3136 }
3137
3138 fn writer_props(&self) -> WriterProperties {
3139 let builder = WriterProperties::builder()
3140 .set_data_page_size_limit(self.max_data_page_size)
3141 .set_write_batch_size(self.write_batch_size)
3142 .set_writer_version(self.writer_version)
3143 .set_statistics_enabled(self.enabled_statistics);
3144
3145 let builder = match self.encoding {
3146 Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
3147 .set_dictionary_enabled(true)
3148 .set_dictionary_page_size_limit(self.max_dict_page_size),
3149 _ => builder
3150 .set_dictionary_enabled(false)
3151 .set_encoding(self.encoding),
3152 };
3153
3154 builder.build()
3155 }
3156 }
3157
3158 fn run_single_column_reader_tests<T, F, G>(
3165 rand_max: i32,
3166 converted_type: ConvertedType,
3167 arrow_type: Option<ArrowDataType>,
3168 converter: F,
3169 encodings: &[Encoding],
3170 ) where
3171 T: DataType,
3172 G: RandGen<T>,
3173 F: Fn(&[Option<T::T>]) -> ArrayRef,
3174 {
3175 let all_options = vec![
3176 TestOptions::new(2, 100, 15),
3179 TestOptions::new(3, 25, 5),
3184 TestOptions::new(4, 100, 25),
3188 TestOptions::new(3, 256, 73).with_max_data_page_size(128),
3190 TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
3192 TestOptions::new(2, 256, 127).with_null_percent(0),
3194 TestOptions::new(2, 256, 93).with_null_percent(25),
3196 TestOptions::new(4, 100, 25).with_limit(0),
3198 TestOptions::new(4, 100, 25).with_limit(50),
3200 TestOptions::new(4, 100, 25).with_limit(10),
3202 TestOptions::new(4, 100, 25).with_limit(101),
3204 TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
3206 TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
3208 TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
3210 TestOptions::new(2, 256, 91)
3212 .with_null_percent(25)
3213 .with_enabled_statistics(EnabledStatistics::Chunk),
3214 TestOptions::new(2, 256, 91)
3216 .with_null_percent(25)
3217 .with_enabled_statistics(EnabledStatistics::None),
3218 TestOptions::new(2, 128, 91)
3220 .with_null_percent(100)
3221 .with_enabled_statistics(EnabledStatistics::None),
3222 TestOptions::new(2, 100, 15).with_row_selections(),
3227 TestOptions::new(3, 25, 5).with_row_selections(),
3232 TestOptions::new(4, 100, 25).with_row_selections(),
3236 TestOptions::new(3, 256, 73)
3238 .with_max_data_page_size(128)
3239 .with_row_selections(),
3240 TestOptions::new(3, 256, 57)
3242 .with_max_dict_page_size(128)
3243 .with_row_selections(),
3244 TestOptions::new(2, 256, 127)
3246 .with_null_percent(0)
3247 .with_row_selections(),
3248 TestOptions::new(2, 256, 93)
3250 .with_null_percent(25)
3251 .with_row_selections(),
3252 TestOptions::new(2, 256, 93)
3254 .with_null_percent(25)
3255 .with_row_selections()
3256 .with_limit(10),
3257 TestOptions::new(2, 256, 93)
3259 .with_null_percent(25)
3260 .with_row_selections()
3261 .with_offset(20)
3262 .with_limit(10),
3263 TestOptions::new(4, 100, 25).with_row_filter(),
3267 TestOptions::new(4, 100, 25)
3269 .with_row_selections()
3270 .with_row_filter(),
3271 TestOptions::new(2, 256, 93)
3273 .with_null_percent(25)
3274 .with_max_data_page_size(10)
3275 .with_row_filter(),
3276 TestOptions::new(2, 256, 93)
3278 .with_null_percent(25)
3279 .with_max_data_page_size(10)
3280 .with_row_selections()
3281 .with_row_filter(),
3282 TestOptions::new(2, 256, 93)
3284 .with_enabled_statistics(EnabledStatistics::None)
3285 .with_max_data_page_size(10)
3286 .with_row_selections(),
3287 ];
3288
3289 all_options.into_iter().for_each(|opts| {
3290 for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3291 for encoding in encodings {
3292 let opts = TestOptions {
3293 writer_version,
3294 encoding: *encoding,
3295 ..opts.clone()
3296 };
3297
3298 single_column_reader_test::<T, _, G>(
3299 opts,
3300 rand_max,
3301 converted_type,
3302 arrow_type.clone(),
3303 &converter,
3304 )
3305 }
3306 }
3307 });
3308 }
3309
3310 fn single_column_reader_test<T, F, G>(
3314 opts: TestOptions,
3315 rand_max: i32,
3316 converted_type: ConvertedType,
3317 arrow_type: Option<ArrowDataType>,
3318 converter: F,
3319 ) where
3320 T: DataType,
3321 G: RandGen<T>,
3322 F: Fn(&[Option<T::T>]) -> ArrayRef,
3323 {
3324 println!(
3326 "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
3327 T::get_physical_type(),
3328 converted_type,
3329 arrow_type,
3330 opts
3331 );
3332
3333 let (repetition, def_levels) = match opts.null_percent.as_ref() {
3335 Some(null_percent) => {
3336 let mut rng = rng();
3337
3338 let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
3339 .map(|_| {
3340 std::iter::from_fn(|| {
3341 Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
3342 })
3343 .take(opts.num_rows)
3344 .collect()
3345 })
3346 .collect();
3347 (Repetition::OPTIONAL, Some(def_levels))
3348 }
3349 None => (Repetition::REQUIRED, None),
3350 };
3351
3352 let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
3354 .map(|idx| {
3355 let null_count = match def_levels.as_ref() {
3356 Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
3357 None => 0,
3358 };
3359 G::gen_vec(rand_max, opts.num_rows - null_count)
3360 })
3361 .collect();
3362
3363 let len = match T::get_physical_type() {
3364 crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
3365 crate::basic::Type::INT96 => 12,
3366 _ => -1,
3367 };
3368
3369 let fields = vec![Arc::new(
3370 Type::primitive_type_builder("leaf", T::get_physical_type())
3371 .with_repetition(repetition)
3372 .with_converted_type(converted_type)
3373 .with_length(len)
3374 .build()
3375 .unwrap(),
3376 )];
3377
3378 let schema = Arc::new(
3379 Type::group_type_builder("test_schema")
3380 .with_fields(fields)
3381 .build()
3382 .unwrap(),
3383 );
3384
3385 let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
3386
3387 let mut file = tempfile::tempfile().unwrap();
3388
3389 generate_single_column_file_with_data::<T>(
3390 &values,
3391 def_levels.as_ref(),
3392 file.try_clone().unwrap(), schema,
3394 arrow_field,
3395 &opts,
3396 )
3397 .unwrap();
3398
3399 file.rewind().unwrap();
3400
3401 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::from(
3402 opts.enabled_statistics == EnabledStatistics::Page,
3403 ));
3404
3405 let mut builder =
3406 ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3407
3408 let expected_data = match opts.row_selections {
3409 Some((selections, row_count)) => {
3410 let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
3411
3412 let mut skip_data: Vec<Option<T::T>> = vec![];
3413 let dequeue: VecDeque<RowSelector> = selections.clone().into();
3414 for select in dequeue {
3415 if select.skip {
3416 without_skip_data.drain(0..select.row_count);
3417 } else {
3418 skip_data.extend(without_skip_data.drain(0..select.row_count));
3419 }
3420 }
3421 builder = builder.with_row_selection(selections);
3422
3423 assert_eq!(skip_data.len(), row_count);
3424 skip_data
3425 }
3426 None => {
3427 let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
3429 assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
3430 expected_data
3431 }
3432 };
3433
3434 let mut expected_data = match opts.row_filter {
3435 Some(filter) => {
3436 let expected_data = expected_data
3437 .into_iter()
3438 .zip(filter.iter())
3439 .filter_map(|(d, f)| f.then(|| d))
3440 .collect();
3441
3442 let mut filter_offset = 0;
3443 let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
3444 ProjectionMask::all(),
3445 move |b| {
3446 let array = BooleanArray::from_iter(
3447 filter
3448 .iter()
3449 .skip(filter_offset)
3450 .take(b.num_rows())
3451 .map(|x| Some(*x)),
3452 );
3453 filter_offset += b.num_rows();
3454 Ok(array)
3455 },
3456 ))]);
3457
3458 builder = builder.with_row_filter(filter);
3459 expected_data
3460 }
3461 None => expected_data,
3462 };
3463
3464 if let Some(offset) = opts.offset {
3465 builder = builder.with_offset(offset);
3466 expected_data = expected_data.into_iter().skip(offset).collect();
3467 }
3468
3469 if let Some(limit) = opts.limit {
3470 builder = builder.with_limit(limit);
3471 expected_data = expected_data.into_iter().take(limit).collect();
3472 }
3473
3474 let mut record_reader = builder
3475 .with_batch_size(opts.record_batch_size)
3476 .build()
3477 .unwrap();
3478
3479 let mut total_read = 0;
3480 loop {
3481 let maybe_batch = record_reader.next();
3482 if total_read < expected_data.len() {
3483 let end = min(total_read + opts.record_batch_size, expected_data.len());
3484 let batch = maybe_batch.unwrap().unwrap();
3485 assert_eq!(end - total_read, batch.num_rows());
3486
3487 let a = converter(&expected_data[total_read..end]);
3488 let b = batch.column(0);
3489
3490 assert_eq!(a.data_type(), b.data_type());
3491 assert_eq!(a.to_data(), b.to_data());
3492 assert_eq!(
3493 a.as_any().type_id(),
3494 b.as_any().type_id(),
3495 "incorrect type ids"
3496 );
3497
3498 total_read = end;
3499 } else {
3500 assert!(maybe_batch.is_none());
3501 break;
3502 }
3503 }
3504 }
3505
3506 fn gen_expected_data<T: DataType>(
3507 def_levels: Option<&Vec<Vec<i16>>>,
3508 values: &[Vec<T::T>],
3509 ) -> Vec<Option<T::T>> {
3510 let data: Vec<Option<T::T>> = match def_levels {
3511 Some(levels) => {
3512 let mut values_iter = values.iter().flatten();
3513 levels
3514 .iter()
3515 .flatten()
3516 .map(|d| match d {
3517 1 => Some(values_iter.next().cloned().unwrap()),
3518 0 => None,
3519 _ => unreachable!(),
3520 })
3521 .collect()
3522 }
3523 None => values.iter().flatten().cloned().map(Some).collect(),
3524 };
3525 data
3526 }
3527
3528 fn generate_single_column_file_with_data<T: DataType>(
3529 values: &[Vec<T::T>],
3530 def_levels: Option<&Vec<Vec<i16>>>,
3531 file: File,
3532 schema: TypePtr,
3533 field: Option<Field>,
3534 opts: &TestOptions,
3535 ) -> Result<ParquetMetaData> {
3536 let mut writer_props = opts.writer_props();
3537 if let Some(field) = field {
3538 let arrow_schema = Schema::new(vec![field]);
3539 add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
3540 }
3541
3542 let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
3543
3544 for (idx, v) in values.iter().enumerate() {
3545 let def_levels = def_levels.map(|d| d[idx].as_slice());
3546 let mut row_group_writer = writer.next_row_group()?;
3547 {
3548 let mut column_writer = row_group_writer
3549 .next_column()?
3550 .expect("Column writer is none!");
3551
3552 column_writer
3553 .typed::<T>()
3554 .write_batch(v, def_levels, None)?;
3555
3556 column_writer.close()?;
3557 }
3558 row_group_writer.close()?;
3559 }
3560
3561 writer.close()
3562 }
3563
3564 fn get_test_file(file_name: &str) -> File {
3565 let mut path = PathBuf::new();
3566 path.push(arrow::util::test_util::arrow_test_data());
3567 path.push(file_name);
3568
3569 File::open(path.as_path()).expect("File not found!")
3570 }
3571
3572 #[test]
3573 fn test_read_structs() {
3574 let testdata = arrow::util::test_util::parquet_test_data();
3578 let path = format!("{testdata}/nested_structs.rust.parquet");
3579 let file = File::open(&path).unwrap();
3580 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3581
3582 for batch in record_batch_reader {
3583 batch.unwrap();
3584 }
3585
3586 let file = File::open(&path).unwrap();
3587 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3588
3589 let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
3590 let projected_reader = builder
3591 .with_projection(mask)
3592 .with_batch_size(60)
3593 .build()
3594 .unwrap();
3595
3596 let expected_schema = Schema::new(vec![
3597 Field::new(
3598 "roll_num",
3599 ArrowDataType::Struct(Fields::from(vec![Field::new(
3600 "count",
3601 ArrowDataType::UInt64,
3602 false,
3603 )])),
3604 false,
3605 ),
3606 Field::new(
3607 "PC_CUR",
3608 ArrowDataType::Struct(Fields::from(vec![
3609 Field::new("mean", ArrowDataType::Int64, false),
3610 Field::new("sum", ArrowDataType::Int64, false),
3611 ])),
3612 false,
3613 ),
3614 ]);
3615
3616 assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3618
3619 for batch in projected_reader {
3620 let batch = batch.unwrap();
3621 assert_eq!(batch.schema().as_ref(), &expected_schema);
3622 }
3623 }
3624
3625 #[test]
3626 fn test_read_structs_by_name() {
3628 let testdata = arrow::util::test_util::parquet_test_data();
3629 let path = format!("{testdata}/nested_structs.rust.parquet");
3630 let file = File::open(&path).unwrap();
3631 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3632
3633 for batch in record_batch_reader {
3634 batch.unwrap();
3635 }
3636
3637 let file = File::open(&path).unwrap();
3638 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3639
3640 let mask = ProjectionMask::columns(
3641 builder.parquet_schema(),
3642 ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
3643 );
3644 let projected_reader = builder
3645 .with_projection(mask)
3646 .with_batch_size(60)
3647 .build()
3648 .unwrap();
3649
3650 let expected_schema = Schema::new(vec![
3651 Field::new(
3652 "roll_num",
3653 ArrowDataType::Struct(Fields::from(vec![Field::new(
3654 "count",
3655 ArrowDataType::UInt64,
3656 false,
3657 )])),
3658 false,
3659 ),
3660 Field::new(
3661 "PC_CUR",
3662 ArrowDataType::Struct(Fields::from(vec![
3663 Field::new("mean", ArrowDataType::Int64, false),
3664 Field::new("sum", ArrowDataType::Int64, false),
3665 ])),
3666 false,
3667 ),
3668 ]);
3669
3670 assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3671
3672 for batch in projected_reader {
3673 let batch = batch.unwrap();
3674 assert_eq!(batch.schema().as_ref(), &expected_schema);
3675 }
3676 }
3677
3678 #[test]
3679 fn test_read_maps() {
3680 let testdata = arrow::util::test_util::parquet_test_data();
3681 let path = format!("{testdata}/nested_maps.snappy.parquet");
3682 let file = File::open(path).unwrap();
3683 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3684
3685 for batch in record_batch_reader {
3686 batch.unwrap();
3687 }
3688 }
3689
3690 #[test]
3692 fn test_unknown_logical_type() {
3693 let message_type = "message uk {
3694 OPTIONAL INT32 uki32 (UNKNOWN);
3695 OPTIONAL INT64 uki64 (UNKNOWN);
3696 OPTIONAL INT96 uki96 (UNKNOWN);
3697 OPTIONAL BOOLEAN ukbool (UNKNOWN);
3698 OPTIONAL FLOAT ukfloat (UNKNOWN);
3699 OPTIONAL DOUBLE ukdbl (UNKNOWN);
3700 OPTIONAL BYTE_ARRAY ukbytes (UNKNOWN);
3701 OPTIONAL FIXED_LEN_BYTE_ARRAY(10) ukflba (UNKNOWN);
3702 }";
3703
3704 let schema = Arc::new(parse_message_type(message_type).unwrap());
3705 let file = tempfile::tempfile().unwrap();
3706
3707 let mut writer =
3708 SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
3709 .unwrap();
3710
3711 let mut row_group_writer = writer.next_row_group().unwrap();
3712
3713 fn write_nulls<T: DataType>(row_group_writer: &mut SerializedRowGroupWriter<'_, File>) {
3714 let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
3715 column_writer
3717 .typed::<T>()
3718 .write_batch(&[], Some(&[0, 0, 0, 0]), None)
3719 .unwrap();
3720 column_writer.close().unwrap();
3721 }
3722
3723 write_nulls::<Int32Type>(&mut row_group_writer);
3725
3726 write_nulls::<Int64Type>(&mut row_group_writer);
3728
3729 write_nulls::<Int96Type>(&mut row_group_writer);
3731
3732 write_nulls::<BoolType>(&mut row_group_writer);
3734
3735 write_nulls::<FloatType>(&mut row_group_writer);
3737
3738 write_nulls::<DoubleType>(&mut row_group_writer);
3740
3741 write_nulls::<ByteArrayType>(&mut row_group_writer);
3743
3744 write_nulls::<FixedLenByteArrayType>(&mut row_group_writer);
3746
3747 row_group_writer.close().unwrap();
3748
3749 writer.close().unwrap();
3750
3751 let mut reader = ParquetRecordBatchReader::try_new(file, 4).unwrap();
3752 let batch = reader.next().unwrap().unwrap();
3753
3754 for col in batch.columns() {
3755 assert_eq!(col.len(), 4);
3756 assert_eq!(col.logical_null_count(), 4);
3757 assert_eq!(*col.data_type(), ArrowDataType::Null);
3758 }
3759 }
3760
3761 #[test]
3762 fn test_nested_nullability() {
3763 let message_type = "message nested {
3764 OPTIONAL Group group {
3765 REQUIRED INT32 leaf;
3766 }
3767 }";
3768
3769 let file = tempfile::tempfile().unwrap();
3770 let schema = Arc::new(parse_message_type(message_type).unwrap());
3771
3772 {
3773 let mut writer =
3775 SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
3776 .unwrap();
3777
3778 {
3779 let mut row_group_writer = writer.next_row_group().unwrap();
3780 let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
3781
3782 column_writer
3783 .typed::<Int32Type>()
3784 .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
3785 .unwrap();
3786
3787 column_writer.close().unwrap();
3788 row_group_writer.close().unwrap();
3789 }
3790
3791 writer.close().unwrap();
3792 }
3793
3794 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3795 let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
3796
3797 let reader = builder.with_projection(mask).build().unwrap();
3798
3799 let expected_schema = Schema::new(vec![Field::new(
3800 "group",
3801 ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
3802 true,
3803 )]);
3804
3805 let batch = reader.into_iter().next().unwrap().unwrap();
3806 assert_eq!(batch.schema().as_ref(), &expected_schema);
3807 assert_eq!(batch.num_rows(), 4);
3808 assert_eq!(batch.column(0).null_count(), 2);
3809 }
3810
3811 #[test]
3812 fn test_dictionary_preservation() {
3813 let fields = vec![Arc::new(
3814 Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
3815 .with_repetition(Repetition::OPTIONAL)
3816 .with_converted_type(ConvertedType::UTF8)
3817 .build()
3818 .unwrap(),
3819 )];
3820
3821 let schema = Arc::new(
3822 Type::group_type_builder("test_schema")
3823 .with_fields(fields)
3824 .build()
3825 .unwrap(),
3826 );
3827
3828 let dict_type = ArrowDataType::Dictionary(
3829 Box::new(ArrowDataType::Int32),
3830 Box::new(ArrowDataType::Utf8),
3831 );
3832
3833 let arrow_field = Field::new("leaf", dict_type, true);
3834
3835 let mut file = tempfile::tempfile().unwrap();
3836
3837 let values = vec![
3838 vec![
3839 ByteArray::from("hello"),
3840 ByteArray::from("a"),
3841 ByteArray::from("b"),
3842 ByteArray::from("d"),
3843 ],
3844 vec![
3845 ByteArray::from("c"),
3846 ByteArray::from("a"),
3847 ByteArray::from("b"),
3848 ],
3849 ];
3850
3851 let def_levels = vec![
3852 vec![1, 0, 0, 1, 0, 0, 1, 1],
3853 vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
3854 ];
3855
3856 let opts = TestOptions {
3857 encoding: Encoding::RLE_DICTIONARY,
3858 ..Default::default()
3859 };
3860
3861 generate_single_column_file_with_data::<ByteArrayType>(
3862 &values,
3863 Some(&def_levels),
3864 file.try_clone().unwrap(), schema,
3866 Some(arrow_field),
3867 &opts,
3868 )
3869 .unwrap();
3870
3871 file.rewind().unwrap();
3872
3873 let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();
3874
3875 let batches = record_reader
3876 .collect::<Result<Vec<RecordBatch>, _>>()
3877 .unwrap();
3878
3879 assert_eq!(batches.len(), 6);
3880 assert!(batches.iter().all(|x| x.num_columns() == 1));
3881
3882 let row_counts = batches
3883 .iter()
3884 .map(|x| (x.num_rows(), x.column(0).null_count()))
3885 .collect::<Vec<_>>();
3886
3887 assert_eq!(
3888 row_counts,
3889 vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
3890 );
3891
3892 let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();
3893
3894 assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
3896 assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
3898 assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
3899 assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
3901 assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
3902 }
3903
3904 #[test]
3905 fn test_read_null_list() {
3906 let testdata = arrow::util::test_util::parquet_test_data();
3907 let path = format!("{testdata}/null_list.parquet");
3908 let file = File::open(path).unwrap();
3909 let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3910
3911 let batch = record_batch_reader.next().unwrap().unwrap();
3912 assert_eq!(batch.num_rows(), 1);
3913 assert_eq!(batch.num_columns(), 1);
3914 assert_eq!(batch.column(0).len(), 1);
3915
3916 let list = batch
3917 .column(0)
3918 .as_any()
3919 .downcast_ref::<ListArray>()
3920 .unwrap();
3921 assert_eq!(list.len(), 1);
3922 assert!(list.is_valid(0));
3923
3924 let val = list.value(0);
3925 assert_eq!(val.len(), 0);
3926 }
3927
3928 #[test]
3929 fn test_null_schema_inference() {
3930 let testdata = arrow::util::test_util::parquet_test_data();
3931 let path = format!("{testdata}/null_list.parquet");
3932 let file = File::open(path).unwrap();
3933
3934 let arrow_field = Field::new(
3935 "emptylist",
3936 ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
3937 true,
3938 );
3939
3940 let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3941 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3942 let schema = builder.schema();
3943 assert_eq!(schema.fields().len(), 1);
3944 assert_eq!(schema.field(0), &arrow_field);
3945 }
3946
3947 #[test]
3948 fn test_skip_metadata() {
3949 let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
3950 let field = Field::new("col", col.data_type().clone(), true);
3951
3952 let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
3953
3954 let metadata = [("key".to_string(), "value".to_string())]
3955 .into_iter()
3956 .collect();
3957
3958 let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
3959
3960 assert_ne!(schema_with_metadata, schema_without_metadata);
3961
3962 let batch =
3963 RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
3964
3965 let file = |version: WriterVersion| {
3966 let props = WriterProperties::builder()
3967 .set_writer_version(version)
3968 .build();
3969
3970 let file = tempfile().unwrap();
3971 let mut writer =
3972 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
3973 .unwrap();
3974 writer.write(&batch).unwrap();
3975 writer.close().unwrap();
3976 file
3977 };
3978
3979 let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3980
3981 let v1_reader = file(WriterVersion::PARQUET_1_0);
3982 let v2_reader = file(WriterVersion::PARQUET_2_0);
3983
3984 let arrow_reader =
3985 ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
3986 assert_eq!(arrow_reader.schema(), schema_with_metadata);
3987
3988 let reader =
3989 ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
3990 .unwrap()
3991 .build()
3992 .unwrap();
3993 assert_eq!(reader.schema(), schema_without_metadata);
3994
3995 let arrow_reader =
3996 ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
3997 assert_eq!(arrow_reader.schema(), schema_with_metadata);
3998
3999 let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
4000 .unwrap()
4001 .build()
4002 .unwrap();
4003 assert_eq!(reader.schema(), schema_without_metadata);
4004 }
4005
4006 fn write_parquet_from_iter<I, F>(value: I) -> File
4007 where
4008 I: IntoIterator<Item = (F, ArrayRef)>,
4009 F: AsRef<str>,
4010 {
4011 let batch = RecordBatch::try_from_iter(value).unwrap();
4012 let file = tempfile().unwrap();
4013 let mut writer =
4014 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
4015 writer.write(&batch).unwrap();
4016 writer.close().unwrap();
4017 file
4018 }
4019
4020 fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
4021 where
4022 I: IntoIterator<Item = (F, ArrayRef)>,
4023 F: AsRef<str>,
4024 {
4025 let file = write_parquet_from_iter(value);
4026 let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
4027 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4028 file.try_clone().unwrap(),
4029 options_with_schema,
4030 );
4031 assert_eq!(builder.err().unwrap().to_string(), expected_error);
4032 }
4033
4034 #[test]
4035 fn test_schema_too_few_columns() {
4036 run_schema_test_with_error(
4037 vec![
4038 ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
4039 ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
4040 ],
4041 Arc::new(Schema::new(vec![Field::new(
4042 "int64",
4043 ArrowDataType::Int64,
4044 false,
4045 )])),
4046 "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
4047 );
4048 }
4049
4050 #[test]
4051 fn test_schema_too_many_columns() {
4052 run_schema_test_with_error(
4053 vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
4054 Arc::new(Schema::new(vec![
4055 Field::new("int64", ArrowDataType::Int64, false),
4056 Field::new("int32", ArrowDataType::Int32, false),
4057 ])),
4058 "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
4059 );
4060 }
4061
4062 #[test]
4063 fn test_schema_mismatched_column_names() {
4064 run_schema_test_with_error(
4065 vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
4066 Arc::new(Schema::new(vec![Field::new(
4067 "other",
4068 ArrowDataType::Int64,
4069 false,
4070 )])),
4071 "Arrow: incompatible arrow schema, expected field named int64 got other",
4072 );
4073 }
4074
4075 #[test]
4076 fn test_schema_incompatible_columns() {
4077 run_schema_test_with_error(
4078 vec![
4079 (
4080 "col1_invalid",
4081 Arc::new(Int64Array::from(vec![0])) as ArrayRef,
4082 ),
4083 (
4084 "col2_valid",
4085 Arc::new(Int32Array::from(vec![0])) as ArrayRef,
4086 ),
4087 (
4088 "col3_invalid",
4089 Arc::new(Date64Array::from(vec![0])) as ArrayRef,
4090 ),
4091 ],
4092 Arc::new(Schema::new(vec![
4093 Field::new("col1_invalid", ArrowDataType::Int32, false),
4094 Field::new("col2_valid", ArrowDataType::Int32, false),
4095 Field::new("col3_invalid", ArrowDataType::Int32, false),
4096 ])),
4097 "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
4098 );
4099 }
4100
4101 #[test]
4102 fn test_one_incompatible_nested_column() {
4103 let nested_fields = Fields::from(vec![
4104 Field::new("nested1_valid", ArrowDataType::Utf8, false),
4105 Field::new("nested1_invalid", ArrowDataType::Int64, false),
4106 ]);
4107 let nested = StructArray::try_new(
4108 nested_fields,
4109 vec![
4110 Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
4111 Arc::new(Int64Array::from(vec![0])) as ArrayRef,
4112 ],
4113 None,
4114 )
4115 .expect("struct array");
4116 let supplied_nested_fields = Fields::from(vec![
4117 Field::new("nested1_valid", ArrowDataType::Utf8, false),
4118 Field::new("nested1_invalid", ArrowDataType::Int32, false),
4119 ]);
4120 run_schema_test_with_error(
4121 vec![
4122 ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
4123 ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
4124 ("nested", Arc::new(nested) as ArrayRef),
4125 ],
4126 Arc::new(Schema::new(vec![
4127 Field::new("col1", ArrowDataType::Int64, false),
4128 Field::new("col2", ArrowDataType::Int32, false),
4129 Field::new(
4130 "nested",
4131 ArrowDataType::Struct(supplied_nested_fields),
4132 false,
4133 ),
4134 ])),
4135 "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
4136 requested Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int32) \
4137 but found Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int64)",
4138 );
4139 }
4140
4141 fn utf8_parquet() -> Bytes {
4143 let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
4144 let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
4145 let props = None;
4146 let mut parquet_data = vec![];
4148 let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
4149 writer.write(&batch).unwrap();
4150 writer.close().unwrap();
4151 Bytes::from(parquet_data)
4152 }
4153
4154 #[test]
4155 fn test_schema_error_bad_types() {
4156 let parquet_data = utf8_parquet();
4158
4159 let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
4161 "column1",
4162 arrow::datatypes::DataType::Int32,
4163 false,
4164 )]));
4165
4166 let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
4168 let err =
4169 ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
4170 .unwrap_err();
4171 assert_eq!(
4172 err.to_string(),
4173 "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8"
4174 )
4175 }
4176
4177 #[test]
4178 fn test_schema_error_bad_nullability() {
4179 let parquet_data = utf8_parquet();
4181
4182 let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
4184 "column1",
4185 arrow::datatypes::DataType::Utf8,
4186 true,
4187 )]));
4188
4189 let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
4191 let err =
4192 ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
4193 .unwrap_err();
4194 assert_eq!(
4195 err.to_string(),
4196 "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false"
4197 )
4198 }
4199
4200 #[test]
4201 fn test_read_binary_as_utf8() {
4202 let file = write_parquet_from_iter(vec![
4203 (
4204 "binary_to_utf8",
4205 Arc::new(BinaryArray::from(vec![
4206 b"one".as_ref(),
4207 b"two".as_ref(),
4208 b"three".as_ref(),
4209 ])) as ArrayRef,
4210 ),
4211 (
4212 "large_binary_to_large_utf8",
4213 Arc::new(LargeBinaryArray::from(vec![
4214 b"one".as_ref(),
4215 b"two".as_ref(),
4216 b"three".as_ref(),
4217 ])) as ArrayRef,
4218 ),
4219 (
4220 "binary_view_to_utf8_view",
4221 Arc::new(BinaryViewArray::from(vec![
4222 b"one".as_ref(),
4223 b"two".as_ref(),
4224 b"three".as_ref(),
4225 ])) as ArrayRef,
4226 ),
4227 ]);
4228 let supplied_fields = Fields::from(vec![
4229 Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
4230 Field::new(
4231 "large_binary_to_large_utf8",
4232 ArrowDataType::LargeUtf8,
4233 false,
4234 ),
4235 Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
4236 ]);
4237
4238 let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
4239 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4240 file.try_clone().unwrap(),
4241 options,
4242 )
4243 .expect("reader builder with schema")
4244 .build()
4245 .expect("reader with schema");
4246
4247 let batch = arrow_reader.next().unwrap().unwrap();
4248 assert_eq!(batch.num_columns(), 3);
4249 assert_eq!(batch.num_rows(), 3);
4250 assert_eq!(
4251 batch
4252 .column(0)
4253 .as_string::<i32>()
4254 .iter()
4255 .collect::<Vec<_>>(),
4256 vec![Some("one"), Some("two"), Some("three")]
4257 );
4258
4259 assert_eq!(
4260 batch
4261 .column(1)
4262 .as_string::<i64>()
4263 .iter()
4264 .collect::<Vec<_>>(),
4265 vec![Some("one"), Some("two"), Some("three")]
4266 );
4267
4268 assert_eq!(
4269 batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
4270 vec![Some("one"), Some("two"), Some("three")]
4271 );
4272 }
4273
4274 #[test]
4275 #[should_panic(expected = "Invalid UTF8 sequence at")]
4276 fn test_read_non_utf8_binary_as_utf8() {
4277 let file = write_parquet_from_iter(vec![(
4278 "non_utf8_binary",
4279 Arc::new(BinaryArray::from(vec![
4280 b"\xDE\x00\xFF".as_ref(),
4281 b"\xDE\x01\xAA".as_ref(),
4282 b"\xDE\x02\xFF".as_ref(),
4283 ])) as ArrayRef,
4284 )]);
4285 let supplied_fields = Fields::from(vec![Field::new(
4286 "non_utf8_binary",
4287 ArrowDataType::Utf8,
4288 false,
4289 )]);
4290
4291 let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
4292 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4293 file.try_clone().unwrap(),
4294 options,
4295 )
4296 .expect("reader builder with schema")
4297 .build()
4298 .expect("reader with schema");
4299 arrow_reader.next().unwrap().unwrap_err();
4300 }
4301
4302 #[test]
4303 fn test_with_schema() {
4304 let nested_fields = Fields::from(vec![
4305 Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
4306 Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
4307 ]);
4308
4309 let nested_arrays: Vec<ArrayRef> = vec![
4310 Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
4311 Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
4312 ];
4313
4314 let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();
4315
4316 let file = write_parquet_from_iter(vec![
4317 (
4318 "int32_to_ts_second",
4319 Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
4320 ),
4321 (
4322 "date32_to_date64",
4323 Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
4324 ),
4325 ("nested", Arc::new(nested) as ArrayRef),
4326 ]);
4327
4328 let supplied_nested_fields = Fields::from(vec![
4329 Field::new(
4330 "utf8_to_dict",
4331 ArrowDataType::Dictionary(
4332 Box::new(ArrowDataType::Int32),
4333 Box::new(ArrowDataType::Utf8),
4334 ),
4335 false,
4336 ),
4337 Field::new(
4338 "int64_to_ts_nano",
4339 ArrowDataType::Timestamp(
4340 arrow::datatypes::TimeUnit::Nanosecond,
4341 Some("+10:00".into()),
4342 ),
4343 false,
4344 ),
4345 ]);
4346
4347 let supplied_schema = Arc::new(Schema::new(vec![
4348 Field::new(
4349 "int32_to_ts_second",
4350 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
4351 false,
4352 ),
4353 Field::new("date32_to_date64", ArrowDataType::Date64, false),
4354 Field::new(
4355 "nested",
4356 ArrowDataType::Struct(supplied_nested_fields),
4357 false,
4358 ),
4359 ]));
4360
4361 let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
4362 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4363 file.try_clone().unwrap(),
4364 options,
4365 )
4366 .expect("reader builder with schema")
4367 .build()
4368 .expect("reader with schema");
4369
4370 assert_eq!(arrow_reader.schema(), supplied_schema);
4371 let batch = arrow_reader.next().unwrap().unwrap();
4372 assert_eq!(batch.num_columns(), 3);
4373 assert_eq!(batch.num_rows(), 4);
4374 assert_eq!(
4375 batch
4376 .column(0)
4377 .as_any()
4378 .downcast_ref::<TimestampSecondArray>()
4379 .expect("downcast to timestamp second")
4380 .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
4381 .map(|v| v.to_string())
4382 .expect("value as datetime"),
4383 "1970-01-01 01:00:00 +01:00"
4384 );
4385 assert_eq!(
4386 batch
4387 .column(1)
4388 .as_any()
4389 .downcast_ref::<Date64Array>()
4390 .expect("downcast to date64")
4391 .value_as_date(0)
4392 .map(|v| v.to_string())
4393 .expect("value as date"),
4394 "1970-01-01"
4395 );
4396
4397 let nested = batch
4398 .column(2)
4399 .as_any()
4400 .downcast_ref::<StructArray>()
4401 .expect("downcast to struct");
4402
4403 let nested_dict = nested
4404 .column(0)
4405 .as_any()
4406 .downcast_ref::<Int32DictionaryArray>()
4407 .expect("downcast to dictionary");
4408
4409 assert_eq!(
4410 nested_dict
4411 .values()
4412 .as_any()
4413 .downcast_ref::<StringArray>()
4414 .expect("downcast to string")
4415 .iter()
4416 .collect::<Vec<_>>(),
4417 vec![Some("a"), Some("b")]
4418 );
4419
4420 assert_eq!(
4421 nested_dict.keys().iter().collect::<Vec<_>>(),
4422 vec![Some(0), Some(0), Some(0), Some(1)]
4423 );
4424
4425 assert_eq!(
4426 nested
4427 .column(1)
4428 .as_any()
4429 .downcast_ref::<TimestampNanosecondArray>()
4430 .expect("downcast to timestamp nanosecond")
4431 .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
4432 .map(|v| v.to_string())
4433 .expect("value as datetime"),
4434 "1970-01-01 10:00:00.000000001 +10:00"
4435 );
4436 }
4437
4438 #[test]
4439 fn test_empty_projection() {
4440 let testdata = arrow::util::test_util::parquet_test_data();
4441 let path = format!("{testdata}/alltypes_plain.parquet");
4442 let file = File::open(path).unwrap();
4443
4444 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4445 let file_metadata = builder.metadata().file_metadata();
4446 let expected_rows = file_metadata.num_rows() as usize;
4447
4448 let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
4449 let batch_reader = builder
4450 .with_projection(mask)
4451 .with_batch_size(2)
4452 .build()
4453 .unwrap();
4454
4455 let mut total_rows = 0;
4456 for maybe_batch in batch_reader {
4457 let batch = maybe_batch.unwrap();
4458 total_rows += batch.num_rows();
4459 assert_eq!(batch.num_columns(), 0);
4460 assert!(batch.num_rows() <= 2);
4461 }
4462
4463 assert_eq!(total_rows, expected_rows);
4464 }
4465
4466 fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
4467 let schema = Arc::new(Schema::new(vec![Field::new(
4468 "list",
4469 ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
4470 true,
4471 )]));
4472
4473 let mut buf = Vec::with_capacity(1024);
4474
4475 let mut writer = ArrowWriter::try_new(
4476 &mut buf,
4477 schema.clone(),
4478 Some(
4479 WriterProperties::builder()
4480 .set_max_row_group_row_count(Some(row_group_size))
4481 .build(),
4482 ),
4483 )
4484 .unwrap();
4485 for _ in 0..2 {
4486 let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
4487 for _ in 0..(batch_size) {
4488 list_builder.append(true);
4489 }
4490 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
4491 .unwrap();
4492 writer.write(&batch).unwrap();
4493 }
4494 writer.close().unwrap();
4495
4496 let mut record_reader =
4497 ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
4498 assert_eq!(
4499 batch_size,
4500 record_reader.next().unwrap().unwrap().num_rows()
4501 );
4502 assert_eq!(
4503 batch_size,
4504 record_reader.next().unwrap().unwrap().num_rows()
4505 );
4506 }
4507
4508 #[test]
4509 fn test_row_group_exact_multiple() {
4510 const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
4511 test_row_group_batch(8, 8);
4512 test_row_group_batch(10, 8);
4513 test_row_group_batch(8, 10);
4514 test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
4515 test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
4516 test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
4517 test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
4518 test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
4519 }
4520
4521 fn get_expected_batches(
4524 column: &RecordBatch,
4525 selection: &RowSelection,
4526 batch_size: usize,
4527 ) -> Vec<RecordBatch> {
4528 let mut expected_batches = vec![];
4529
4530 let mut selection: VecDeque<_> = selection.clone().into();
4531 let mut row_offset = 0;
4532 let mut last_start = None;
4533 while row_offset < column.num_rows() && !selection.is_empty() {
4534 let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
4535 while batch_remaining > 0 && !selection.is_empty() {
4536 let (to_read, skip) = match selection.front_mut() {
4537 Some(selection) if selection.row_count > batch_remaining => {
4538 selection.row_count -= batch_remaining;
4539 (batch_remaining, selection.skip)
4540 }
4541 Some(_) => {
4542 let select = selection.pop_front().unwrap();
4543 (select.row_count, select.skip)
4544 }
4545 None => break,
4546 };
4547
4548 batch_remaining -= to_read;
4549
4550 match skip {
4551 true => {
4552 if let Some(last_start) = last_start.take() {
4553 expected_batches.push(column.slice(last_start, row_offset - last_start))
4554 }
4555 row_offset += to_read
4556 }
4557 false => {
4558 last_start.get_or_insert(row_offset);
4559 row_offset += to_read
4560 }
4561 }
4562 }
4563 }
4564
4565 if let Some(last_start) = last_start.take() {
4566 expected_batches.push(column.slice(last_start, row_offset - last_start))
4567 }
4568
4569 for batch in &expected_batches[..expected_batches.len() - 1] {
4571 assert_eq!(batch.num_rows(), batch_size);
4572 }
4573
4574 expected_batches
4575 }
4576
4577 fn create_test_selection(
4578 step_len: usize,
4579 total_len: usize,
4580 skip_first: bool,
4581 ) -> (RowSelection, usize) {
4582 let mut remaining = total_len;
4583 let mut skip = skip_first;
4584 let mut vec = vec![];
4585 let mut selected_count = 0;
4586 while remaining != 0 {
4587 let step = if remaining > step_len {
4588 step_len
4589 } else {
4590 remaining
4591 };
4592 vec.push(RowSelector {
4593 row_count: step,
4594 skip,
4595 });
4596 remaining -= step;
4597 if !skip {
4598 selected_count += step;
4599 }
4600 skip = !skip;
4601 }
4602 (vec.into(), selected_count)
4603 }
4604
4605 #[test]
4606 fn test_scan_row_with_selection() {
4607 let testdata = arrow::util::test_util::parquet_test_data();
4608 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
4609 let test_file = File::open(&path).unwrap();
4610
4611 let mut serial_reader =
4612 ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
4613 let data = serial_reader.next().unwrap().unwrap();
4614
4615 let do_test = |batch_size: usize, selection_len: usize| {
4616 for skip_first in [false, true] {
4617 let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;
4618
4619 let expected = get_expected_batches(&data, &selections, batch_size);
4620 let skip_reader = create_skip_reader(&test_file, batch_size, selections);
4621 assert_eq!(
4622 skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
4623 expected,
4624 "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
4625 );
4626 }
4627 };
4628
4629 do_test(1000, 1000);
4632
4633 do_test(20, 20);
4635
4636 do_test(20, 5);
4638
4639 do_test(20, 5);
4642
4643 fn create_skip_reader(
4644 test_file: &File,
4645 batch_size: usize,
4646 selections: RowSelection,
4647 ) -> ParquetRecordBatchReader {
4648 let options =
4649 ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
4650 let file = test_file.try_clone().unwrap();
4651 ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
4652 .unwrap()
4653 .with_batch_size(batch_size)
4654 .with_row_selection(selections)
4655 .build()
4656 .unwrap()
4657 }
4658 }
4659
4660 #[test]
4661 fn test_batch_size_overallocate() {
4662 let testdata = arrow::util::test_util::parquet_test_data();
4663 let path = format!("{testdata}/alltypes_plain.parquet");
4665 let test_file = File::open(path).unwrap();
4666
4667 let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4668 let num_rows = builder.metadata.file_metadata().num_rows();
4669 let reader = builder
4670 .with_batch_size(1024)
4671 .with_projection(ProjectionMask::all())
4672 .build()
4673 .unwrap();
4674 assert_ne!(1024, num_rows);
4675 assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
4676 }
4677
4678 #[test]
4679 fn test_read_with_page_index_enabled() {
4680 let testdata = arrow::util::test_util::parquet_test_data();
4681
4682 {
4683 let path = format!("{testdata}/alltypes_tiny_pages.parquet");
4685 let test_file = File::open(path).unwrap();
4686 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4687 test_file,
4688 ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
4689 )
4690 .unwrap();
4691 assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
4692 let reader = builder.build().unwrap();
4693 let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4694 assert_eq!(batches.len(), 8);
4695 }
4696
4697 {
4698 let path = format!("{testdata}/alltypes_plain.parquet");
4700 let test_file = File::open(path).unwrap();
4701 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4702 test_file,
4703 ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
4704 )
4705 .unwrap();
4706 assert!(builder.metadata().offset_index().is_none());
4709 let reader = builder.build().unwrap();
4710 let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4711 assert_eq!(batches.len(), 1);
4712 }
4713 }
4714
4715 #[test]
4716 fn test_raw_repetition() {
4717 const MESSAGE_TYPE: &str = "
4718 message Log {
4719 OPTIONAL INT32 eventType;
4720 REPEATED INT32 category;
4721 REPEATED group filter {
4722 OPTIONAL INT32 error;
4723 }
4724 }
4725 ";
4726 let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
4727 let props = Default::default();
4728
4729 let mut buf = Vec::with_capacity(1024);
4730 let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
4731 let mut row_group_writer = writer.next_row_group().unwrap();
4732
4733 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4735 col_writer
4736 .typed::<Int32Type>()
4737 .write_batch(&[1], Some(&[1]), None)
4738 .unwrap();
4739 col_writer.close().unwrap();
4740 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4742 col_writer
4743 .typed::<Int32Type>()
4744 .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
4745 .unwrap();
4746 col_writer.close().unwrap();
4747 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4749 col_writer
4750 .typed::<Int32Type>()
4751 .write_batch(&[1], Some(&[1]), Some(&[0]))
4752 .unwrap();
4753 col_writer.close().unwrap();
4754
4755 let rg_md = row_group_writer.close().unwrap();
4756 assert_eq!(rg_md.num_rows(), 1);
4757 writer.close().unwrap();
4758
4759 let bytes = Bytes::from(buf);
4760
4761 let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
4762 let full = no_mask.next().unwrap().unwrap();
4763
4764 assert_eq!(full.num_columns(), 3);
4765
4766 for idx in 0..3 {
4767 let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
4768 let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
4769 let mut reader = b.with_projection(mask).build().unwrap();
4770 let projected = reader.next().unwrap().unwrap();
4771
4772 assert_eq!(projected.num_columns(), 1);
4773 assert_eq!(full.column(idx), projected.column(0));
4774 }
4775 }
4776
4777 #[test]
4778 fn test_read_lz4_raw() {
4779 let testdata = arrow::util::test_util::parquet_test_data();
4780 let path = format!("{testdata}/lz4_raw_compressed.parquet");
4781 let file = File::open(path).unwrap();
4782
4783 let batches = ParquetRecordBatchReader::try_new(file, 1024)
4784 .unwrap()
4785 .collect::<Result<Vec<_>, _>>()
4786 .unwrap();
4787 assert_eq!(batches.len(), 1);
4788 let batch = &batches[0];
4789
4790 assert_eq!(batch.num_columns(), 3);
4791 assert_eq!(batch.num_rows(), 4);
4792
4793 let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4795 assert_eq!(
4796 a.values(),
4797 &[1593604800, 1593604800, 1593604801, 1593604801]
4798 );
4799
4800 let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4801 let a: Vec<_> = a.iter().flatten().collect();
4802 assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
4803
4804 let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4805 assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
4806 }
4807
4808 #[test]
4818 fn test_read_lz4_hadoop_fallback() {
4819 for file in [
4820 "hadoop_lz4_compressed.parquet",
4821 "non_hadoop_lz4_compressed.parquet",
4822 ] {
4823 let testdata = arrow::util::test_util::parquet_test_data();
4824 let path = format!("{testdata}/{file}");
4825 let file = File::open(path).unwrap();
4826 let expected_rows = 4;
4827
4828 let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4829 .unwrap()
4830 .collect::<Result<Vec<_>, _>>()
4831 .unwrap();
4832 assert_eq!(batches.len(), 1);
4833 let batch = &batches[0];
4834
4835 assert_eq!(batch.num_columns(), 3);
4836 assert_eq!(batch.num_rows(), expected_rows);
4837
4838 let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4839 assert_eq!(
4840 a.values(),
4841 &[1593604800, 1593604800, 1593604801, 1593604801]
4842 );
4843
4844 let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4845 let b: Vec<_> = b.iter().flatten().collect();
4846 assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
4847
4848 let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4849 assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
4850 }
4851 }
4852
4853 #[test]
4854 fn test_read_lz4_hadoop_large() {
4855 let testdata = arrow::util::test_util::parquet_test_data();
4856 let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
4857 let file = File::open(path).unwrap();
4858 let expected_rows = 10000;
4859
4860 let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4861 .unwrap()
4862 .collect::<Result<Vec<_>, _>>()
4863 .unwrap();
4864 assert_eq!(batches.len(), 1);
4865 let batch = &batches[0];
4866
4867 assert_eq!(batch.num_columns(), 1);
4868 assert_eq!(batch.num_rows(), expected_rows);
4869
4870 let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
4871 let a: Vec<_> = a.iter().flatten().collect();
4872 assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
4873 assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
4874 assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
4875 assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
4876 }
4877
4878 #[test]
4879 #[cfg(feature = "snap")]
4880 fn test_read_nested_lists() {
4881 let testdata = arrow::util::test_util::parquet_test_data();
4882 let path = format!("{testdata}/nested_lists.snappy.parquet");
4883 let file = File::open(path).unwrap();
4884
4885 let f = file.try_clone().unwrap();
4886 let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
4887 let expected = reader.next().unwrap().unwrap();
4888 assert_eq!(expected.num_rows(), 3);
4889
4890 let selection = RowSelection::from(vec![
4891 RowSelector::skip(1),
4892 RowSelector::select(1),
4893 RowSelector::skip(1),
4894 ]);
4895 let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4896 .unwrap()
4897 .with_row_selection(selection)
4898 .build()
4899 .unwrap();
4900
4901 let actual = reader.next().unwrap().unwrap();
4902 assert_eq!(actual.num_rows(), 1);
4903 assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
4904 }
4905
4906 #[test]
4907 fn test_arbitrary_decimal() {
4908 let values = [1, 2, 3, 4, 5, 6, 7, 8];
4909 let decimals_19_0 = Decimal128Array::from_iter_values(values)
4910 .with_precision_and_scale(19, 0)
4911 .unwrap();
4912 let decimals_12_0 = Decimal128Array::from_iter_values(values)
4913 .with_precision_and_scale(12, 0)
4914 .unwrap();
4915 let decimals_17_10 = Decimal128Array::from_iter_values(values)
4916 .with_precision_and_scale(17, 10)
4917 .unwrap();
4918
4919 let written = RecordBatch::try_from_iter([
4920 ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
4921 ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
4922 ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
4923 ])
4924 .unwrap();
4925
4926 let mut buffer = Vec::with_capacity(1024);
4927 let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
4928 writer.write(&written).unwrap();
4929 writer.close().unwrap();
4930
4931 let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
4932 .unwrap()
4933 .collect::<Result<Vec<_>, _>>()
4934 .unwrap();
4935
4936 assert_eq!(&written.slice(0, 8), &read[0]);
4937 }
4938
4939 #[test]
4940 fn test_list_skip() {
4941 let mut list = ListBuilder::new(Int32Builder::new());
4942 list.append_value([Some(1), Some(2)]);
4943 list.append_value([Some(3)]);
4944 list.append_value([Some(4)]);
4945 let list = list.finish();
4946 let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();
4947
4948 let props = WriterProperties::builder()
4950 .set_data_page_row_count_limit(1)
4951 .set_write_batch_size(2)
4952 .build();
4953
4954 let mut buffer = Vec::with_capacity(1024);
4955 let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
4956 writer.write(&batch).unwrap();
4957 writer.close().unwrap();
4958
4959 let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
4960 let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
4961 .unwrap()
4962 .with_row_selection(selection.into())
4963 .build()
4964 .unwrap();
4965 let out = reader.next().unwrap().unwrap();
4966 assert_eq!(out.num_rows(), 1);
4967 assert_eq!(out, batch.slice(2, 1));
4968 }
4969
4970 fn test_decimal32_roundtrip() {
4971 let d = |values: Vec<i32>, p: u8| {
4972 let iter = values.into_iter();
4973 PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
4974 .with_precision_and_scale(p, 2)
4975 .unwrap()
4976 };
4977
4978 let d1 = d(vec![1, 2, 3, 4, 5], 9);
4979 let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();
4980
4981 let mut buffer = Vec::with_capacity(1024);
4982 let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4983 writer.write(&batch).unwrap();
4984 writer.close().unwrap();
4985
4986 let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4987 let t1 = builder.parquet_schema().columns()[0].physical_type();
4988 assert_eq!(t1, PhysicalType::INT32);
4989
4990 let mut reader = builder.build().unwrap();
4991 assert_eq!(batch.schema(), reader.schema());
4992
4993 let out = reader.next().unwrap().unwrap();
4994 assert_eq!(batch, out);
4995 }
4996
4997 fn test_decimal64_roundtrip() {
4998 let d = |values: Vec<i64>, p: u8| {
5002 let iter = values.into_iter();
5003 PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
5004 .with_precision_and_scale(p, 2)
5005 .unwrap()
5006 };
5007
5008 let d1 = d(vec![1, 2, 3, 4, 5], 9);
5009 let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
5010 let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
5011
5012 let batch = RecordBatch::try_from_iter([
5013 ("d1", Arc::new(d1) as ArrayRef),
5014 ("d2", Arc::new(d2) as ArrayRef),
5015 ("d3", Arc::new(d3) as ArrayRef),
5016 ])
5017 .unwrap();
5018
5019 let mut buffer = Vec::with_capacity(1024);
5020 let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
5021 writer.write(&batch).unwrap();
5022 writer.close().unwrap();
5023
5024 let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
5025 let t1 = builder.parquet_schema().columns()[0].physical_type();
5026 assert_eq!(t1, PhysicalType::INT32);
5027 let t2 = builder.parquet_schema().columns()[1].physical_type();
5028 assert_eq!(t2, PhysicalType::INT64);
5029 let t3 = builder.parquet_schema().columns()[2].physical_type();
5030 assert_eq!(t3, PhysicalType::INT64);
5031
5032 let mut reader = builder.build().unwrap();
5033 assert_eq!(batch.schema(), reader.schema());
5034
5035 let out = reader.next().unwrap().unwrap();
5036 assert_eq!(batch, out);
5037 }
5038
5039 fn test_decimal_roundtrip<T: DecimalType>() {
5040 let d = |values: Vec<usize>, p: u8| {
5045 let iter = values.into_iter().map(T::Native::usize_as);
5046 PrimitiveArray::<T>::from_iter_values(iter)
5047 .with_precision_and_scale(p, 2)
5048 .unwrap()
5049 };
5050
5051 let d1 = d(vec![1, 2, 3, 4, 5], 9);
5052 let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
5053 let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
5054 let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);
5055
5056 let batch = RecordBatch::try_from_iter([
5057 ("d1", Arc::new(d1) as ArrayRef),
5058 ("d2", Arc::new(d2) as ArrayRef),
5059 ("d3", Arc::new(d3) as ArrayRef),
5060 ("d4", Arc::new(d4) as ArrayRef),
5061 ])
5062 .unwrap();
5063
5064 let mut buffer = Vec::with_capacity(1024);
5065 let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
5066 writer.write(&batch).unwrap();
5067 writer.close().unwrap();
5068
5069 let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
5070 let t1 = builder.parquet_schema().columns()[0].physical_type();
5071 assert_eq!(t1, PhysicalType::INT32);
5072 let t2 = builder.parquet_schema().columns()[1].physical_type();
5073 assert_eq!(t2, PhysicalType::INT64);
5074 let t3 = builder.parquet_schema().columns()[2].physical_type();
5075 assert_eq!(t3, PhysicalType::INT64);
5076 let t4 = builder.parquet_schema().columns()[3].physical_type();
5077 assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);
5078
5079 let mut reader = builder.build().unwrap();
5080 assert_eq!(batch.schema(), reader.schema());
5081
5082 let out = reader.next().unwrap().unwrap();
5083 assert_eq!(batch, out);
5084 }
5085
5086 #[test]
5087 fn test_decimal() {
5088 test_decimal32_roundtrip();
5089 test_decimal64_roundtrip();
5090 test_decimal_roundtrip::<Decimal128Type>();
5091 test_decimal_roundtrip::<Decimal256Type>();
5092 }
5093
5094 #[test]
5095 fn test_list_selection() {
5096 let schema = Arc::new(Schema::new(vec![Field::new_list(
5097 "list",
5098 Field::new_list_field(ArrowDataType::Utf8, true),
5099 false,
5100 )]));
5101 let mut buf = Vec::with_capacity(1024);
5102
5103 let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
5104
5105 for i in 0..2 {
5106 let mut list_a_builder = ListBuilder::new(StringBuilder::new());
5107 for j in 0..1024 {
5108 list_a_builder.values().append_value(format!("{i} {j}"));
5109 list_a_builder.append(true);
5110 }
5111 let batch =
5112 RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
5113 .unwrap();
5114 writer.write(&batch).unwrap();
5115 }
5116 let _metadata = writer.close().unwrap();
5117
5118 let buf = Bytes::from(buf);
5119 let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
5120 .unwrap()
5121 .with_row_selection(RowSelection::from(vec![
5122 RowSelector::skip(100),
5123 RowSelector::select(924),
5124 RowSelector::skip(100),
5125 RowSelector::select(924),
5126 ]))
5127 .build()
5128 .unwrap();
5129
5130 let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
5131 let batch = concat_batches(&schema, &batches).unwrap();
5132
5133 assert_eq!(batch.num_rows(), 924 * 2);
5134 let list = batch.column(0).as_list::<i32>();
5135
5136 for w in list.value_offsets().windows(2) {
5137 assert_eq!(w[0] + 1, w[1])
5138 }
5139 let mut values = list.values().as_string::<i32>().iter();
5140
5141 for i in 0..2 {
5142 for j in 100..1024 {
5143 let expected = format!("{i} {j}");
5144 assert_eq!(values.next().unwrap().unwrap(), &expected);
5145 }
5146 }
5147 }
5148
5149 #[test]
5150 fn test_list_selection_fuzz() {
5151 let mut rng = rng();
5152 let schema = Arc::new(Schema::new(vec![Field::new_list(
5153 "list",
5154 Field::new_list(
5155 Field::LIST_FIELD_DEFAULT_NAME,
5156 Field::new_list_field(ArrowDataType::Int32, true),
5157 true,
5158 ),
5159 true,
5160 )]));
5161 let mut buf = Vec::with_capacity(1024);
5162 let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
5163
5164 let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
5165
5166 for _ in 0..2048 {
5167 if rng.random_bool(0.2) {
5168 list_a_builder.append(false);
5169 continue;
5170 }
5171
5172 let list_a_len = rng.random_range(0..10);
5173 let list_b_builder = list_a_builder.values();
5174
5175 for _ in 0..list_a_len {
5176 if rng.random_bool(0.2) {
5177 list_b_builder.append(false);
5178 continue;
5179 }
5180
5181 let list_b_len = rng.random_range(0..10);
5182 let int_builder = list_b_builder.values();
5183 for _ in 0..list_b_len {
5184 match rng.random_bool(0.2) {
5185 true => int_builder.append_null(),
5186 false => int_builder.append_value(rng.random()),
5187 }
5188 }
5189 list_b_builder.append(true)
5190 }
5191 list_a_builder.append(true);
5192 }
5193
5194 let array = Arc::new(list_a_builder.finish());
5195 let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
5196
5197 writer.write(&batch).unwrap();
5198 let _metadata = writer.close().unwrap();
5199
5200 let buf = Bytes::from(buf);
5201
5202 let cases = [
5203 vec![
5204 RowSelector::skip(100),
5205 RowSelector::select(924),
5206 RowSelector::skip(100),
5207 RowSelector::select(924),
5208 ],
5209 vec![
5210 RowSelector::select(924),
5211 RowSelector::skip(100),
5212 RowSelector::select(924),
5213 RowSelector::skip(100),
5214 ],
5215 vec![
5216 RowSelector::skip(1023),
5217 RowSelector::select(1),
5218 RowSelector::skip(1023),
5219 RowSelector::select(1),
5220 ],
5221 vec![
5222 RowSelector::select(1),
5223 RowSelector::skip(1023),
5224 RowSelector::select(1),
5225 RowSelector::skip(1023),
5226 ],
5227 ];
5228
5229 for batch_size in [100, 1024, 2048] {
5230 for selection in &cases {
5231 let selection = RowSelection::from(selection.clone());
5232 let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
5233 .unwrap()
5234 .with_row_selection(selection.clone())
5235 .with_batch_size(batch_size)
5236 .build()
5237 .unwrap();
5238
5239 let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
5240 let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
5241 assert_eq!(actual.num_rows(), selection.row_count());
5242
5243 let mut batch_offset = 0;
5244 let mut actual_offset = 0;
5245 for selector in selection.iter() {
5246 if selector.skip {
5247 batch_offset += selector.row_count;
5248 continue;
5249 }
5250
5251 assert_eq!(
5252 batch.slice(batch_offset, selector.row_count),
5253 actual.slice(actual_offset, selector.row_count)
5254 );
5255
5256 batch_offset += selector.row_count;
5257 actual_offset += selector.row_count;
5258 }
5259 }
5260 }
5261 }
5262
5263 #[test]
5264 fn test_read_old_nested_list() {
5265 use arrow::datatypes::DataType;
5266 use arrow::datatypes::ToByteSlice;
5267
5268 let testdata = arrow::util::test_util::parquet_test_data();
5269 let path = format!("{testdata}/old_list_structure.parquet");
5278 let test_file = File::open(path).unwrap();
5279
5280 let a_values = Int32Array::from(vec![1, 2, 3, 4]);
5282
5283 let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());
5285
5286 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
5288 "array",
5289 DataType::Int32,
5290 false,
5291 ))))
5292 .len(2)
5293 .add_buffer(a_value_offsets)
5294 .add_child_data(a_values.into_data())
5295 .build()
5296 .unwrap();
5297 let a = ListArray::from(a_list_data);
5298
5299 let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
5300 let mut reader = builder.build().unwrap();
5301 let out = reader.next().unwrap().unwrap();
5302 assert_eq!(out.num_rows(), 1);
5303 assert_eq!(out.num_columns(), 1);
5304 let c0 = out.column(0);
5306 let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
5307 let r0 = c0arr.value(0);
5309 let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
5310 assert_eq!(r0arr, &a);
5311 }
5312
5313 #[test]
5314 fn test_map_no_value() {
5315 let testdata = arrow::util::test_util::parquet_test_data();
5335 let path = format!("{testdata}/map_no_value.parquet");
5336 let file = File::open(path).unwrap();
5337
5338 let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
5339 .unwrap()
5340 .build()
5341 .unwrap();
5342 let out = reader.next().unwrap().unwrap();
5343 assert_eq!(out.num_rows(), 3);
5344 assert_eq!(out.num_columns(), 3);
5345 let c0 = out.column(1).as_list::<i32>();
5347 let c1 = out.column(2).as_list::<i32>();
5348 assert_eq!(c0.len(), c1.len());
5349 c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
5350 }
5351
5352 #[test]
5353 fn test_read_unknown_logical_type() {
5354 let testdata = arrow::util::test_util::parquet_test_data();
5355 let path = format!("{testdata}/unknown-logical-type.parquet");
5356 let test_file = File::open(path).unwrap();
5357
5358 let builder = ParquetRecordBatchReaderBuilder::try_new(test_file)
5359 .expect("Error creating reader builder");
5360
5361 let schema = builder.metadata().file_metadata().schema_descr();
5362 assert_eq!(
5363 schema.column(0).logical_type_ref(),
5364 Some(&LogicalType::String)
5365 );
5366 assert_eq!(
5367 schema.column(1).logical_type_ref(),
5368 Some(&LogicalType::_Unknown { field_id: 2555 })
5369 );
5370 assert_eq!(schema.column(1).physical_type(), PhysicalType::BYTE_ARRAY);
5371
5372 let mut reader = builder.build().unwrap();
5373 let out = reader.next().unwrap().unwrap();
5374 assert_eq!(out.num_rows(), 3);
5375 assert_eq!(out.num_columns(), 2);
5376 }
5377
5378 #[test]
5379 fn test_read_row_numbers() {
5380 let file = write_parquet_from_iter(vec![(
5381 "value",
5382 Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
5383 )]);
5384 let supplied_fields = Fields::from(vec![Field::new("value", ArrowDataType::Int64, false)]);
5385
5386 let row_number_field = Arc::new(
5387 Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5388 );
5389
5390 let options = ArrowReaderOptions::new()
5391 .with_schema(Arc::new(Schema::new(supplied_fields)))
5392 .with_virtual_columns(vec![row_number_field.clone()])
5393 .unwrap();
5394 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
5395 file.try_clone().unwrap(),
5396 options,
5397 )
5398 .expect("reader builder with schema")
5399 .build()
5400 .expect("reader with schema");
5401
5402 let batch = arrow_reader.next().unwrap().unwrap();
5403 let schema = Arc::new(Schema::new(vec![
5404 Field::new("value", ArrowDataType::Int64, false),
5405 (*row_number_field).clone(),
5406 ]));
5407
5408 assert_eq!(batch.schema(), schema);
5409 assert_eq!(batch.num_columns(), 2);
5410 assert_eq!(batch.num_rows(), 3);
5411 assert_eq!(
5412 batch
5413 .column(0)
5414 .as_primitive::<types::Int64Type>()
5415 .iter()
5416 .collect::<Vec<_>>(),
5417 vec![Some(1), Some(2), Some(3)]
5418 );
5419 assert_eq!(
5420 batch
5421 .column(1)
5422 .as_primitive::<types::Int64Type>()
5423 .iter()
5424 .collect::<Vec<_>>(),
5425 vec![Some(0), Some(1), Some(2)]
5426 );
5427 }
5428
5429 #[test]
5430 fn test_read_only_row_numbers() {
5431 let file = write_parquet_from_iter(vec![(
5432 "value",
5433 Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
5434 )]);
5435 let row_number_field = Arc::new(
5436 Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5437 );
5438 let options = ArrowReaderOptions::new()
5439 .with_virtual_columns(vec![row_number_field.clone()])
5440 .unwrap();
5441 let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
5442 let num_columns = metadata
5443 .metadata
5444 .file_metadata()
5445 .schema_descr()
5446 .num_columns();
5447
5448 let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
5449 .with_projection(ProjectionMask::none(num_columns))
5450 .build()
5451 .expect("reader with schema");
5452
5453 let batch = arrow_reader.next().unwrap().unwrap();
5454 let schema = Arc::new(Schema::new(vec![row_number_field]));
5455
5456 assert_eq!(batch.schema(), schema);
5457 assert_eq!(batch.num_columns(), 1);
5458 assert_eq!(batch.num_rows(), 3);
5459 assert_eq!(
5460 batch
5461 .column(0)
5462 .as_primitive::<types::Int64Type>()
5463 .iter()
5464 .collect::<Vec<_>>(),
5465 vec![Some(0), Some(1), Some(2)]
5466 );
5467 }
5468
5469 #[test]
5470 fn test_read_row_numbers_row_group_order() -> Result<()> {
5471 let array = Int64Array::from_iter_values(5000..5100);
5473 let batch = RecordBatch::try_from_iter([("col", Arc::new(array) as ArrayRef)])?;
5474 let mut buffer = Vec::new();
5475 let options = WriterProperties::builder()
5476 .set_max_row_group_row_count(Some(50))
5477 .build();
5478 let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema().clone(), Some(options))?;
5479 for batch_chunk in (0..10).map(|i| batch.slice(i * 10, 10)) {
5481 writer.write(&batch_chunk)?;
5482 }
5483 writer.close()?;
5484
5485 let row_number_field = Arc::new(
5486 Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5487 );
5488
5489 let buffer = Bytes::from(buffer);
5490
5491 let options =
5492 ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field.clone()])?;
5493
5494 let arrow_reader =
5496 ParquetRecordBatchReaderBuilder::try_new_with_options(buffer.clone(), options.clone())?
5497 .build()?;
5498
5499 assert_eq!(
5500 ValuesAndRowNumbers {
5501 values: (5000..5100).collect(),
5502 row_numbers: (0..100).collect()
5503 },
5504 ValuesAndRowNumbers::new_from_reader(arrow_reader)
5505 );
5506
5507 let arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(buffer, options)?
5509 .with_row_groups(vec![1, 0])
5510 .build()?;
5511
5512 assert_eq!(
5513 ValuesAndRowNumbers {
5514 values: (5050..5100).chain(5000..5050).collect(),
5515 row_numbers: (50..100).chain(0..50).collect(),
5516 },
5517 ValuesAndRowNumbers::new_from_reader(arrow_reader)
5518 );
5519
5520 Ok(())
5521 }
5522
5523 #[derive(Debug, PartialEq)]
5524 struct ValuesAndRowNumbers {
5525 values: Vec<i64>,
5526 row_numbers: Vec<i64>,
5527 }
5528 impl ValuesAndRowNumbers {
5529 fn new_from_reader(reader: ParquetRecordBatchReader) -> Self {
5530 let mut values = vec![];
5531 let mut row_numbers = vec![];
5532 for batch in reader {
5533 let batch = batch.expect("Could not read batch");
5534 values.extend(
5535 batch
5536 .column_by_name("col")
5537 .expect("Could not get col column")
5538 .as_primitive::<arrow::datatypes::Int64Type>()
5539 .iter()
5540 .map(|v| v.expect("Could not get value")),
5541 );
5542
5543 row_numbers.extend(
5544 batch
5545 .column_by_name("row_number")
5546 .expect("Could not get row_number column")
5547 .as_primitive::<arrow::datatypes::Int64Type>()
5548 .iter()
5549 .map(|v| v.expect("Could not get row number"))
5550 .collect::<Vec<_>>(),
5551 );
5552 }
5553 Self {
5554 values,
5555 row_numbers,
5556 }
5557 }
5558 }
5559
5560 #[test]
5561 fn test_with_virtual_columns_rejects_non_virtual_fields() {
5562 let regular_field = Arc::new(Field::new("regular_column", ArrowDataType::Int64, false));
5564 assert_eq!(
5565 ArrowReaderOptions::new()
5566 .with_virtual_columns(vec![regular_field])
5567 .unwrap_err()
5568 .to_string(),
5569 "Parquet error: Field 'regular_column' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'"
5570 );
5571 }
5572
5573 #[test]
5574 fn test_row_numbers_with_multiple_row_groups() {
5575 test_row_numbers_with_multiple_row_groups_helper(
5576 false,
5577 |path, selection, _row_filter, batch_size| {
5578 let file = File::open(path).unwrap();
5579 let row_number_field = Arc::new(
5580 Field::new("row_number", ArrowDataType::Int64, false)
5581 .with_extension_type(RowNumber),
5582 );
5583 let options = ArrowReaderOptions::new()
5584 .with_virtual_columns(vec![row_number_field])
5585 .unwrap();
5586 let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
5587 .unwrap()
5588 .with_row_selection(selection)
5589 .with_batch_size(batch_size)
5590 .build()
5591 .expect("Could not create reader");
5592 reader
5593 .collect::<Result<Vec<_>, _>>()
5594 .expect("Could not read")
5595 },
5596 );
5597 }
5598
5599 #[test]
5600 fn test_row_numbers_with_multiple_row_groups_and_filter() {
5601 test_row_numbers_with_multiple_row_groups_helper(
5602 true,
5603 |path, selection, row_filter, batch_size| {
5604 let file = File::open(path).unwrap();
5605 let row_number_field = Arc::new(
5606 Field::new("row_number", ArrowDataType::Int64, false)
5607 .with_extension_type(RowNumber),
5608 );
5609 let options = ArrowReaderOptions::new()
5610 .with_virtual_columns(vec![row_number_field])
5611 .unwrap();
5612 let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
5613 .unwrap()
5614 .with_row_selection(selection)
5615 .with_batch_size(batch_size)
5616 .with_row_filter(row_filter.expect("No filter"))
5617 .build()
5618 .expect("Could not create reader");
5619 reader
5620 .collect::<Result<Vec<_>, _>>()
5621 .expect("Could not read")
5622 },
5623 );
5624 }
5625
5626 #[test]
5627 fn test_read_row_group_indices() {
5628 let array1 = Int64Array::from(vec![1, 2]);
5630 let array2 = Int64Array::from(vec![3, 4]);
5631 let array3 = Int64Array::from(vec![5, 6]);
5632
5633 let batch1 =
5634 RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
5635 let batch2 =
5636 RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
5637 let batch3 =
5638 RecordBatch::try_from_iter(vec![("value", Arc::new(array3) as ArrayRef)]).unwrap();
5639
5640 let mut buffer = Vec::new();
5641 let options = WriterProperties::builder()
5642 .set_max_row_group_row_count(Some(2))
5643 .build();
5644 let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
5645 writer.write(&batch1).unwrap();
5646 writer.write(&batch2).unwrap();
5647 writer.write(&batch3).unwrap();
5648 writer.close().unwrap();
5649
5650 let file = Bytes::from(buffer);
5651 let row_group_index_field = Arc::new(
5652 Field::new("row_group_index", ArrowDataType::Int64, false)
5653 .with_extension_type(RowGroupIndex),
5654 );
5655
5656 let options = ArrowReaderOptions::new()
5657 .with_virtual_columns(vec![row_group_index_field.clone()])
5658 .unwrap();
5659 let mut arrow_reader =
5660 ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options)
5661 .expect("reader builder with virtual columns")
5662 .build()
5663 .expect("reader with virtual columns");
5664
5665 let batch = arrow_reader.next().unwrap().unwrap();
5666
5667 assert_eq!(batch.num_columns(), 2);
5668 assert_eq!(batch.num_rows(), 6);
5669
5670 assert_eq!(
5671 batch
5672 .column(0)
5673 .as_primitive::<types::Int64Type>()
5674 .iter()
5675 .collect::<Vec<_>>(),
5676 vec![Some(1), Some(2), Some(3), Some(4), Some(5), Some(6)]
5677 );
5678
5679 assert_eq!(
5680 batch
5681 .column(1)
5682 .as_primitive::<types::Int64Type>()
5683 .iter()
5684 .collect::<Vec<_>>(),
5685 vec![Some(0), Some(0), Some(1), Some(1), Some(2), Some(2)]
5686 );
5687 }
5688
5689 #[test]
5690 fn test_read_only_row_group_indices() {
5691 let array1 = Int64Array::from(vec![1, 2, 3]);
5692 let array2 = Int64Array::from(vec![4, 5]);
5693
5694 let batch1 =
5695 RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
5696 let batch2 =
5697 RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
5698
5699 let mut buffer = Vec::new();
5700 let options = WriterProperties::builder()
5701 .set_max_row_group_row_count(Some(3))
5702 .build();
5703 let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
5704 writer.write(&batch1).unwrap();
5705 writer.write(&batch2).unwrap();
5706 writer.close().unwrap();
5707
5708 let file = Bytes::from(buffer);
5709 let row_group_index_field = Arc::new(
5710 Field::new("row_group_index", ArrowDataType::Int64, false)
5711 .with_extension_type(RowGroupIndex),
5712 );
5713
5714 let options = ArrowReaderOptions::new()
5715 .with_virtual_columns(vec![row_group_index_field.clone()])
5716 .unwrap();
5717 let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
5718 let num_columns = metadata
5719 .metadata
5720 .file_metadata()
5721 .schema_descr()
5722 .num_columns();
5723
5724 let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
5725 .with_projection(ProjectionMask::none(num_columns))
5726 .build()
5727 .expect("reader with virtual columns only");
5728
5729 let batch = arrow_reader.next().unwrap().unwrap();
5730 let schema = Arc::new(Schema::new(vec![(*row_group_index_field).clone()]));
5731
5732 assert_eq!(batch.schema(), schema);
5733 assert_eq!(batch.num_columns(), 1);
5734 assert_eq!(batch.num_rows(), 5);
5735
5736 assert_eq!(
5737 batch
5738 .column(0)
5739 .as_primitive::<types::Int64Type>()
5740 .iter()
5741 .collect::<Vec<_>>(),
5742 vec![Some(0), Some(0), Some(0), Some(1), Some(1)]
5743 );
5744 }
5745
5746 #[test]
5747 fn test_read_row_group_indices_with_selection() -> Result<()> {
5748 let mut buffer = Vec::new();
5749 let options = WriterProperties::builder()
5750 .set_max_row_group_row_count(Some(10))
5751 .build();
5752
5753 let schema = Arc::new(Schema::new(vec![Field::new(
5754 "value",
5755 ArrowDataType::Int64,
5756 false,
5757 )]));
5758
5759 let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(options))?;
5760
5761 for i in 0..3 {
5763 let start = i * 10;
5764 let array = Int64Array::from_iter_values(start..start + 10);
5765 let batch = RecordBatch::try_from_iter(vec![("value", Arc::new(array) as ArrayRef)])?;
5766 writer.write(&batch)?;
5767 }
5768 writer.close()?;
5769
5770 let file = Bytes::from(buffer);
5771 let row_group_index_field = Arc::new(
5772 Field::new("rg_idx", ArrowDataType::Int64, false).with_extension_type(RowGroupIndex),
5773 );
5774
5775 let options =
5776 ArrowReaderOptions::new().with_virtual_columns(vec![row_group_index_field])?;
5777
5778 let arrow_reader =
5780 ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options.clone())?
5781 .with_row_groups(vec![2, 1, 0])
5782 .build()?;
5783
5784 let batches: Vec<_> = arrow_reader.collect::<Result<Vec<_>, _>>()?;
5785 let combined = concat_batches(&batches[0].schema(), &batches)?;
5786
5787 let values = combined.column(0).as_primitive::<types::Int64Type>();
5788 let first_val = values.value(0);
5789 let last_val = values.value(combined.num_rows() - 1);
5790 assert_eq!(first_val, 20);
5792 assert_eq!(last_val, 9);
5794
5795 let rg_indices = combined.column(1).as_primitive::<types::Int64Type>();
5796 assert_eq!(rg_indices.value(0), 2);
5797 assert_eq!(rg_indices.value(10), 1);
5798 assert_eq!(rg_indices.value(20), 0);
5799
5800 Ok(())
5801 }
5802
5803 pub(crate) fn test_row_numbers_with_multiple_row_groups_helper<F>(
5804 use_filter: bool,
5805 test_case: F,
5806 ) where
5807 F: FnOnce(PathBuf, RowSelection, Option<RowFilter>, usize) -> Vec<RecordBatch>,
5808 {
5809 let seed: u64 = random();
5810 println!("test_row_numbers_with_multiple_row_groups seed: {}", seed);
5811 let mut rng = StdRng::seed_from_u64(seed);
5812
5813 use tempfile::TempDir;
5814 let tempdir = TempDir::new().expect("Could not create temp dir");
5815
5816 let (bytes, metadata) = generate_file_with_row_numbers(&mut rng);
5817
5818 let path = tempdir.path().join("test.parquet");
5819 std::fs::write(&path, bytes).expect("Could not write file");
5820
5821 let mut case = vec![];
5822 let mut remaining = metadata.file_metadata().num_rows();
5823 while remaining > 0 {
5824 let row_count = rng.random_range(1..=remaining);
5825 remaining -= row_count;
5826 case.push(RowSelector {
5827 row_count: row_count as usize,
5828 skip: rng.random_bool(0.5),
5829 });
5830 }
5831
5832 let filter = use_filter.then(|| {
5833 let filter = (0..metadata.file_metadata().num_rows())
5834 .map(|_| rng.random_bool(0.99))
5835 .collect::<Vec<_>>();
5836 let mut filter_offset = 0;
5837 RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
5838 ProjectionMask::all(),
5839 move |b| {
5840 let array = BooleanArray::from_iter(
5841 filter
5842 .iter()
5843 .skip(filter_offset)
5844 .take(b.num_rows())
5845 .map(|x| Some(*x)),
5846 );
5847 filter_offset += b.num_rows();
5848 Ok(array)
5849 },
5850 ))])
5851 });
5852
5853 let selection = RowSelection::from(case);
5854 let batches = test_case(path, selection.clone(), filter, rng.random_range(1..4096));
5855
5856 if selection.skipped_row_count() == metadata.file_metadata().num_rows() as usize {
5857 assert!(batches.into_iter().all(|batch| batch.num_rows() == 0));
5858 return;
5859 }
5860 let actual = concat_batches(batches.first().expect("No batches").schema_ref(), &batches)
5861 .expect("Failed to concatenate");
5862 let values = actual
5864 .column(0)
5865 .as_primitive::<types::Int64Type>()
5866 .iter()
5867 .collect::<Vec<_>>();
5868 let row_numbers = actual
5869 .column(1)
5870 .as_primitive::<types::Int64Type>()
5871 .iter()
5872 .collect::<Vec<_>>();
5873 assert_eq!(
5874 row_numbers
5875 .into_iter()
5876 .map(|number| number.map(|number| number + 1))
5877 .collect::<Vec<_>>(),
5878 values
5879 );
5880 }
5881
5882 fn generate_file_with_row_numbers(rng: &mut impl Rng) -> (Bytes, ParquetMetaData) {
5883 let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
5884 "value",
5885 ArrowDataType::Int64,
5886 false,
5887 )])));
5888
5889 let mut buf = Vec::with_capacity(1024);
5890 let mut writer =
5891 ArrowWriter::try_new(&mut buf, schema.clone(), None).expect("Could not create writer");
5892
5893 let mut values = 1..=rng.random_range(1..4096);
5894 while !values.is_empty() {
5895 let batch_values = values
5896 .by_ref()
5897 .take(rng.random_range(1..4096))
5898 .collect::<Vec<_>>();
5899 let array = Arc::new(Int64Array::from(batch_values)) as ArrayRef;
5900 let batch =
5901 RecordBatch::try_from_iter([("value", array)]).expect("Could not create batch");
5902 writer.write(&batch).expect("Could not write batch");
5903 writer.flush().expect("Could not flush");
5904 }
5905 let metadata = writer.close().expect("Could not close writer");
5906
5907 (Bytes::from(buf), metadata)
5908 }
5909}