1use arrow_array::cast::AsArray;
21use arrow_array::{Array, RecordBatch, RecordBatchReader};
22use arrow_schema::{ArrowError, DataType as ArrowType, FieldRef, Schema, SchemaRef};
23use arrow_select::filter::filter_record_batch;
24pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
25pub use selection::{RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector};
26use std::fmt::{Debug, Formatter};
27use std::sync::Arc;
28
29pub use crate::arrow::array_reader::RowGroups;
30use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
31use crate::arrow::schema::{
32 ParquetField, parquet_to_arrow_schema_and_fields, virtual_type::is_virtual_column,
33};
34use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels_with_virtual};
35use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
36use crate::bloom_filter::{
37 SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
38};
39use crate::column::page::{PageIterator, PageReader};
40#[cfg(feature = "encryption")]
41use crate::encryption::decrypt::FileDecryptionProperties;
42use crate::errors::{ParquetError, Result};
43use crate::file::metadata::{
44 PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
45 ParquetStatisticsPolicy, RowGroupMetaData,
46};
47use crate::file::reader::{ChunkReader, SerializedPageReader};
48use crate::schema::types::SchemaDescriptor;
49
50use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
51pub use read_plan::{ReadPlan, ReadPlanBuilder};
53
54mod filter;
55pub mod metrics;
56mod read_plan;
57pub(crate) mod selection;
58pub mod statistics;
59
/// Default number of rows in each output [`RecordBatch`] produced by a reader
pub const DEFAULT_BATCH_SIZE: usize = 1024;
62
/// Shared builder state for constructing Arrow readers over Parquet data.
///
/// `T` is the input source wrapper (e.g. [`SyncReader`] for the synchronous
/// [`ParquetRecordBatchReaderBuilder`]).
pub struct ArrowReaderBuilder<T> {
    /// The data source to read from
    pub(crate) input: T,

    /// Parsed Parquet file metadata
    pub(crate) metadata: Arc<ParquetMetaData>,

    /// Arrow schema the reader will produce
    pub(crate) schema: SchemaRef,

    /// Parquet-to-Arrow field mapping (`None` when there are no fields)
    pub(crate) fields: Option<Arc<ParquetField>>,

    /// Maximum number of rows per emitted batch
    pub(crate) batch_size: usize,

    /// Row group indices to read; `None` means all row groups
    pub(crate) row_groups: Option<Vec<usize>>,

    /// Which columns to decode
    pub(crate) projection: ProjectionMask,

    /// Optional row-level filter evaluated while reading
    pub(crate) filter: Option<RowFilter>,

    /// Optional pre-computed row selection
    pub(crate) selection: Option<RowSelection>,

    /// How row selections are represented during execution
    pub(crate) row_selection_policy: RowSelectionPolicy,

    /// Maximum number of rows to return, applied after selection/filtering
    pub(crate) limit: Option<usize>,

    /// Number of rows to skip before returning any, applied after selection/filtering
    pub(crate) offset: Option<usize>,

    /// Metrics collector (disabled by default)
    pub(crate) metrics: ArrowReaderMetrics,

    /// Maximum memory, in bytes, used to cache predicate evaluation results
    pub(crate) max_predicate_cache_size: usize,
}
146
147impl<T: Debug> Debug for ArrowReaderBuilder<T> {
148 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
149 f.debug_struct("ArrowReaderBuilder<T>")
150 .field("input", &self.input)
151 .field("metadata", &self.metadata)
152 .field("schema", &self.schema)
153 .field("fields", &self.fields)
154 .field("batch_size", &self.batch_size)
155 .field("row_groups", &self.row_groups)
156 .field("projection", &self.projection)
157 .field("filter", &self.filter)
158 .field("selection", &self.selection)
159 .field("row_selection_policy", &self.row_selection_policy)
160 .field("limit", &self.limit)
161 .field("offset", &self.offset)
162 .field("metrics", &self.metrics)
163 .finish()
164 }
165}
166
167impl<T> ArrowReaderBuilder<T> {
168 pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
169 Self {
170 input,
171 metadata: metadata.metadata,
172 schema: metadata.schema,
173 fields: metadata.fields,
174 batch_size: DEFAULT_BATCH_SIZE,
175 row_groups: None,
176 projection: ProjectionMask::all(),
177 filter: None,
178 selection: None,
179 row_selection_policy: RowSelectionPolicy::default(),
180 limit: None,
181 offset: None,
182 metrics: ArrowReaderMetrics::Disabled,
183 max_predicate_cache_size: 100 * 1024 * 1024, }
185 }
186
187 pub fn metadata(&self) -> &Arc<ParquetMetaData> {
189 &self.metadata
190 }
191
192 pub fn parquet_schema(&self) -> &SchemaDescriptor {
194 self.metadata.file_metadata().schema_descr()
195 }
196
197 pub fn schema(&self) -> &SchemaRef {
199 &self.schema
200 }
201
202 pub fn with_batch_size(self, batch_size: usize) -> Self {
205 let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
207 Self { batch_size, ..self }
208 }
209
210 pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
214 Self {
215 row_groups: Some(row_groups),
216 ..self
217 }
218 }
219
220 pub fn with_projection(self, mask: ProjectionMask) -> Self {
222 Self {
223 projection: mask,
224 ..self
225 }
226 }
227
228 pub fn with_row_selection_policy(self, policy: RowSelectionPolicy) -> Self {
232 Self {
233 row_selection_policy: policy,
234 ..self
235 }
236 }
237
238 pub fn with_row_selection(self, selection: RowSelection) -> Self {
298 Self {
299 selection: Some(selection),
300 ..self
301 }
302 }
303
304 pub fn with_row_filter(self, filter: RowFilter) -> Self {
344 Self {
345 filter: Some(filter),
346 ..self
347 }
348 }
349
350 pub fn with_limit(self, limit: usize) -> Self {
358 Self {
359 limit: Some(limit),
360 ..self
361 }
362 }
363
364 pub fn with_offset(self, offset: usize) -> Self {
372 Self {
373 offset: Some(offset),
374 ..self
375 }
376 }
377
378 pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
413 Self { metrics, ..self }
414 }
415
416 pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
431 Self {
432 max_predicate_cache_size,
433 ..self
434 }
435 }
436}
437
/// Options that control how Parquet metadata is read and how the Arrow schema
/// is derived when constructing a reader.
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    /// If true, ignore any Arrow schema embedded in the file's key-value metadata
    skip_arrow_metadata: bool,
    /// Schema supplied by the caller to use instead of the inferred one
    supplied_schema: Option<SchemaRef>,

    /// Policy for reading the column index
    pub(crate) column_index: PageIndexPolicy,
    /// Policy for reading the offset index
    pub(crate) offset_index: PageIndexPolicy,

    /// Fine-grained options applied when decoding file metadata
    metadata_options: ParquetMetaDataOptions,
    /// Decryption settings for encrypted Parquet files
    #[cfg(feature = "encryption")]
    pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,

    /// Virtual (non-file-backed) columns to expose alongside file columns
    virtual_columns: Vec<FieldRef>,
}
473
474impl ArrowReaderOptions {
475 pub fn new() -> Self {
477 Self::default()
478 }
479
480 pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
487 Self {
488 skip_arrow_metadata,
489 ..self
490 }
491 }
492
493 pub fn with_schema(self, schema: SchemaRef) -> Self {
602 Self {
603 supplied_schema: Some(schema),
604 skip_arrow_metadata: true,
605 ..self
606 }
607 }
608
609 #[deprecated(since = "57.2.0", note = "Use `with_page_index_policy` instead")]
610 pub fn with_page_index(self, page_index: bool) -> Self {
623 self.with_page_index_policy(PageIndexPolicy::from(page_index))
624 }
625
626 pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
634 self.with_column_index_policy(policy)
635 .with_offset_index_policy(policy)
636 }
637
638 pub fn with_column_index_policy(mut self, policy: PageIndexPolicy) -> Self {
645 self.column_index = policy;
646 self
647 }
648
649 pub fn with_offset_index_policy(mut self, policy: PageIndexPolicy) -> Self {
656 self.offset_index = policy;
657 self
658 }
659
660 pub fn with_parquet_schema(mut self, schema: Arc<SchemaDescriptor>) -> Self {
666 self.metadata_options.set_schema(schema);
667 self
668 }
669
670 pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
681 self.metadata_options.set_encoding_stats_as_mask(val);
682 self
683 }
684
685 pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
690 self.metadata_options.set_encoding_stats_policy(policy);
691 self
692 }
693
694 pub fn with_column_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
699 self.metadata_options.set_column_stats_policy(policy);
700 self
701 }
702
703 pub fn with_size_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
708 self.metadata_options.set_size_stats_policy(policy);
709 self
710 }
711
712 #[cfg(feature = "encryption")]
716 pub fn with_file_decryption_properties(
717 self,
718 file_decryption_properties: Arc<FileDecryptionProperties>,
719 ) -> Self {
720 Self {
721 file_decryption_properties: Some(file_decryption_properties),
722 ..self
723 }
724 }
725
726 pub fn with_virtual_columns(self, virtual_columns: Vec<FieldRef>) -> Result<Self> {
779 for field in &virtual_columns {
781 if !is_virtual_column(field) {
782 return Err(ParquetError::General(format!(
783 "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
784 field.name()
785 )));
786 }
787 }
788 Ok(Self {
789 virtual_columns,
790 ..self
791 })
792 }
793
794 #[deprecated(
795 since = "57.2.0",
796 note = "Use `column_index_policy` or `offset_index_policy` instead"
797 )]
798 pub fn page_index(&self) -> bool {
805 self.offset_index != PageIndexPolicy::Skip && self.column_index != PageIndexPolicy::Skip
806 }
807
808 pub fn offset_index_policy(&self) -> PageIndexPolicy {
813 self.offset_index
814 }
815
816 pub fn column_index_policy(&self) -> PageIndexPolicy {
821 self.column_index
822 }
823
824 pub fn metadata_options(&self) -> &ParquetMetaDataOptions {
826 &self.metadata_options
827 }
828
829 #[cfg(feature = "encryption")]
834 pub fn file_decryption_properties(&self) -> Option<&Arc<FileDecryptionProperties>> {
835 self.file_decryption_properties.as_ref()
836 }
837}
838
/// The result of loading a Parquet file's metadata together with the resolved
/// Arrow schema, ready to construct a reader (and reusable across readers).
#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    /// Parsed Parquet metadata
    pub(crate) metadata: Arc<ParquetMetaData>,
    /// Arrow schema used for reading
    pub(crate) schema: SchemaRef,
    /// Parquet-to-Arrow field mapping (`None` when there are no fields)
    pub(crate) fields: Option<Arc<ParquetField>>,
}
862
863impl ArrowReaderMetadata {
864 pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
876 let metadata = ParquetMetaDataReader::new()
877 .with_column_index_policy(options.column_index)
878 .with_offset_index_policy(options.offset_index)
879 .with_metadata_options(Some(options.metadata_options.clone()));
880 #[cfg(feature = "encryption")]
881 let metadata = metadata.with_decryption_properties(
882 options.file_decryption_properties.as_ref().map(Arc::clone),
883 );
884 let metadata = metadata.parse_and_finish(reader)?;
885 Self::try_new(Arc::new(metadata), options)
886 }
887
888 pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
896 match options.supplied_schema {
897 Some(supplied_schema) => Self::with_supplied_schema(
898 metadata,
899 supplied_schema.clone(),
900 &options.virtual_columns,
901 ),
902 None => {
903 let kv_metadata = match options.skip_arrow_metadata {
904 true => None,
905 false => metadata.file_metadata().key_value_metadata(),
906 };
907
908 let (schema, fields) = parquet_to_arrow_schema_and_fields(
909 metadata.file_metadata().schema_descr(),
910 ProjectionMask::all(),
911 kv_metadata,
912 &options.virtual_columns,
913 )?;
914
915 Ok(Self {
916 metadata,
917 schema: Arc::new(schema),
918 fields: fields.map(Arc::new),
919 })
920 }
921 }
922 }
923
924 fn with_supplied_schema(
925 metadata: Arc<ParquetMetaData>,
926 supplied_schema: SchemaRef,
927 virtual_columns: &[FieldRef],
928 ) -> Result<Self> {
929 let parquet_schema = metadata.file_metadata().schema_descr();
930 let field_levels = parquet_to_arrow_field_levels_with_virtual(
931 parquet_schema,
932 ProjectionMask::all(),
933 Some(supplied_schema.fields()),
934 virtual_columns,
935 )?;
936 let fields = field_levels.fields;
937 let inferred_len = fields.len();
938 let supplied_len = supplied_schema.fields().len() + virtual_columns.len();
939 if inferred_len != supplied_len {
943 return Err(arrow_err!(format!(
944 "Incompatible supplied Arrow schema: expected {} columns received {}",
945 inferred_len, supplied_len
946 )));
947 }
948
949 let mut errors = Vec::new();
950
951 let field_iter = supplied_schema.fields().iter().zip(fields.iter());
952
953 for (field1, field2) in field_iter {
954 if field1.data_type() != field2.data_type() {
955 errors.push(format!(
956 "data type mismatch for field {}: requested {} but found {}",
957 field1.name(),
958 field1.data_type(),
959 field2.data_type()
960 ));
961 }
962 if field1.is_nullable() != field2.is_nullable() {
963 errors.push(format!(
964 "nullability mismatch for field {}: expected {:?} but found {:?}",
965 field1.name(),
966 field1.is_nullable(),
967 field2.is_nullable()
968 ));
969 }
970 if field1.metadata() != field2.metadata() {
971 errors.push(format!(
972 "metadata mismatch for field {}: expected {:?} but found {:?}",
973 field1.name(),
974 field1.metadata(),
975 field2.metadata()
976 ));
977 }
978 }
979
980 if !errors.is_empty() {
981 let message = errors.join(", ");
982 return Err(ParquetError::ArrowError(format!(
983 "Incompatible supplied Arrow schema: {message}",
984 )));
985 }
986
987 Ok(Self {
988 metadata,
989 schema: supplied_schema,
990 fields: field_levels.levels.map(Arc::new),
991 })
992 }
993
994 pub fn metadata(&self) -> &Arc<ParquetMetaData> {
996 &self.metadata
997 }
998
999 pub fn parquet_schema(&self) -> &SchemaDescriptor {
1001 self.metadata.file_metadata().schema_descr()
1002 }
1003
1004 pub fn schema(&self) -> &SchemaRef {
1006 &self.schema
1007 }
1008}
1009
/// Newtype wrapper around a synchronous [`ChunkReader`] input source
#[doc(hidden)]
pub struct SyncReader<T: ChunkReader>(T);
1013
impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
    // Delegates to the inner reader's Debug representation
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("SyncReader").field(&self.0).finish()
    }
}
1019
/// Builder for the synchronous [`ParquetRecordBatchReader`], backed by a [`ChunkReader`]
pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;
1027
impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
    /// Creates a builder for `reader` using default [`ArrowReaderOptions`]
    pub fn try_new(reader: T) -> Result<Self> {
        Self::try_new_with_options(reader, Default::default())
    }

    /// Creates a builder for `reader`, loading metadata according to `options`
    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load(&reader, options)?;
        Ok(Self::new_with_metadata(reader, metadata))
    }

    /// Creates a builder from already-loaded [`ArrowReaderMetadata`], avoiding
    /// re-parsing the footer when it has been read previously
    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(SyncReader(input), metadata)
    }

    /// Reads the bloom filter (SBBF) for the given row group and leaf column.
    ///
    /// Returns `Ok(None)` if the column chunk metadata records no bloom
    /// filter offset.
    pub fn get_row_group_column_bloom_filter(
        &self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> Result<Option<Sbbf>> {
        let metadata = self.metadata.row_group(row_group_idx);
        let column_metadata = metadata.column(column_idx);

        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        // When the metadata records the filter length, fetch header + bitset
        // in one read; otherwise read just enough bytes to decode the header
        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => self.input.0.get_bytes(offset, length as usize),
            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
        }?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        // Exhaustive single-variant matches: adding a new algorithm,
        // compression, or hash variant will surface as a compile error here
        match header.algorithm {
            BloomFilterAlgorithm::BLOCK => {
                // only the split-block algorithm is supported
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED => {
                // only uncompressed bitsets are supported
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH => {
                // only xxHash is supported
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            // The bitset is already in `buffer`; slice past the header bytes
            Some(_) => buffer.slice(
                (TryInto::<usize>::try_into(bitset_offset).unwrap()
                    - TryInto::<usize>::try_into(offset).unwrap())..,
            ),
            None => {
                // Length was unknown up front; issue a second read for the bitset
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                self.input.0.get_bytes(bitset_offset, bitset_length)?
            }
        };
        Ok(Some(Sbbf::new(&bitset)))
    }

    /// Consumes the builder and constructs a [`ParquetRecordBatchReader`].
    ///
    /// Evaluates any row filter predicates eagerly to narrow the row
    /// selection before building the final read plan.
    pub fn build(self) -> Result<ParquetRecordBatchReader> {
        let Self {
            input,
            metadata,
            schema: _,
            fields,
            batch_size,
            row_groups,
            projection,
            mut filter,
            selection,
            row_selection_policy,
            limit,
            offset,
            metrics,
            // not used by the sync reader
            max_predicate_cache_size: _,
        } = self;

        // Never produce batches larger than the file's total row count
        let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize);

        // Default to reading every row group, in order
        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());

        let reader = ReaderRowGroups {
            reader: Arc::new(input.0),
            metadata,
            row_groups,
        };

        let mut plan_builder = ReadPlanBuilder::new(batch_size)
            .with_selection(selection)
            .with_row_selection_policy(row_selection_policy);

        // Evaluate each predicate in turn, narrowing the selection
        if let Some(filter) = filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
                if !plan_builder.selects_any() {
                    // Selection is already empty; no point evaluating more predicates
                    break;
                }

                // NOTE(review): `cache_projection` is computed but not used on
                // this sync path — presumably relevant to the async reader's
                // predicate cache; confirm before removing
                let mut cache_projection = predicate.projection().clone();
                cache_projection.intersect(&projection);

                let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
                    .with_batch_size(batch_size)
                    .with_parquet_metadata(&reader.metadata)
                    .build_array_reader(fields.as_deref(), predicate.projection())?;

                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
            }
        }

        // Reader for the final output projection
        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
            .with_batch_size(batch_size)
            .with_parquet_metadata(&reader.metadata)
            .build_array_reader(fields.as_deref(), &projection)?;

        // Apply offset/limit after predicate filtering
        let read_plan = plan_builder
            .limited(reader.num_rows())
            .with_offset(offset)
            .with_limit(limit)
            .build_limited()
            .build();

        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
    }
}
1246
/// [`RowGroups`] implementation over a subset of a file's row groups, backed
/// by a synchronous [`ChunkReader`].
struct ReaderRowGroups<T: ChunkReader> {
    /// Shared input source
    reader: Arc<T>,

    /// Parsed file metadata
    metadata: Arc<ParquetMetaData>,
    /// Indices of the row groups to expose, in read order
    row_groups: Vec<usize>,
}
1254
1255impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
1256 fn num_rows(&self) -> usize {
1257 let meta = self.metadata.row_groups();
1258 self.row_groups
1259 .iter()
1260 .map(|x| meta[*x].num_rows() as usize)
1261 .sum()
1262 }
1263
1264 fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
1265 Ok(Box::new(ReaderPageIterator {
1266 column_idx: i,
1267 reader: self.reader.clone(),
1268 metadata: self.metadata.clone(),
1269 row_groups: self.row_groups.clone().into_iter(),
1270 }))
1271 }
1272
1273 fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
1274 Box::new(
1275 self.row_groups
1276 .iter()
1277 .map(move |i| self.metadata.row_group(*i)),
1278 )
1279 }
1280
1281 fn metadata(&self) -> &ParquetMetaData {
1282 self.metadata.as_ref()
1283 }
1284}
1285
/// Iterator of page readers for a single leaf column across multiple row groups.
struct ReaderPageIterator<T: ChunkReader> {
    /// Shared input source
    reader: Arc<T>,
    /// Leaf column index within each row group
    column_idx: usize,
    /// Remaining row group indices to visit
    row_groups: std::vec::IntoIter<usize>,
    /// Parsed file metadata (column chunk info and offset index)
    metadata: Arc<ParquetMetaData>,
}
1292
1293impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
1294 fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
1296 let rg = self.metadata.row_group(rg_idx);
1297 let column_chunk_metadata = rg.column(self.column_idx);
1298 let offset_index = self.metadata.offset_index();
1299 let page_locations = offset_index
1302 .filter(|i| !i[rg_idx].is_empty())
1303 .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
1304 let total_rows = rg.num_rows() as usize;
1305 let reader = self.reader.clone();
1306
1307 SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
1308 .add_crypto_context(
1309 rg_idx,
1310 self.column_idx,
1311 self.metadata.as_ref(),
1312 column_chunk_metadata,
1313 )
1314 }
1315}
1316
1317impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
1318 type Item = Result<Box<dyn PageReader>>;
1319
1320 fn next(&mut self) -> Option<Self::Item> {
1321 let rg_idx = self.row_groups.next()?;
1322 let page_reader = self
1323 .next_page_reader(rg_idx)
1324 .map(|page_reader| Box::new(page_reader) as _);
1325 Some(page_reader)
1326 }
1327}
1328
// Marker impl: `ReaderPageIterator` satisfies the `PageIterator` contract
impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
1330
/// Reads Parquet data into Arrow [`RecordBatch`]es, driven by a [`ReadPlan`].
pub struct ParquetRecordBatchReader {
    /// Root reader that decodes each batch as a struct array
    array_reader: Box<dyn ArrayReader>,
    /// Schema of the emitted batches
    schema: SchemaRef,
    /// Batch size and row-selection state
    read_plan: ReadPlan,
}
1346
impl Debug for ParquetRecordBatchReader {
    // `array_reader` is a trait object without a Debug bound, so it is elided
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ParquetRecordBatchReader")
            .field("array_reader", &"...")
            .field("schema", &self.schema)
            .field("read_plan", &self.read_plan)
            .finish()
    }
}
1356
1357impl Iterator for ParquetRecordBatchReader {
1358 type Item = Result<RecordBatch, ArrowError>;
1359
1360 fn next(&mut self) -> Option<Self::Item> {
1361 self.next_inner()
1362 .map_err(|arrow_err| arrow_err.into())
1363 .transpose()
1364 }
1365}
1366
impl ParquetRecordBatchReader {
    /// Produces the next [`RecordBatch`], or `Ok(None)` when exhausted.
    ///
    /// Drives the underlying [`ArrayReader`] according to the read plan's row
    /// selection cursor, which takes one of three shapes: a boolean mask, a
    /// sequence of skip/select runs, or "all rows".
    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
        let mut read_records = 0;
        let batch_size = self.batch_size();
        if batch_size == 0 {
            // Nothing to read (e.g. empty selection); trivially exhausted
            return Ok(None);
        }
        match self.read_plan.row_selection_cursor_mut() {
            // Boolean-mask selection: decode whole chunks, then filter the
            // decoded rows down to those selected by the mask
            RowSelectionCursor::Mask(mask_cursor) => {
                while !mask_cursor.is_empty() {
                    // Plan the next chunk: leading rows to skip, rows to
                    // decode, and how many of those are actually selected
                    let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else {
                        return Ok(None);
                    };

                    if mask_chunk.initial_skip > 0 {
                        let skipped = self.array_reader.skip_records(mask_chunk.initial_skip)?;
                        if skipped != mask_chunk.initial_skip {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                mask_chunk.initial_skip,
                                skipped
                            ));
                        }
                    }

                    if mask_chunk.chunk_rows == 0 {
                        // Chunk was all skips: stop if the cursor is drained
                        // with nothing selected, otherwise try the next chunk
                        if mask_cursor.is_empty() && mask_chunk.selected_rows == 0 {
                            return Ok(None);
                        }
                        continue;
                    }

                    let mask = mask_cursor.mask_values_for(&mask_chunk)?;

                    let read = self.array_reader.read_records(mask_chunk.chunk_rows)?;
                    if read == 0 {
                        return Err(general_err!(
                            "reached end of column while expecting {} rows",
                            mask_chunk.chunk_rows
                        ));
                    }
                    if read != mask_chunk.chunk_rows {
                        return Err(general_err!(
                            "insufficient rows read from array reader - expected {}, got {}",
                            mask_chunk.chunk_rows,
                            read
                        ));
                    }

                    let array = self.array_reader.consume_batch()?;
                    let struct_array = array.as_struct_opt().ok_or_else(|| {
                        ArrowError::ParquetError(
                            "Struct array reader should return struct array".to_string(),
                        )
                    })?;

                    // Keep only the rows the mask selects
                    let filtered_batch =
                        filter_record_batch(&RecordBatch::from(struct_array), &mask)?;

                    if filtered_batch.num_rows() != mask_chunk.selected_rows {
                        return Err(general_err!(
                            "filtered rows mismatch selection - expected {}, got {}",
                            mask_chunk.selected_rows,
                            filtered_batch.num_rows()
                        ));
                    }

                    if filtered_batch.num_rows() == 0 {
                        // Every row was filtered out; try the next chunk
                        continue;
                    }

                    return Ok(Some(filtered_batch));
                }
            }
            // Run-length selection: alternate skip/select runs until a full
            // batch has been accumulated in the array reader
            RowSelectionCursor::Selectors(selectors_cursor) => {
                while read_records < batch_size && !selectors_cursor.is_empty() {
                    let front = selectors_cursor.next_selector();
                    if front.skip {
                        let skipped = self.array_reader.skip_records(front.row_count)?;

                        if skipped != front.row_count {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                front.row_count,
                                skipped
                            ));
                        }
                        continue;
                    }

                    if front.row_count == 0 {
                        // Empty select run; nothing to do
                        continue;
                    }

                    // Split the selector if it would overfill the batch,
                    // returning the unread remainder to the cursor
                    let need_read = batch_size - read_records;
                    let to_read = match front.row_count.checked_sub(need_read) {
                        Some(remaining) if remaining != 0 => {
                            selectors_cursor.return_selector(RowSelector::select(remaining));
                            need_read
                        }
                        _ => front.row_count,
                    };
                    match self.array_reader.read_records(to_read)? {
                        0 => break, // end of input
                        rec => read_records += rec,
                    };
                }
            }
            // No selection: read a full batch unconditionally
            RowSelectionCursor::All => {
                self.array_reader.read_records(batch_size)?;
            }
        };

        // Materialize rows buffered by the selector / all-rows paths above
        let array = self.array_reader.consume_batch()?;
        let struct_array = array.as_struct_opt().ok_or_else(|| {
            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
        })?;

        Ok(if struct_array.len() > 0 {
            Some(RecordBatch::from(struct_array))
        } else {
            None
        })
    }
}
1506
1507impl RecordBatchReader for ParquetRecordBatchReader {
1508 fn schema(&self) -> SchemaRef {
1513 self.schema.clone()
1514 }
1515}
1516
1517impl ParquetRecordBatchReader {
1518 pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
1522 ParquetRecordBatchReaderBuilder::try_new(reader)?
1523 .with_batch_size(batch_size)
1524 .build()
1525 }
1526
1527 pub fn try_new_with_row_groups(
1532 levels: &FieldLevels,
1533 row_groups: &dyn RowGroups,
1534 batch_size: usize,
1535 selection: Option<RowSelection>,
1536 ) -> Result<Self> {
1537 let metrics = ArrowReaderMetrics::disabled();
1539 let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
1540 .with_batch_size(batch_size)
1541 .with_parquet_metadata(row_groups.metadata())
1542 .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;
1543
1544 let read_plan = ReadPlanBuilder::new(batch_size)
1545 .with_selection(selection)
1546 .build();
1547
1548 Ok(Self {
1549 array_reader,
1550 schema: Arc::new(Schema::new(levels.fields.clone())),
1551 read_plan,
1552 })
1553 }
1554
1555 pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
1559 let schema = match array_reader.get_data_type() {
1560 ArrowType::Struct(fields) => Schema::new(fields.clone()),
1561 _ => unreachable!("Struct array reader's data type is not struct!"),
1562 };
1563
1564 Self {
1565 array_reader,
1566 schema: Arc::new(schema),
1567 read_plan,
1568 }
1569 }
1570
1571 #[inline(always)]
1572 pub(crate) fn batch_size(&self) -> usize {
1573 self.read_plan.batch_size()
1574 }
1575}
1576
1577#[cfg(test)]
1578pub(crate) mod tests {
1579 use std::cmp::min;
1580 use std::collections::{HashMap, VecDeque};
1581 use std::fmt::Formatter;
1582 use std::fs::File;
1583 use std::io::Seek;
1584 use std::path::PathBuf;
1585 use std::sync::Arc;
1586
1587 use rand::rngs::StdRng;
1588 use rand::{Rng, RngCore, SeedableRng, random, rng};
1589 use tempfile::tempfile;
1590
1591 use crate::arrow::arrow_reader::{
1592 ArrowPredicateFn, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
1593 ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
1594 };
1595 use crate::arrow::schema::{
1596 add_encoded_arrow_schema_to_metadata,
1597 virtual_type::{RowGroupIndex, RowNumber},
1598 };
1599 use crate::arrow::{ArrowWriter, ProjectionMask};
1600 use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType};
1601 use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
1602 use crate::data_type::{
1603 BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
1604 FloatType, Int32Type, Int64Type, Int96, Int96Type,
1605 };
1606 use crate::errors::Result;
1607 use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetStatisticsPolicy};
1608 use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
1609 use crate::file::writer::SerializedFileWriter;
1610 use crate::schema::parser::parse_message_type;
1611 use crate::schema::types::{Type, TypePtr};
1612 use crate::util::test_common::rand_gen::RandGen;
1613 use arrow_array::builder::*;
1614 use arrow_array::cast::AsArray;
1615 use arrow_array::types::{
1616 Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
1617 DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
1618 Time64MicrosecondType,
1619 };
1620 use arrow_array::*;
1621 use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256};
1622 use arrow_data::{ArrayData, ArrayDataBuilder};
1623 use arrow_schema::{DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit};
1624 use arrow_select::concat::concat_batches;
1625 use bytes::Bytes;
1626 use half::f16;
1627 use num_traits::PrimInt;
1628
    // Reading with the default (full) projection must preserve the schema.
    #[test]
    fn test_arrow_reader_all_columns() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());
        let reader = builder.build().unwrap();

        assert_eq!(original_schema.fields(), reader.schema().fields());
    }
1640
    // Supplying an already-parsed Parquet schema via `with_parquet_schema`
    // should produce metadata identical to a fresh parse.
    #[test]
    fn test_reuse_schema() {
        let file = get_test_file("parquet/alltypes-java.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
        let expected = builder.metadata;
        let schema = expected.file_metadata().schema_descr_ptr();

        let arrow_options = ArrowReaderOptions::new().with_parquet_schema(schema.clone());
        let builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();

        assert_eq!(expected.as_ref(), builder.metadata.as_ref());
    }
1656
    // With `with_encoding_stats_as_mask(true)`, encoding stats should be
    // decoded as a mask exposing the per-column encodings.
    #[test]
    fn test_page_encoding_stats_mask() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let file = File::open(path).unwrap();

        let arrow_options = ArrowReaderOptions::new().with_encoding_stats_as_mask(true);
        let builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();

        let row_group_metadata = builder.metadata.row_group(0);

        // Column 0 uses only PLAIN; column 2 only PLAIN_DICTIONARY
        let page_encoding_stats = row_group_metadata
            .column(0)
            .page_encoding_stats_mask()
            .unwrap();
        assert!(page_encoding_stats.is_only(Encoding::PLAIN));
        let page_encoding_stats = row_group_metadata
            .column(2)
            .page_encoding_stats_mask()
            .unwrap();
        assert!(page_encoding_stats.is_only(Encoding::PLAIN_DICTIONARY));
    }
1681
    // Statistics policies should control which columns get encoding stats and
    // column statistics decoded.
    #[test]
    fn test_stats_stats_skipped() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let file = File::open(path).unwrap();

        // SkipAll: no encoding stats or column statistics should be present
        let arrow_options = ArrowReaderOptions::new()
            .with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll)
            .with_column_stats_policy(ParquetStatisticsPolicy::SkipAll);
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for column in row_group_metadata.columns() {
            assert!(column.page_encoding_stats().is_none());
            assert!(column.page_encoding_stats_mask().is_none());
            assert!(column.statistics().is_none());
        }

        // skip_except([0]): stats should be decoded only for column 0
        let arrow_options = ArrowReaderOptions::new()
            .with_encoding_stats_as_mask(true)
            .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
            .with_column_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]));
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
            assert!(column.page_encoding_stats().is_none());
            assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
            assert_eq!(column.statistics().is_some(), idx == 0);
        }
    }
1723
    // Size-statistics policy should control which columns expose level
    // histograms and unencoded byte counts.
    #[test]
    fn test_size_stats_stats_skipped() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/repeated_primitive_no_list.parquet");
        let file = File::open(path).unwrap();

        // SkipAll: no size statistics should be present on any column
        let arrow_options =
            ArrowReaderOptions::new().with_size_stats_policy(ParquetStatisticsPolicy::SkipAll);
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for column in row_group_metadata.columns() {
            assert!(column.repetition_level_histogram().is_none());
            assert!(column.definition_level_histogram().is_none());
            assert!(column.unencoded_byte_array_data_bytes().is_none());
        }

        // skip_except([1]): size statistics are decoded only for column 1
        let arrow_options = ArrowReaderOptions::new()
            .with_encoding_stats_as_mask(true)
            .with_size_stats_policy(ParquetStatisticsPolicy::skip_except(&[1]));
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
            assert_eq!(column.repetition_level_histogram().is_some(), idx == 1);
            assert_eq!(column.definition_level_histogram().is_some(), idx == 1);
            assert_eq!(column.unencoded_byte_array_data_bytes().is_some(), idx == 1);
        }
    }
1763
1764 #[test]
1765 fn test_arrow_reader_single_column() {
1766 let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1767
1768 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1769 let original_schema = Arc::clone(builder.schema());
1770
1771 let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
1772 let reader = builder.with_projection(mask).build().unwrap();
1773
1774 assert_eq!(1, reader.schema().fields().len());
1776 assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1777 }
1778
1779 #[test]
1780 fn test_arrow_reader_single_column_by_name() {
1781 let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1782
1783 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1784 let original_schema = Arc::clone(builder.schema());
1785
1786 let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
1787 let reader = builder.with_projection(mask).build().unwrap();
1788
1789 assert_eq!(1, reader.schema().fields().len());
1791 assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1792 }
1793
    #[test]
    fn test_null_column_reader_test() {
        // Reads an all-null OPTIONAL INT32 column with an Arrow type hint of
        // DataType::Null: 7 rows at batch size 2 must yield batches of
        // 2, 2, 2 and 1 rows, each entirely null.
        let mut file = tempfile::tempfile().unwrap();

        let schema = "
            message message {
                OPTIONAL INT32 int32;
            }
        ";
        let schema = Arc::new(parse_message_type(schema).unwrap());

        // Definition level 0 for every row => every value is null; two row
        // groups of 3 and 4 rows with no actual values written.
        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
        generate_single_column_file_with_data::<Int32Type>(
            &[vec![], vec![]],
            Some(&def_levels),
            file.try_clone().unwrap(),
            schema,
            Some(Field::new("int32", ArrowDataType::Null, true)),
            &Default::default(),
        )
        .unwrap();

        file.rewind().unwrap();

        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();

        assert_eq!(batches.len(), 4);
        for batch in &batches[0..3] {
            assert_eq!(batch.num_rows(), 2);
            assert_eq!(batch.num_columns(), 1);
            assert_eq!(batch.column(0).null_count(), 2);
        }

        // The final batch carries the single leftover row.
        assert_eq!(batches[3].num_rows(), 1);
        assert_eq!(batches[3].num_columns(), 1);
        assert_eq!(batches[3].column(0).null_count(), 1);
    }
1832
    #[test]
    fn test_primitive_single_column_reader_test() {
        // Exercises the primitive column readers (bool, i32, i64, f32) over
        // the encodings each physical type supports, using the shared
        // run_single_column_reader_tests harness.
        run_single_column_reader_tests::<BoolType, _, BoolType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
        );
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<FloatType, _, FloatType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
        );
    }
1874
    #[test]
    fn test_unsigned_primitive_single_column_reader_test() {
        // UINT_32 / UINT_64 converted types are stored as signed physical
        // values; verify they decode as the unsigned Arrow types via an
        // `as` reinterpret cast in the expected-array converters.
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::UINT_32,
            Some(ArrowDataType::UInt32),
            |vals| {
                Arc::new(UInt32Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u32)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::UINT_64,
            Some(ArrowDataType::UInt64),
            |vals| {
                Arc::new(UInt64Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u64)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
    }
1908
1909 #[test]
1910 fn test_unsigned_roundtrip() {
1911 let schema = Arc::new(Schema::new(vec![
1912 Field::new("uint32", ArrowDataType::UInt32, true),
1913 Field::new("uint64", ArrowDataType::UInt64, true),
1914 ]));
1915
1916 let mut buf = Vec::with_capacity(1024);
1917 let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
1918
1919 let original = RecordBatch::try_new(
1920 schema,
1921 vec![
1922 Arc::new(UInt32Array::from_iter_values([
1923 0,
1924 i32::MAX as u32,
1925 u32::MAX,
1926 ])),
1927 Arc::new(UInt64Array::from_iter_values([
1928 0,
1929 i64::MAX as u64,
1930 u64::MAX,
1931 ])),
1932 ],
1933 )
1934 .unwrap();
1935
1936 writer.write(&original).unwrap();
1937 writer.close().unwrap();
1938
1939 let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
1940 let ret = reader.next().unwrap().unwrap();
1941 assert_eq!(ret, original);
1942
1943 ret.column(0)
1945 .as_any()
1946 .downcast_ref::<UInt32Array>()
1947 .unwrap();
1948
1949 ret.column(1)
1950 .as_any()
1951 .downcast_ref::<UInt64Array>()
1952 .unwrap();
1953 }
1954
    #[test]
    fn test_float16_roundtrip() -> Result<()> {
        // Float16 round trip covering special values: NaN, infinities, signed
        // zeros, and assorted constants, in both non-null and nullable columns.
        let schema = Arc::new(Schema::new(vec![
            Field::new("float16", ArrowDataType::Float16, false),
            Field::new("float16-nullable", ArrowDataType::Float16, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Float16Array::from_iter_values([
                    f16::EPSILON,
                    f16::MIN,
                    f16::MAX,
                    f16::NAN,
                    f16::INFINITY,
                    f16::NEG_INFINITY,
                    f16::ONE,
                    f16::NEG_ONE,
                    f16::ZERO,
                    f16::NEG_ZERO,
                    f16::E,
                    f16::PI,
                    f16::FRAC_1_PI,
                ])),
                // Nullable column: nulls interleaved with NaN/infinities.
                Arc::new(Float16Array::from(vec![
                    None,
                    None,
                    None,
                    Some(f16::NAN),
                    Some(f16::INFINITY),
                    Some(f16::NEG_INFINITY),
                    None,
                    None,
                    None,
                    None,
                    None,
                    None,
                    Some(f16::FRAC_1_PI),
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        // Confirm both columns decoded as Float16 primitive arrays.
        ret.column(0).as_primitive::<Float16Type>();
        ret.column(1).as_primitive::<Float16Type>();

        Ok(())
    }
2014
    #[test]
    fn test_time_utc_roundtrip() -> Result<()> {
        // Time32/Time64 columns tagged with "adjusted_to_utc" field metadata
        // must round-trip, including values outside the 0..86_400s day range
        // and negative values.
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "time_millis",
                ArrowDataType::Time32(TimeUnit::Millisecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
            Field::new(
                "time_micros",
                ArrowDataType::Time64(TimeUnit::Microsecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                // 86_400_000 ms == 24h: values straddle the end-of-day boundary.
                Arc::new(Time32MillisecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399_000),
                    Some(86_400_000),
                    Some(86_401_000),
                    None,
                ])),
                Arc::new(Time64MicrosecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399 * 1_000_000),
                    Some(86_400 * 1_000_000),
                    Some(86_401 * 1_000_000),
                    None,
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        // Confirm the concrete primitive array types survived the round trip.
        ret.column(0).as_primitive::<Time32MillisecondType>();
        ret.column(1).as_primitive::<Time64MicrosecondType>();

        Ok(())
    }
2076
2077 #[test]
2078 fn test_date32_roundtrip() -> Result<()> {
2079 use arrow_array::Date32Array;
2080
2081 let schema = Arc::new(Schema::new(vec![Field::new(
2082 "date32",
2083 ArrowDataType::Date32,
2084 false,
2085 )]));
2086
2087 let mut buf = Vec::with_capacity(1024);
2088
2089 let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
2090
2091 let original = RecordBatch::try_new(
2092 schema,
2093 vec![Arc::new(Date32Array::from(vec![
2094 -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
2095 ]))],
2096 )?;
2097
2098 writer.write(&original)?;
2099 writer.close()?;
2100
2101 let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
2102 let ret = reader.next().unwrap()?;
2103 assert_eq!(ret, original);
2104
2105 ret.column(0).as_primitive::<Date32Type>();
2107
2108 Ok(())
2109 }
2110
    #[test]
    fn test_date64_roundtrip() -> Result<()> {
        // Date64 round trip with and without writer type coercion.
        // With coerce_types enabled, only values that are exact multiples of
        // a day within range survive unchanged (column 0); large values
        // (column 1) and non-day-aligned values (column 2) are altered, as
        // the assert_ne! checks below demonstrate.
        use arrow_array::Date64Array;

        let schema = Arc::new(Schema::new(vec![
            Field::new("small-date64", ArrowDataType::Date64, false),
            Field::new("big-date64", ArrowDataType::Date64, false),
            Field::new("invalid-date64", ArrowDataType::Date64, false),
        ]));

        let mut default_buf = Vec::with_capacity(1024);
        let mut coerce_buf = Vec::with_capacity(1024);

        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();

        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
        let mut coerce_writer =
            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;

        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;

        let original = RecordBatch::try_new(
            schema,
            vec![
                // Column 0: exact day multiples in a modest range.
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000 * NUM_MILLISECONDS_IN_DAY,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                // Column 1: exact day multiples but very large magnitudes.
                Arc::new(Date64Array::from(vec![
                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                // Column 2: off-by-one from a day boundary (not day-aligned).
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1,
                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                ])),
            ],
        )?;

        default_writer.write(&original)?;
        coerce_writer.write(&original)?;

        default_writer.close()?;
        coerce_writer.close()?;

        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;

        let default_ret = default_reader.next().unwrap()?;
        let coerce_ret = coerce_reader.next().unwrap()?;

        // Without coercion everything round-trips losslessly.
        assert_eq!(default_ret, original);

        // With coercion only the well-behaved column is preserved.
        assert_eq!(coerce_ret.column(0), original.column(0));
        assert_ne!(coerce_ret.column(1), original.column(1));
        assert_ne!(coerce_ret.column(2), original.column(2));

        default_ret.column(0).as_primitive::<Date64Type>();
        coerce_ret.column(0).as_primitive::<Date64Type>();

        Ok(())
    }
2188 struct RandFixedLenGen {}
2189
2190 impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
2191 fn r#gen(len: i32) -> FixedLenByteArray {
2192 let mut v = vec![0u8; len as usize];
2193 rng().fill_bytes(&mut v);
2194 ByteArray::from(v).into()
2195 }
2196 }
2197
2198 #[test]
2199 fn test_fixed_length_binary_column_reader() {
2200 run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
2201 20,
2202 ConvertedType::NONE,
2203 None,
2204 |vals| {
2205 let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
2206 for val in vals {
2207 match val {
2208 Some(b) => builder.append_value(b).unwrap(),
2209 None => builder.append_null(),
2210 }
2211 }
2212 Arc::new(builder.finish())
2213 },
2214 &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
2215 );
2216 }
2217
    #[test]
    fn test_interval_day_time_column_reader() {
        // 12-byte INTERVAL values map to IntervalDayTime: the converter reads
        // days from bytes 4..8 and milliseconds from bytes 8..12
        // (little-endian); the leading 4 bytes (months) are not read here.
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            12,
            ConvertedType::INTERVAL,
            None,
            |vals| {
                Arc::new(
                    vals.iter()
                        .map(|x| {
                            x.as_ref().map(|b| IntervalDayTime {
                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
                                milliseconds: i32::from_le_bytes(
                                    b.as_ref()[8..12].try_into().unwrap(),
                                ),
                            })
                        })
                        .collect::<IntervalDayTimeArray>(),
                )
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }
2241
    #[test]
    fn test_int96_single_column_reader_test() {
        // INT96 timestamps read with every supported Arrow type hint: the
        // default (nanoseconds) plus explicit second / milli / micro / nano
        // resolutions and a timezone-annotated variant.
        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];

        // Pairs an optional Arrow type hint with the conversion that produces
        // the expected array from the raw Int96 values.
        type TypeHintAndConversionFunction =
            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);

        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
            // No hint: defaults to nanosecond timestamps.
            (None, |vals: &[Option<Int96>]| {
                Arc::new(TimestampNanosecondArray::from_iter(
                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
                )) as ArrayRef
            }),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampSecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMillisecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_millis())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMicrosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_micros())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampNanosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
                    )) as ArrayRef
                },
            ),
            // Timezone hint: values are the same instants, tagged with -05:00.
            (
                Some(ArrowDataType::Timestamp(
                    TimeUnit::Second,
                    Some(Arc::from("-05:00")),
                )),
                |vals: &[Option<Int96>]| {
                    Arc::new(
                        TimestampSecondArray::from_iter(
                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
                        )
                        .with_timezone("-05:00"),
                    ) as ArrayRef
                },
            ),
        ];

        resolutions.iter().for_each(|(arrow_type, converter)| {
            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
                2,
                ConvertedType::NONE,
                arrow_type.clone(),
                converter,
                encodings,
            );
        })
    }
2316
    #[test]
    fn test_int96_from_spark_file_with_provided_schema() {
        // Reading a Spark-written INT96 file with a user-supplied
        // microsecond-timestamp schema: values should decode at microsecond
        // resolution (compared below after casting to Int64).
        use arrow_schema::DataType::Timestamp;
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            Timestamp(TimeUnit::Microsecond, None),
            true,
        )]));
        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());

        let mut record_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                .unwrap()
                .build()
                .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));

        // Expected raw microsecond values, including one null.
        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456),
            Some(1704070800000000),
            Some(253402225200000000),
            Some(1735599600000000),
            None,
            Some(9089380393200000000),
        ]));

        // Cast to Int64 so values can be compared independent of the
        // timestamp logical type.
        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }
2370
2371 #[test]
2372 fn test_int96_from_spark_file_without_provided_schema() {
2373 use arrow_schema::DataType::Timestamp;
2377 let test_data = arrow::util::test_util::parquet_test_data();
2378 let path = format!("{test_data}/int96_from_spark.parquet");
2379 let file = File::open(path).unwrap();
2380
2381 let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
2382 .unwrap()
2383 .build()
2384 .unwrap();
2385
2386 let batch = record_reader.next().unwrap().unwrap();
2387 assert_eq!(batch.num_columns(), 1);
2388 let column = batch.column(0);
2389 assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));
2390
2391 let expected = Arc::new(Int64Array::from(vec![
2392 Some(1704141296123456000), Some(1704070800000000000), Some(-4852191831933722624), Some(1735599600000000000), None,
2397 Some(-4864435138808946688), ]));
2399
2400 let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
2405 let casted_timestamps = binding.as_primitive::<types::Int64Type>();
2406
2407 assert_eq!(casted_timestamps.len(), expected.len());
2408
2409 casted_timestamps
2410 .iter()
2411 .zip(expected.iter())
2412 .for_each(|(lhs, rhs)| {
2413 assert_eq!(lhs, rhs);
2414 });
2415 }
2416
2417 struct RandUtf8Gen {}
2418
2419 impl RandGen<ByteArrayType> for RandUtf8Gen {
2420 fn r#gen(len: i32) -> ByteArray {
2421 Int32Type::r#gen(len).to_string().as_str().into()
2422 }
2423 }
2424
    #[test]
    fn test_utf8_single_column_reader_test() {
        // Byte-array columns under every string-capable encoding, decoded as
        // Binary, Utf8, LargeUtf8, and dictionary types with all supported
        // key widths.
        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
            })))
        }

        let encodings = &[
            Encoding::PLAIN,
            Encoding::RLE_DICTIONARY,
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            Encoding::DELTA_BYTE_ARRAY,
        ];

        // No converted type: raw bytes decode as BinaryArray.
        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::NONE,
            None,
            |vals| {
                Arc::new(BinaryArray::from_iter(
                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
                ))
            },
            encodings,
        );

        // UTF8 converted type with the default Arrow mapping.
        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            None,
            string_converter::<i32>,
            encodings,
        );

        // Explicit Utf8 / LargeUtf8 type hints.
        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::Utf8),
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::LargeUtf8),
            string_converter::<i64>,
            encodings,
        );

        // 8-bit dictionary keys: a smaller dataset (20 rows, 50% nulls) so
        // the key space is not exhausted.
        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
        for key in &small_key_types {
            for encoding in encodings {
                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
                opts.encoding = *encoding;

                let data_type =
                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
                    opts,
                    2,
                    ConvertedType::UTF8,
                    Some(data_type.clone()),
                    move |vals| {
                        let vals = string_converter::<i32>(vals);
                        arrow::compute::cast(&vals, &data_type).unwrap()
                    },
                );
            }
        }

        // Wider dictionary keys run through the full standard harness.
        let key_types = [
            ArrowDataType::Int16,
            ArrowDataType::UInt16,
            ArrowDataType::Int32,
            ArrowDataType::UInt32,
            ArrowDataType::Int64,
            ArrowDataType::UInt64,
        ];

        for key in &key_types {
            let data_type =
                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let vals = string_converter::<i32>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );

            let data_type = ArrowDataType::Dictionary(
                Box::new(key.clone()),
                Box::new(ArrowDataType::LargeUtf8),
            );

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let vals = string_converter::<i64>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );
        }
    }
2540
    #[test]
    fn test_decimal_nullable_struct() {
        // A nullable struct wrapping a non-null Decimal256 child: struct-level
        // nulls must survive a round trip when read back in batches of 3.
        let decimals = Decimal256Array::from_iter_values(
            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
        );

        // Validity bitmap 0b11101111 marks row 4 (LSB-first) as null.
        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "decimals",
            decimals.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        // 8 rows with batch size 3 => batches of 3, 3 and 2.
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }
2576
2577 #[test]
2578 fn test_int32_nullable_struct() {
2579 let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
2580 let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
2581 "int32",
2582 int32.data_type().clone(),
2583 false,
2584 )])))
2585 .len(8)
2586 .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
2587 .child_data(vec![int32.into_data()])
2588 .build()
2589 .unwrap();
2590
2591 let written =
2592 RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
2593 .unwrap();
2594
2595 let mut buffer = Vec::with_capacity(1024);
2596 let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
2597 writer.write(&written).unwrap();
2598 writer.close().unwrap();
2599
2600 let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2601 .unwrap()
2602 .collect::<Result<Vec<_>, _>>()
2603 .unwrap();
2604
2605 assert_eq!(&written.slice(0, 3), &read[0]);
2606 assert_eq!(&written.slice(3, 3), &read[1]);
2607 assert_eq!(&written.slice(6, 2), &read[2]);
2608 }
2609
    #[test]
    fn test_decimal_list() {
        // A nullable list of non-null Decimal128 values: list-level nulls and
        // variable-length offsets must survive a round trip read in batches.
        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);

        // 7 lists with offsets [0,0,1,3,3,4,5,8]; validity 0b01010111 marks
        // rows 3 and 5 (LSB-first) as null.
        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
            decimals.data_type().clone(),
            false,
        ))))
        .len(7)
        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        // 7 rows with batch size 3 => batches of 3, 3 and 1.
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 1), &read[2]);
    }
2644
2645 #[test]
2646 fn test_read_decimal_file() {
2647 use arrow_array::Decimal128Array;
2648 let testdata = arrow::util::test_util::parquet_test_data();
2649 let file_variants = vec![
2650 ("byte_array", 4),
2651 ("fixed_length", 25),
2652 ("int32", 4),
2653 ("int64", 10),
2654 ];
2655 for (prefix, target_precision) in file_variants {
2656 let path = format!("{testdata}/{prefix}_decimal.parquet");
2657 let file = File::open(path).unwrap();
2658 let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2659
2660 let batch = record_reader.next().unwrap().unwrap();
2661 assert_eq!(batch.num_rows(), 24);
2662 let col = batch
2663 .column(0)
2664 .as_any()
2665 .downcast_ref::<Decimal128Array>()
2666 .unwrap();
2667
2668 let expected = 1..25;
2669
2670 assert_eq!(col.precision(), target_precision);
2671 assert_eq!(col.scale(), 2);
2672
2673 for (i, v) in expected.enumerate() {
2674 assert_eq!(col.value(i), v * 100_i128);
2675 }
2676 }
2677 }
2678
    #[test]
    fn test_read_float16_nonzeros_file() {
        // Reads float16_nonzeros_and_nans.parquet and checks each of the 8
        // rows: a null, assorted finite values, a NaN, and both signed zeros.
        use arrow_array::Float16Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 8);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float16Array>()
            .unwrap();

        let f16_two = f16::ONE + f16::ONE;

        assert_eq!(col.null_count(), 1);
        assert!(col.is_null(0));
        assert_eq!(col.value(1), f16::ONE);
        assert_eq!(col.value(2), -f16_two);
        assert!(col.value(3).is_nan());
        // Zeros must keep their sign bit through the round trip.
        assert_eq!(col.value(4), f16::ZERO);
        assert!(col.value(4).is_sign_positive());
        assert_eq!(col.value(5), f16::NEG_ONE);
        assert_eq!(col.value(6), f16::NEG_ZERO);
        assert!(col.value(6).is_sign_negative());
        assert_eq!(col.value(7), f16_two);
    }
2710
2711 #[test]
2712 fn test_read_float16_zeros_file() {
2713 use arrow_array::Float16Array;
2714 let testdata = arrow::util::test_util::parquet_test_data();
2715 let path = format!("{testdata}/float16_zeros_and_nans.parquet");
2717 let file = File::open(path).unwrap();
2718 let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2719
2720 let batch = record_reader.next().unwrap().unwrap();
2721 assert_eq!(batch.num_rows(), 3);
2722 let col = batch
2723 .column(0)
2724 .as_any()
2725 .downcast_ref::<Float16Array>()
2726 .unwrap();
2727
2728 assert_eq!(col.null_count(), 1);
2729 assert!(col.is_null(0));
2730 assert_eq!(col.value(1), f16::ZERO);
2731 assert!(col.value(1).is_sign_positive());
2732 assert!(col.value(2).is_nan());
2733 }
2734
2735 #[test]
2736 fn test_read_float32_float64_byte_stream_split() {
2737 let path = format!(
2738 "{}/byte_stream_split.zstd.parquet",
2739 arrow::util::test_util::parquet_test_data(),
2740 );
2741 let file = File::open(path).unwrap();
2742 let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2743
2744 let mut row_count = 0;
2745 for batch in record_reader {
2746 let batch = batch.unwrap();
2747 row_count += batch.num_rows();
2748 let f32_col = batch.column(0).as_primitive::<Float32Type>();
2749 let f64_col = batch.column(1).as_primitive::<Float64Type>();
2750
2751 for &x in f32_col.values() {
2753 assert!(x > -10.0);
2754 assert!(x < 10.0);
2755 }
2756 for &x in f64_col.values() {
2757 assert!(x > -10.0);
2758 assert!(x < 10.0);
2759 }
2760 }
2761 assert_eq!(row_count, 300);
2762 }
2763
2764 #[test]
2765 fn test_read_extended_byte_stream_split() {
2766 let path = format!(
2767 "{}/byte_stream_split_extended.gzip.parquet",
2768 arrow::util::test_util::parquet_test_data(),
2769 );
2770 let file = File::open(path).unwrap();
2771 let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2772
2773 let mut row_count = 0;
2774 for batch in record_reader {
2775 let batch = batch.unwrap();
2776 row_count += batch.num_rows();
2777
2778 let f16_col = batch.column(0).as_primitive::<Float16Type>();
2780 let f16_bss = batch.column(1).as_primitive::<Float16Type>();
2781 assert_eq!(f16_col.len(), f16_bss.len());
2782 f16_col
2783 .iter()
2784 .zip(f16_bss.iter())
2785 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2786
2787 let f32_col = batch.column(2).as_primitive::<Float32Type>();
2789 let f32_bss = batch.column(3).as_primitive::<Float32Type>();
2790 assert_eq!(f32_col.len(), f32_bss.len());
2791 f32_col
2792 .iter()
2793 .zip(f32_bss.iter())
2794 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2795
2796 let f64_col = batch.column(4).as_primitive::<Float64Type>();
2798 let f64_bss = batch.column(5).as_primitive::<Float64Type>();
2799 assert_eq!(f64_col.len(), f64_bss.len());
2800 f64_col
2801 .iter()
2802 .zip(f64_bss.iter())
2803 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2804
2805 let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
2807 let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
2808 assert_eq!(i32_col.len(), i32_bss.len());
2809 i32_col
2810 .iter()
2811 .zip(i32_bss.iter())
2812 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2813
2814 let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
2816 let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
2817 assert_eq!(i64_col.len(), i64_bss.len());
2818 i64_col
2819 .iter()
2820 .zip(i64_bss.iter())
2821 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2822
2823 let flba_col = batch.column(10).as_fixed_size_binary();
2825 let flba_bss = batch.column(11).as_fixed_size_binary();
2826 assert_eq!(flba_col.len(), flba_bss.len());
2827 flba_col
2828 .iter()
2829 .zip(flba_bss.iter())
2830 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2831
2832 let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
2834 let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
2835 assert_eq!(dec_col.len(), dec_bss.len());
2836 dec_col
2837 .iter()
2838 .zip(dec_bss.iter())
2839 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2840 }
2841 assert_eq!(row_count, 200);
2842 }
2843
    #[test]
    fn test_read_incorrect_map_schema_file() {
        // Reads a file whose map column was written with a non-standard
        // schema; the reader is expected to normalize it to the standard
        // "key_value" struct layout asserted below.
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/incorrect_map_schema.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 1);

        // Canonical map schema: non-null "key_value" entries with a required
        // key and an optional value.
        let expected_schema = Schema::new(vec![Field::new(
            "my_map",
            ArrowDataType::Map(
                Arc::new(Field::new(
                    "key_value",
                    ArrowDataType::Struct(Fields::from(vec![
                        Field::new("key", ArrowDataType::Utf8, false),
                        Field::new("value", ArrowDataType::Utf8, true),
                    ])),
                    false,
                )),
                false,
            ),
            true,
        )]);
        assert_eq!(batch.schema().as_ref(), &expected_schema);

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.column(0).null_count(), 0);
        assert_eq!(
            batch.column(0).as_map().keys().as_ref(),
            &StringArray::from(vec!["parent", "name"])
        );
        assert_eq!(
            batch.column(0).as_map().values().as_ref(),
            &StringArray::from(vec!["another", "report"])
        );
    }
2883
2884 #[test]
2885 fn test_read_dict_fixed_size_binary() {
2886 let schema = Arc::new(Schema::new(vec![Field::new(
2887 "a",
2888 ArrowDataType::Dictionary(
2889 Box::new(ArrowDataType::UInt8),
2890 Box::new(ArrowDataType::FixedSizeBinary(8)),
2891 ),
2892 true,
2893 )]));
2894 let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
2895 let values = FixedSizeBinaryArray::try_from_iter(
2896 vec![
2897 (0u8..8u8).collect::<Vec<u8>>(),
2898 (24u8..32u8).collect::<Vec<u8>>(),
2899 ]
2900 .into_iter(),
2901 )
2902 .unwrap();
2903 let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
2904 let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();
2905
2906 let mut buffer = Vec::with_capacity(1024);
2907 let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2908 writer.write(&batch).unwrap();
2909 writer.close().unwrap();
2910 let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2911 .unwrap()
2912 .collect::<Result<Vec<_>, _>>()
2913 .unwrap();
2914
2915 assert_eq!(read.len(), 1);
2916 assert_eq!(&batch, &read[0])
2917 }
2918
    #[test]
    fn test_read_nullable_structs_with_binary_dict_as_first_child_column() {
        // Round-trips a nullable struct whose FIRST child is a dictionary-encoded
        // string column — the case called out by the test name. Row 3 is masked
        // out by the struct-level null buffer.
        let struct_fields = Fields::from(vec![
            Field::new(
                "city",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt8),
                    Box::new(ArrowDataType::Utf8),
                ),
                true,
            ),
            Field::new("name", ArrowDataType::Utf8, true),
        ]);
        let schema = Arc::new(Schema::new(vec![Field::new(
            "items",
            ArrowDataType::Struct(struct_fields.clone()),
            true,
        )]));

        let items_arr = StructArray::new(
            struct_fields,
            vec![
                Arc::new(DictionaryArray::new(
                    UInt8Array::from_iter_values(vec![0, 1, 1, 0, 2]),
                    Arc::new(StringArray::from_iter_values(vec![
                        "quebec",
                        "fredericton",
                        "halifax",
                    ])),
                )),
                Arc::new(StringArray::from_iter_values(vec![
                    "albert", "terry", "lance", "", "tim",
                ])),
            ],
            // Fourth entry (index 3) is null at the struct level; its child
            // values ("" / "quebec") are placeholders.
            Some(NullBuffer::from_iter(vec![true, true, true, false, true])),
        );

        let batch = RecordBatch::try_new(schema, vec![Arc::new(items_arr)]).unwrap();
        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        // Batch size 8 covers all 5 rows, so a single batch comes back.
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(read.len(), 1);
        assert_eq!(&batch, &read[0])
    }
2975
    /// Configuration for a single-column reader round-trip test
    /// (see `single_column_reader_test`).
    #[derive(Clone)]
    struct TestOptions {
        // Number of row groups to write
        num_row_groups: usize,
        // Number of rows in each row group
        num_rows: usize,
        // Batch size used when reading back
        record_batch_size: usize,
        // Percentage of null values (None = column written as REQUIRED)
        null_percent: Option<usize>,
        // Writer batch size
        write_batch_size: usize,
        // Maximum size of a data page in bytes
        max_data_page_size: usize,
        // Maximum size of a dictionary page in bytes
        max_dict_page_size: usize,
        // Parquet writer version to use
        writer_version: WriterVersion,
        // Which statistics the writer records
        enabled_statistics: EnabledStatistics,
        // Column encoding to use
        encoding: Encoding,
        // Optional row selection plus the total number of selected rows
        row_selections: Option<(RowSelection, usize)>,
        // Optional per-row keep/drop mask applied via a RowFilter
        row_filter: Option<Vec<bool>>,
        // Optional limit applied after selection/filter
        limit: Option<usize>,
        // Optional offset applied after selection/filter
        offset: Option<usize>,
    }
3012
    impl std::fmt::Debug for TestOptions {
        // Manual impl rather than #[derive(Debug)] so the potentially large
        // `row_selections` / `row_filter` contents are summarised as a
        // presence flag instead of being dumped in full in test output.
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("TestOptions")
                .field("num_row_groups", &self.num_row_groups)
                .field("num_rows", &self.num_rows)
                .field("record_batch_size", &self.record_batch_size)
                .field("null_percent", &self.null_percent)
                .field("write_batch_size", &self.write_batch_size)
                .field("max_data_page_size", &self.max_data_page_size)
                .field("max_dict_page_size", &self.max_dict_page_size)
                .field("writer_version", &self.writer_version)
                .field("enabled_statistics", &self.enabled_statistics)
                .field("encoding", &self.encoding)
                .field("row_selections", &self.row_selections.is_some())
                .field("row_filter", &self.row_filter.is_some())
                .field("limit", &self.limit)
                .field("offset", &self.offset)
                .finish()
        }
    }
3034
    impl Default for TestOptions {
        fn default() -> Self {
            // Baseline configuration: two modest row groups, generous page
            // limits, PLAIN encoding, page-level statistics, and no
            // selection/filter/limit/offset applied.
            Self {
                num_row_groups: 2,
                num_rows: 100,
                record_batch_size: 15,
                null_percent: None,
                write_batch_size: 64,
                max_data_page_size: 1024 * 1024,
                max_dict_page_size: 1024 * 1024,
                writer_version: WriterVersion::PARQUET_1_0,
                enabled_statistics: EnabledStatistics::Page,
                encoding: Encoding::PLAIN,
                row_selections: None,
                row_filter: None,
                limit: None,
                offset: None,
            }
        }
    }
3055
3056 impl TestOptions {
3057 fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
3058 Self {
3059 num_row_groups,
3060 num_rows,
3061 record_batch_size,
3062 ..Default::default()
3063 }
3064 }
3065
3066 fn with_null_percent(self, null_percent: usize) -> Self {
3067 Self {
3068 null_percent: Some(null_percent),
3069 ..self
3070 }
3071 }
3072
3073 fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
3074 Self {
3075 max_data_page_size,
3076 ..self
3077 }
3078 }
3079
3080 fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
3081 Self {
3082 max_dict_page_size,
3083 ..self
3084 }
3085 }
3086
3087 fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
3088 Self {
3089 enabled_statistics,
3090 ..self
3091 }
3092 }
3093
3094 fn with_row_selections(self) -> Self {
3095 assert!(self.row_filter.is_none(), "Must set row selection first");
3096
3097 let mut rng = rng();
3098 let step = rng.random_range(self.record_batch_size..self.num_rows);
3099 let row_selections = create_test_selection(
3100 step,
3101 self.num_row_groups * self.num_rows,
3102 rng.random::<bool>(),
3103 );
3104 Self {
3105 row_selections: Some(row_selections),
3106 ..self
3107 }
3108 }
3109
3110 fn with_row_filter(self) -> Self {
3111 let row_count = match &self.row_selections {
3112 Some((_, count)) => *count,
3113 None => self.num_row_groups * self.num_rows,
3114 };
3115
3116 let mut rng = rng();
3117 Self {
3118 row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
3119 ..self
3120 }
3121 }
3122
3123 fn with_limit(self, limit: usize) -> Self {
3124 Self {
3125 limit: Some(limit),
3126 ..self
3127 }
3128 }
3129
3130 fn with_offset(self, offset: usize) -> Self {
3131 Self {
3132 offset: Some(offset),
3133 ..self
3134 }
3135 }
3136
3137 fn writer_props(&self) -> WriterProperties {
3138 let builder = WriterProperties::builder()
3139 .set_data_page_size_limit(self.max_data_page_size)
3140 .set_write_batch_size(self.write_batch_size)
3141 .set_writer_version(self.writer_version)
3142 .set_statistics_enabled(self.enabled_statistics);
3143
3144 let builder = match self.encoding {
3145 Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
3146 .set_dictionary_enabled(true)
3147 .set_dictionary_page_size_limit(self.max_dict_page_size),
3148 _ => builder
3149 .set_dictionary_enabled(false)
3150 .set_encoding(self.encoding),
3151 };
3152
3153 builder.build()
3154 }
3155 }
3156
    /// Runs `single_column_reader_test` for every combination of the option
    /// matrix below, both writer versions, and each entry in `encodings`.
    ///
    /// `rand_max` bounds the generated values (and doubles as the length for
    /// fixed-length physical types); `converter` turns the expected values
    /// into the Arrow array to compare against.
    fn run_single_column_reader_tests<T, F, G>(
        rand_max: i32,
        converted_type: ConvertedType,
        arrow_type: Option<ArrowDataType>,
        converter: F,
        encodings: &[Encoding],
    ) where
        T: DataType,
        G: RandGen<T>,
        F: Fn(&[Option<T::T>]) -> ArrayRef,
    {
        let all_options = vec![
            // Basic required columns with various row-group/batch shapes
            TestOptions::new(2, 100, 15),
            TestOptions::new(3, 25, 5),
            TestOptions::new(4, 100, 25),
            // Small page limits to force many pages per row group
            TestOptions::new(3, 256, 73).with_max_data_page_size(128),
            TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
            // Nullable columns at 0%, 25% (and later 100%) null density
            TestOptions::new(2, 256, 127).with_null_percent(0),
            TestOptions::new(2, 256, 93).with_null_percent(25),
            // Limits at, below, and beyond the total row count
            TestOptions::new(4, 100, 25).with_limit(0),
            TestOptions::new(4, 100, 25).with_limit(50),
            TestOptions::new(4, 100, 25).with_limit(10),
            TestOptions::new(4, 100, 25).with_limit(101),
            // Offset combined with limit, including spans past the end
            TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
            TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
            TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
            // Reduced statistics levels
            TestOptions::new(2, 256, 91)
                .with_null_percent(25)
                .with_enabled_statistics(EnabledStatistics::Chunk),
            TestOptions::new(2, 256, 91)
                .with_null_percent(25)
                .with_enabled_statistics(EnabledStatistics::None),
            TestOptions::new(2, 128, 91)
                .with_null_percent(100)
                .with_enabled_statistics(EnabledStatistics::None),
            // Row selections over the shapes above
            TestOptions::new(2, 100, 15).with_row_selections(),
            TestOptions::new(3, 25, 5).with_row_selections(),
            TestOptions::new(4, 100, 25).with_row_selections(),
            TestOptions::new(3, 256, 73)
                .with_max_data_page_size(128)
                .with_row_selections(),
            TestOptions::new(3, 256, 57)
                .with_max_dict_page_size(128)
                .with_row_selections(),
            TestOptions::new(2, 256, 127)
                .with_null_percent(0)
                .with_row_selections(),
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_row_selections(),
            // Selections combined with limit/offset
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_row_selections()
                .with_limit(10),
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_row_selections()
                .with_offset(20)
                .with_limit(10),
            // Row filters, alone and combined with selections / small pages
            TestOptions::new(4, 100, 25).with_row_filter(),
            TestOptions::new(4, 100, 25)
                .with_row_selections()
                .with_row_filter(),
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_max_data_page_size(10)
                .with_row_filter(),
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_max_data_page_size(10)
                .with_row_selections()
                .with_row_filter(),
            // Selections without any statistics to fall back on
            TestOptions::new(2, 256, 93)
                .with_enabled_statistics(EnabledStatistics::None)
                .with_max_data_page_size(10)
                .with_row_selections(),
        ];

        all_options.into_iter().for_each(|opts| {
            for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
                for encoding in encodings {
                    let opts = TestOptions {
                        writer_version,
                        encoding: *encoding,
                        ..opts.clone()
                    };

                    single_column_reader_test::<T, _, G>(
                        opts,
                        rand_max,
                        converted_type,
                        arrow_type.clone(),
                        &converter,
                    )
                }
            }
        });
    }
3308
3309 fn single_column_reader_test<T, F, G>(
3313 opts: TestOptions,
3314 rand_max: i32,
3315 converted_type: ConvertedType,
3316 arrow_type: Option<ArrowDataType>,
3317 converter: F,
3318 ) where
3319 T: DataType,
3320 G: RandGen<T>,
3321 F: Fn(&[Option<T::T>]) -> ArrayRef,
3322 {
3323 println!(
3325 "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
3326 T::get_physical_type(),
3327 converted_type,
3328 arrow_type,
3329 opts
3330 );
3331
3332 let (repetition, def_levels) = match opts.null_percent.as_ref() {
3334 Some(null_percent) => {
3335 let mut rng = rng();
3336
3337 let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
3338 .map(|_| {
3339 std::iter::from_fn(|| {
3340 Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
3341 })
3342 .take(opts.num_rows)
3343 .collect()
3344 })
3345 .collect();
3346 (Repetition::OPTIONAL, Some(def_levels))
3347 }
3348 None => (Repetition::REQUIRED, None),
3349 };
3350
3351 let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
3353 .map(|idx| {
3354 let null_count = match def_levels.as_ref() {
3355 Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
3356 None => 0,
3357 };
3358 G::gen_vec(rand_max, opts.num_rows - null_count)
3359 })
3360 .collect();
3361
3362 let len = match T::get_physical_type() {
3363 crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
3364 crate::basic::Type::INT96 => 12,
3365 _ => -1,
3366 };
3367
3368 let fields = vec![Arc::new(
3369 Type::primitive_type_builder("leaf", T::get_physical_type())
3370 .with_repetition(repetition)
3371 .with_converted_type(converted_type)
3372 .with_length(len)
3373 .build()
3374 .unwrap(),
3375 )];
3376
3377 let schema = Arc::new(
3378 Type::group_type_builder("test_schema")
3379 .with_fields(fields)
3380 .build()
3381 .unwrap(),
3382 );
3383
3384 let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
3385
3386 let mut file = tempfile::tempfile().unwrap();
3387
3388 generate_single_column_file_with_data::<T>(
3389 &values,
3390 def_levels.as_ref(),
3391 file.try_clone().unwrap(), schema,
3393 arrow_field,
3394 &opts,
3395 )
3396 .unwrap();
3397
3398 file.rewind().unwrap();
3399
3400 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::from(
3401 opts.enabled_statistics == EnabledStatistics::Page,
3402 ));
3403
3404 let mut builder =
3405 ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3406
3407 let expected_data = match opts.row_selections {
3408 Some((selections, row_count)) => {
3409 let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
3410
3411 let mut skip_data: Vec<Option<T::T>> = vec![];
3412 let dequeue: VecDeque<RowSelector> = selections.clone().into();
3413 for select in dequeue {
3414 if select.skip {
3415 without_skip_data.drain(0..select.row_count);
3416 } else {
3417 skip_data.extend(without_skip_data.drain(0..select.row_count));
3418 }
3419 }
3420 builder = builder.with_row_selection(selections);
3421
3422 assert_eq!(skip_data.len(), row_count);
3423 skip_data
3424 }
3425 None => {
3426 let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
3428 assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
3429 expected_data
3430 }
3431 };
3432
3433 let mut expected_data = match opts.row_filter {
3434 Some(filter) => {
3435 let expected_data = expected_data
3436 .into_iter()
3437 .zip(filter.iter())
3438 .filter_map(|(d, f)| f.then(|| d))
3439 .collect();
3440
3441 let mut filter_offset = 0;
3442 let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
3443 ProjectionMask::all(),
3444 move |b| {
3445 let array = BooleanArray::from_iter(
3446 filter
3447 .iter()
3448 .skip(filter_offset)
3449 .take(b.num_rows())
3450 .map(|x| Some(*x)),
3451 );
3452 filter_offset += b.num_rows();
3453 Ok(array)
3454 },
3455 ))]);
3456
3457 builder = builder.with_row_filter(filter);
3458 expected_data
3459 }
3460 None => expected_data,
3461 };
3462
3463 if let Some(offset) = opts.offset {
3464 builder = builder.with_offset(offset);
3465 expected_data = expected_data.into_iter().skip(offset).collect();
3466 }
3467
3468 if let Some(limit) = opts.limit {
3469 builder = builder.with_limit(limit);
3470 expected_data = expected_data.into_iter().take(limit).collect();
3471 }
3472
3473 let mut record_reader = builder
3474 .with_batch_size(opts.record_batch_size)
3475 .build()
3476 .unwrap();
3477
3478 let mut total_read = 0;
3479 loop {
3480 let maybe_batch = record_reader.next();
3481 if total_read < expected_data.len() {
3482 let end = min(total_read + opts.record_batch_size, expected_data.len());
3483 let batch = maybe_batch.unwrap().unwrap();
3484 assert_eq!(end - total_read, batch.num_rows());
3485
3486 let a = converter(&expected_data[total_read..end]);
3487 let b = batch.column(0);
3488
3489 assert_eq!(a.data_type(), b.data_type());
3490 assert_eq!(a.to_data(), b.to_data());
3491 assert_eq!(
3492 a.as_any().type_id(),
3493 b.as_any().type_id(),
3494 "incorrect type ids"
3495 );
3496
3497 total_read = end;
3498 } else {
3499 assert!(maybe_batch.is_none());
3500 break;
3501 }
3502 }
3503 }
3504
3505 fn gen_expected_data<T: DataType>(
3506 def_levels: Option<&Vec<Vec<i16>>>,
3507 values: &[Vec<T::T>],
3508 ) -> Vec<Option<T::T>> {
3509 let data: Vec<Option<T::T>> = match def_levels {
3510 Some(levels) => {
3511 let mut values_iter = values.iter().flatten();
3512 levels
3513 .iter()
3514 .flatten()
3515 .map(|d| match d {
3516 1 => Some(values_iter.next().cloned().unwrap()),
3517 0 => None,
3518 _ => unreachable!(),
3519 })
3520 .collect()
3521 }
3522 None => values.iter().flatten().cloned().map(Some).collect(),
3523 };
3524 data
3525 }
3526
3527 fn generate_single_column_file_with_data<T: DataType>(
3528 values: &[Vec<T::T>],
3529 def_levels: Option<&Vec<Vec<i16>>>,
3530 file: File,
3531 schema: TypePtr,
3532 field: Option<Field>,
3533 opts: &TestOptions,
3534 ) -> Result<ParquetMetaData> {
3535 let mut writer_props = opts.writer_props();
3536 if let Some(field) = field {
3537 let arrow_schema = Schema::new(vec![field]);
3538 add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
3539 }
3540
3541 let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
3542
3543 for (idx, v) in values.iter().enumerate() {
3544 let def_levels = def_levels.map(|d| d[idx].as_slice());
3545 let mut row_group_writer = writer.next_row_group()?;
3546 {
3547 let mut column_writer = row_group_writer
3548 .next_column()?
3549 .expect("Column writer is none!");
3550
3551 column_writer
3552 .typed::<T>()
3553 .write_batch(v, def_levels, None)?;
3554
3555 column_writer.close()?;
3556 }
3557 row_group_writer.close()?;
3558 }
3559
3560 writer.close()
3561 }
3562
3563 fn get_test_file(file_name: &str) -> File {
3564 let mut path = PathBuf::new();
3565 path.push(arrow::util::test_util::arrow_test_data());
3566 path.push(file_name);
3567
3568 File::open(path.as_path()).expect("File not found!")
3569 }
3570
    #[test]
    fn test_read_structs() {
        // Exercises nested-struct decoding: first decode the full file, then
        // re-read it with a leaf-index projection and check the projected schema.
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_structs.rust.parquet");
        let file = File::open(&path).unwrap();
        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

        for batch in record_batch_reader {
            batch.unwrap();
        }

        let file = File::open(&path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

        // Project three leaves by index; per the expected schema below they
        // resolve to roll_num.count, PC_CUR.mean and PC_CUR.sum.
        let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
        let projected_reader = builder
            .with_projection(mask)
            .with_batch_size(60)
            .build()
            .unwrap();

        let expected_schema = Schema::new(vec![
            Field::new(
                "roll_num",
                ArrowDataType::Struct(Fields::from(vec![Field::new(
                    "count",
                    ArrowDataType::UInt64,
                    false,
                )])),
                false,
            ),
            Field::new(
                "PC_CUR",
                ArrowDataType::Struct(Fields::from(vec![
                    Field::new("mean", ArrowDataType::Int64, false),
                    Field::new("sum", ArrowDataType::Int64, false),
                ])),
                false,
            ),
        ]);

        // The projected schema must hold both before and for every batch read.
        assert_eq!(&expected_schema, projected_reader.schema().as_ref());

        for batch in projected_reader {
            let batch = batch.unwrap();
            assert_eq!(batch.schema().as_ref(), &expected_schema);
        }
    }
3623
    #[test]
    fn test_read_structs_by_name() {
        // Same as test_read_structs, but selects the projected leaves by
        // dotted column path instead of leaf index.
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_structs.rust.parquet");
        let file = File::open(&path).unwrap();
        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

        for batch in record_batch_reader {
            batch.unwrap();
        }

        let file = File::open(&path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

        // Name-based projection of the same three leaves as the index-based test.
        let mask = ProjectionMask::columns(
            builder.parquet_schema(),
            ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
        );
        let projected_reader = builder
            .with_projection(mask)
            .with_batch_size(60)
            .build()
            .unwrap();

        let expected_schema = Schema::new(vec![
            Field::new(
                "roll_num",
                ArrowDataType::Struct(Fields::from(vec![Field::new(
                    "count",
                    ArrowDataType::UInt64,
                    false,
                )])),
                false,
            ),
            Field::new(
                "PC_CUR",
                ArrowDataType::Struct(Fields::from(vec![
                    Field::new("mean", ArrowDataType::Int64, false),
                    Field::new("sum", ArrowDataType::Int64, false),
                ])),
                false,
            ),
        ]);

        // The projected schema must hold both before and for every batch read.
        assert_eq!(&expected_schema, projected_reader.schema().as_ref());

        for batch in projected_reader {
            let batch = batch.unwrap();
            assert_eq!(batch.schema().as_ref(), &expected_schema);
        }
    }
3676
3677 #[test]
3678 fn test_read_maps() {
3679 let testdata = arrow::util::test_util::parquet_test_data();
3680 let path = format!("{testdata}/nested_maps.snappy.parquet");
3681 let file = File::open(path).unwrap();
3682 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3683
3684 for batch in record_batch_reader {
3685 batch.unwrap();
3686 }
3687 }
3688
    #[test]
    fn test_nested_nullability() {
        // An optional group with a required leaf: definition level 0 means
        // the group itself is null, level 1 means the leaf value is present.
        let message_type = "message nested {
        OPTIONAL Group group {
            REQUIRED INT32 leaf;
        }
    }";

        let file = tempfile::tempfile().unwrap();
        let schema = Arc::new(parse_message_type(message_type).unwrap());

        {
            // Write 4 rows; def levels [0, 1, 0, 1] make rows 0 and 2 null
            // groups, with values 34 and 76 landing in rows 1 and 3.
            let mut writer =
                SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
                    .unwrap();

            {
                let mut row_group_writer = writer.next_row_group().unwrap();
                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();

                column_writer
                    .typed::<Int32Type>()
                    .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
                    .unwrap();

                column_writer.close().unwrap();
                row_group_writer.close().unwrap();
            }

            writer.close().unwrap();
        }

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);

        let reader = builder.with_projection(mask).build().unwrap();

        // The group maps to a nullable struct with a non-null leaf.
        let expected_schema = Schema::new(vec![Field::new(
            "group",
            ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
            true,
        )]);

        let batch = reader.into_iter().next().unwrap().unwrap();
        assert_eq!(batch.schema().as_ref(), &expected_schema);
        assert_eq!(batch.num_rows(), 4);
        // Two of the four struct rows (def level 0) must come back null.
        assert_eq!(batch.column(0).null_count(), 2);
    }
3738
    #[test]
    fn test_dictionary_preservation() {
        // Verifies that consecutive batches decoded from the same dictionary
        // page share the same dictionary values buffer, and that crossing a
        // row-group (dictionary) boundary produces a new one.
        let fields = vec![Arc::new(
            Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
                .with_repetition(Repetition::OPTIONAL)
                .with_converted_type(ConvertedType::UTF8)
                .build()
                .unwrap(),
        )];

        let schema = Arc::new(
            Type::group_type_builder("test_schema")
                .with_fields(fields)
                .build()
                .unwrap(),
        );

        let dict_type = ArrowDataType::Dictionary(
            Box::new(ArrowDataType::Int32),
            Box::new(ArrowDataType::Utf8),
        );

        let arrow_field = Field::new("leaf", dict_type, true);

        let mut file = tempfile::tempfile().unwrap();

        // Two row groups: 4 values over 8 rows, then 3 values over 9 rows
        // (see the definition levels below).
        let values = vec![
            vec![
                ByteArray::from("hello"),
                ByteArray::from("a"),
                ByteArray::from("b"),
                ByteArray::from("d"),
            ],
            vec![
                ByteArray::from("c"),
                ByteArray::from("a"),
                ByteArray::from("b"),
            ],
        ];

        let def_levels = vec![
            vec![1, 0, 0, 1, 0, 0, 1, 1],
            vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
        ];

        let opts = TestOptions {
            encoding: Encoding::RLE_DICTIONARY,
            ..Default::default()
        };

        generate_single_column_file_with_data::<ByteArrayType>(
            &values,
            Some(&def_levels),
            file.try_clone().unwrap(),
            schema,
            Some(arrow_field),
            &opts,
        )
        .unwrap();

        file.rewind().unwrap();

        let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();

        let batches = record_reader
            .collect::<Result<Vec<RecordBatch>, _>>()
            .unwrap();

        // 17 rows read 3 at a time -> 6 batches.
        assert_eq!(batches.len(), 6);
        assert!(batches.iter().all(|x| x.num_columns() == 1));

        let row_counts = batches
            .iter()
            .map(|x| (x.num_rows(), x.column(0).null_count()))
            .collect::<Vec<_>>();

        assert_eq!(
            row_counts,
            vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
        );

        // Extract the dictionary values buffer backing each batch's column.
        let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();

        // Batches 0 and 1 lie entirely within row group 1 -> shared dictionary.
        assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
        // Batch 2 straddles the row-group boundary (rows 6..9), so it cannot
        // share a dictionary with either neighbour.
        assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
        assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
        // Batches 3-5 lie entirely within row group 2 -> shared dictionary.
        assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
        assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
    }
3831
3832 #[test]
3833 fn test_read_null_list() {
3834 let testdata = arrow::util::test_util::parquet_test_data();
3835 let path = format!("{testdata}/null_list.parquet");
3836 let file = File::open(path).unwrap();
3837 let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3838
3839 let batch = record_batch_reader.next().unwrap().unwrap();
3840 assert_eq!(batch.num_rows(), 1);
3841 assert_eq!(batch.num_columns(), 1);
3842 assert_eq!(batch.column(0).len(), 1);
3843
3844 let list = batch
3845 .column(0)
3846 .as_any()
3847 .downcast_ref::<ListArray>()
3848 .unwrap();
3849 assert_eq!(list.len(), 1);
3850 assert!(list.is_valid(0));
3851
3852 let val = list.value(0);
3853 assert_eq!(val.len(), 0);
3854 }
3855
3856 #[test]
3857 fn test_null_schema_inference() {
3858 let testdata = arrow::util::test_util::parquet_test_data();
3859 let path = format!("{testdata}/null_list.parquet");
3860 let file = File::open(path).unwrap();
3861
3862 let arrow_field = Field::new(
3863 "emptylist",
3864 ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
3865 true,
3866 );
3867
3868 let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3869 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3870 let schema = builder.schema();
3871 assert_eq!(schema.fields().len(), 1);
3872 assert_eq!(schema.field(0), &arrow_field);
3873 }
3874
    #[test]
    fn test_skip_metadata() {
        // Field-level metadata is round-tripped via the embedded Arrow schema;
        // with `skip_arrow_metadata` the reader must fall back to the schema
        // inferred from the Parquet types, which carries no field metadata.
        let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
        let field = Field::new("col", col.data_type().clone(), true);

        let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));

        let metadata = [("key".to_string(), "value".to_string())]
            .into_iter()
            .collect();

        let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));

        assert_ne!(schema_with_metadata, schema_without_metadata);

        let batch =
            RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();

        // Helper producing a temp file written with the given writer version.
        let file = |version: WriterVersion| {
            let props = WriterProperties::builder()
                .set_writer_version(version)
                .build();

            let file = tempfile().unwrap();
            let mut writer =
                ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
                    .unwrap();
            writer.write(&batch).unwrap();
            writer.close().unwrap();
            file
        };

        let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);

        // Check both writer versions behave identically.
        let v1_reader = file(WriterVersion::PARQUET_1_0);
        let v2_reader = file(WriterVersion::PARQUET_2_0);

        // Default read preserves the embedded schema, metadata included.
        let arrow_reader =
            ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
        assert_eq!(arrow_reader.schema(), schema_with_metadata);

        // Skipping the embedded schema drops the field metadata.
        let reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
                .unwrap()
                .build()
                .unwrap();
        assert_eq!(reader.schema(), schema_without_metadata);

        let arrow_reader =
            ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
        assert_eq!(arrow_reader.schema(), schema_with_metadata);

        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
            .unwrap()
            .build()
            .unwrap();
        assert_eq!(reader.schema(), schema_without_metadata);
    }
3933
3934 fn write_parquet_from_iter<I, F>(value: I) -> File
3935 where
3936 I: IntoIterator<Item = (F, ArrayRef)>,
3937 F: AsRef<str>,
3938 {
3939 let batch = RecordBatch::try_from_iter(value).unwrap();
3940 let file = tempfile().unwrap();
3941 let mut writer =
3942 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
3943 writer.write(&batch).unwrap();
3944 writer.close().unwrap();
3945 file
3946 }
3947
3948 fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
3949 where
3950 I: IntoIterator<Item = (F, ArrayRef)>,
3951 F: AsRef<str>,
3952 {
3953 let file = write_parquet_from_iter(value);
3954 let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
3955 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3956 file.try_clone().unwrap(),
3957 options_with_schema,
3958 );
3959 assert_eq!(builder.err().unwrap().to_string(), expected_error);
3960 }
3961
3962 #[test]
3963 fn test_schema_too_few_columns() {
3964 run_schema_test_with_error(
3965 vec![
3966 ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3967 ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3968 ],
3969 Arc::new(Schema::new(vec![Field::new(
3970 "int64",
3971 ArrowDataType::Int64,
3972 false,
3973 )])),
3974 "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
3975 );
3976 }
3977
3978 #[test]
3979 fn test_schema_too_many_columns() {
3980 run_schema_test_with_error(
3981 vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3982 Arc::new(Schema::new(vec![
3983 Field::new("int64", ArrowDataType::Int64, false),
3984 Field::new("int32", ArrowDataType::Int32, false),
3985 ])),
3986 "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
3987 );
3988 }
3989
3990 #[test]
3991 fn test_schema_mismatched_column_names() {
3992 run_schema_test_with_error(
3993 vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3994 Arc::new(Schema::new(vec![Field::new(
3995 "other",
3996 ArrowDataType::Int64,
3997 false,
3998 )])),
3999 "Arrow: incompatible arrow schema, expected field named int64 got other",
4000 );
4001 }
4002
    #[test]
    fn test_schema_incompatible_columns() {
        // Two of the three columns have incompatible requested types; the
        // error must list both mismatches and omit the compatible column.
        run_schema_test_with_error(
            vec![
                (
                    "col1_invalid",
                    Arc::new(Int64Array::from(vec![0])) as ArrayRef,
                ),
                (
                    "col2_valid",
                    Arc::new(Int32Array::from(vec![0])) as ArrayRef,
                ),
                (
                    "col3_invalid",
                    Arc::new(Date64Array::from(vec![0])) as ArrayRef,
                ),
            ],
            Arc::new(Schema::new(vec![
                Field::new("col1_invalid", ArrowDataType::Int32, false),
                Field::new("col2_valid", ArrowDataType::Int32, false),
                Field::new("col3_invalid", ArrowDataType::Int32, false),
            ])),
            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
        );
    }
4028
    #[test]
    fn test_one_incompatible_nested_column() {
        // A type mismatch inside a nested struct must surface as a mismatch
        // on the struct field, with both nested layouts spelled out.
        let nested_fields = Fields::from(vec![
            Field::new("nested1_valid", ArrowDataType::Utf8, false),
            Field::new("nested1_invalid", ArrowDataType::Int64, false),
        ]);
        let nested = StructArray::try_new(
            nested_fields,
            vec![
                Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
            ],
            None,
        )
        .expect("struct array");
        // The supplied schema requests Int32 where the file has Int64.
        let supplied_nested_fields = Fields::from(vec![
            Field::new("nested1_valid", ArrowDataType::Utf8, false),
            Field::new("nested1_invalid", ArrowDataType::Int32, false),
        ]);
        run_schema_test_with_error(
            vec![
                ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
                ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
                ("nested", Arc::new(nested) as ArrayRef),
            ],
            Arc::new(Schema::new(vec![
                Field::new("col1", ArrowDataType::Int64, false),
                Field::new("col2", ArrowDataType::Int32, false),
                Field::new(
                    "nested",
                    ArrowDataType::Struct(supplied_nested_fields),
                    false,
                ),
            ])),
            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
            requested Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int32) \
            but found Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int64)",
        );
    }
4068
4069 fn utf8_parquet() -> Bytes {
4071 let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
4072 let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
4073 let props = None;
4074 let mut parquet_data = vec![];
4076 let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
4077 writer.write(&batch).unwrap();
4078 writer.close().unwrap();
4079 Bytes::from(parquet_data)
4080 }
4081
4082 #[test]
4083 fn test_schema_error_bad_types() {
4084 let parquet_data = utf8_parquet();
4086
4087 let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
4089 "column1",
4090 arrow::datatypes::DataType::Int32,
4091 false,
4092 )]));
4093
4094 let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
4096 let err =
4097 ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
4098 .unwrap_err();
4099 assert_eq!(
4100 err.to_string(),
4101 "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8"
4102 )
4103 }
4104
4105 #[test]
4106 fn test_schema_error_bad_nullability() {
4107 let parquet_data = utf8_parquet();
4109
4110 let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
4112 "column1",
4113 arrow::datatypes::DataType::Utf8,
4114 true,
4115 )]));
4116
4117 let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
4119 let err =
4120 ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
4121 .unwrap_err();
4122 assert_eq!(
4123 err.to_string(),
4124 "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false"
4125 )
4126 }
4127
    #[test]
    fn test_read_binary_as_utf8() {
        // A supplied schema may reinterpret parquet binary columns as the
        // corresponding utf8 arrow types (Utf8 / LargeUtf8 / Utf8View) when
        // the stored bytes are valid UTF-8.
        let file = write_parquet_from_iter(vec![
            (
                "binary_to_utf8",
                Arc::new(BinaryArray::from(vec![
                    b"one".as_ref(),
                    b"two".as_ref(),
                    b"three".as_ref(),
                ])) as ArrayRef,
            ),
            (
                "large_binary_to_large_utf8",
                Arc::new(LargeBinaryArray::from(vec![
                    b"one".as_ref(),
                    b"two".as_ref(),
                    b"three".as_ref(),
                ])) as ArrayRef,
            ),
            (
                "binary_view_to_utf8_view",
                Arc::new(BinaryViewArray::from(vec![
                    b"one".as_ref(),
                    b"two".as_ref(),
                    b"three".as_ref(),
                ])) as ArrayRef,
            ),
        ]);
        // Request the string counterpart of each written binary type
        let supplied_fields = Fields::from(vec![
            Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
            Field::new(
                "large_binary_to_large_utf8",
                ArrowDataType::LargeUtf8,
                false,
            ),
            Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
        ]);

        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");

        // Each column must decode to the original text values
        let batch = arrow_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(
            batch
                .column(0)
                .as_string::<i32>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some("one"), Some("two"), Some("three")]
        );

        assert_eq!(
            batch
                .column(1)
                .as_string::<i64>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some("one"), Some("two"), Some("three")]
        );

        assert_eq!(
            batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
            vec![Some("one"), Some("two"), Some("three")]
        );
    }
4201
    #[test]
    #[should_panic(expected = "Invalid UTF8 sequence at")]
    fn test_read_non_utf8_binary_as_utf8() {
        // Reading binary data that is not valid UTF-8 through a supplied
        // Utf8 schema must fail during decoding.
        // NOTE(review): `should_panic` expects the failure to surface as a
        // panic while decoding inside `next()`, yet the trailing
        // `unwrap_err()` implies it could arrive as a returned `Err` instead
        // — confirm which path actually fires; a returned error alone would
        // make `should_panic` report "test did not panic".
        let file = write_parquet_from_iter(vec![(
            "non_utf8_binary",
            Arc::new(BinaryArray::from(vec![
                b"\xDE\x00\xFF".as_ref(),
                b"\xDE\x01\xAA".as_ref(),
                b"\xDE\x02\xFF".as_ref(),
            ])) as ArrayRef,
        )]);
        let supplied_fields = Fields::from(vec![Field::new(
            "non_utf8_binary",
            ArrowDataType::Utf8,
            false,
        )]);

        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");
        arrow_reader.next().unwrap().unwrap_err();
    }
4229
    #[test]
    fn test_with_schema() {
        // End-to-end check that a supplied schema can coerce columns while
        // reading: int32 -> timestamp(s, tz), date32 -> date64, and inside a
        // struct: utf8 -> dictionary and int64 -> timestamp(ns, tz).
        let nested_fields = Fields::from(vec![
            Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
            Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
        ]);

        let nested_arrays: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
        ];

        let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();

        let file = write_parquet_from_iter(vec![
            (
                "int32_to_ts_second",
                Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
            ),
            (
                "date32_to_date64",
                Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
            ),
            ("nested", Arc::new(nested) as ArrayRef),
        ]);

        // The schema handed to the reader: same field names, coerced types
        let supplied_nested_fields = Fields::from(vec![
            Field::new(
                "utf8_to_dict",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::Int32),
                    Box::new(ArrowDataType::Utf8),
                ),
                false,
            ),
            Field::new(
                "int64_to_ts_nano",
                ArrowDataType::Timestamp(
                    arrow::datatypes::TimeUnit::Nanosecond,
                    Some("+10:00".into()),
                ),
                false,
            ),
        ]);

        let supplied_schema = Arc::new(Schema::new(vec![
            Field::new(
                "int32_to_ts_second",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
                false,
            ),
            Field::new("date32_to_date64", ArrowDataType::Date64, false),
            Field::new(
                "nested",
                ArrowDataType::Struct(supplied_nested_fields),
                false,
            ),
        ]));

        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");

        // The reader must report exactly the supplied schema
        assert_eq!(arrow_reader.schema(), supplied_schema);
        let batch = arrow_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 4);
        // int32 value 0 read as seconds-since-epoch in +01:00
        assert_eq!(
            batch
                .column(0)
                .as_any()
                .downcast_ref::<TimestampSecondArray>()
                .expect("downcast to timestamp second")
                .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
                .map(|v| v.to_string())
                .expect("value as datetime"),
            "1970-01-01 01:00:00 +01:00"
        );
        // date32 day 0 widened to date64
        assert_eq!(
            batch
                .column(1)
                .as_any()
                .downcast_ref::<Date64Array>()
                .expect("downcast to date64")
                .value_as_date(0)
                .map(|v| v.to_string())
                .expect("value as date"),
            "1970-01-01"
        );

        let nested = batch
            .column(2)
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("downcast to struct");

        let nested_dict = nested
            .column(0)
            .as_any()
            .downcast_ref::<Int32DictionaryArray>()
            .expect("downcast to dictionary");

        // "a","a","a","b" dictionary-encodes to values ["a","b"] ...
        assert_eq!(
            nested_dict
                .values()
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("downcast to string")
                .iter()
                .collect::<Vec<_>>(),
            vec![Some("a"), Some("b")]
        );

        // ... with keys [0, 0, 0, 1]
        assert_eq!(
            nested_dict.keys().iter().collect::<Vec<_>>(),
            vec![Some(0), Some(0), Some(0), Some(1)]
        );

        // int64 value 1 read as nanoseconds-since-epoch in +10:00
        assert_eq!(
            nested
                .column(1)
                .as_any()
                .downcast_ref::<TimestampNanosecondArray>()
                .expect("downcast to timestamp nanosecond")
                .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
                .map(|v| v.to_string())
                .expect("value as datetime"),
            "1970-01-01 10:00:00.000000001 +10:00"
        );
    }
4365
4366 #[test]
4367 fn test_empty_projection() {
4368 let testdata = arrow::util::test_util::parquet_test_data();
4369 let path = format!("{testdata}/alltypes_plain.parquet");
4370 let file = File::open(path).unwrap();
4371
4372 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4373 let file_metadata = builder.metadata().file_metadata();
4374 let expected_rows = file_metadata.num_rows() as usize;
4375
4376 let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
4377 let batch_reader = builder
4378 .with_projection(mask)
4379 .with_batch_size(2)
4380 .build()
4381 .unwrap();
4382
4383 let mut total_rows = 0;
4384 for maybe_batch in batch_reader {
4385 let batch = maybe_batch.unwrap();
4386 total_rows += batch.num_rows();
4387 assert_eq!(batch.num_columns(), 0);
4388 assert!(batch.num_rows() <= 2);
4389 }
4390
4391 assert_eq!(total_rows, expected_rows);
4392 }
4393
    /// Writes two record batches of `batch_size` empty-list rows into a file
    /// whose row groups are capped at `row_group_size` rows, then checks the
    /// reader still yields two full `batch_size` batches even when batches
    /// span row-group boundaries.
    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "list",
            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
            true,
        )]));

        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(
            &mut buf,
            schema.clone(),
            Some(
                WriterProperties::builder()
                    .set_max_row_group_row_count(Some(row_group_size))
                    .build(),
            ),
        )
        .unwrap();
        for _ in 0..2 {
            // Each row is an empty (but non-null) list
            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
            for _ in 0..(batch_size) {
                list_builder.append(true);
            }
            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
                .unwrap();
            writer.write(&batch).unwrap();
        }
        writer.close().unwrap();

        // Both reads must return exactly batch_size rows
        let mut record_reader =
            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
        assert_eq!(
            batch_size,
            record_reader.next().unwrap().unwrap().num_rows()
        );
        assert_eq!(
            batch_size,
            record_reader.next().unwrap().unwrap().num_rows()
        );
    }
4435
4436 #[test]
4437 fn test_row_group_exact_multiple() {
4438 const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
4439 test_row_group_batch(8, 8);
4440 test_row_group_batch(10, 8);
4441 test_row_group_batch(8, 10);
4442 test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
4443 test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
4444 test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
4445 test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
4446 test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
4447 }
4448
4449 fn get_expected_batches(
4452 column: &RecordBatch,
4453 selection: &RowSelection,
4454 batch_size: usize,
4455 ) -> Vec<RecordBatch> {
4456 let mut expected_batches = vec![];
4457
4458 let mut selection: VecDeque<_> = selection.clone().into();
4459 let mut row_offset = 0;
4460 let mut last_start = None;
4461 while row_offset < column.num_rows() && !selection.is_empty() {
4462 let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
4463 while batch_remaining > 0 && !selection.is_empty() {
4464 let (to_read, skip) = match selection.front_mut() {
4465 Some(selection) if selection.row_count > batch_remaining => {
4466 selection.row_count -= batch_remaining;
4467 (batch_remaining, selection.skip)
4468 }
4469 Some(_) => {
4470 let select = selection.pop_front().unwrap();
4471 (select.row_count, select.skip)
4472 }
4473 None => break,
4474 };
4475
4476 batch_remaining -= to_read;
4477
4478 match skip {
4479 true => {
4480 if let Some(last_start) = last_start.take() {
4481 expected_batches.push(column.slice(last_start, row_offset - last_start))
4482 }
4483 row_offset += to_read
4484 }
4485 false => {
4486 last_start.get_or_insert(row_offset);
4487 row_offset += to_read
4488 }
4489 }
4490 }
4491 }
4492
4493 if let Some(last_start) = last_start.take() {
4494 expected_batches.push(column.slice(last_start, row_offset - last_start))
4495 }
4496
4497 for batch in &expected_batches[..expected_batches.len() - 1] {
4499 assert_eq!(batch.num_rows(), batch_size);
4500 }
4501
4502 expected_batches
4503 }
4504
4505 fn create_test_selection(
4506 step_len: usize,
4507 total_len: usize,
4508 skip_first: bool,
4509 ) -> (RowSelection, usize) {
4510 let mut remaining = total_len;
4511 let mut skip = skip_first;
4512 let mut vec = vec![];
4513 let mut selected_count = 0;
4514 while remaining != 0 {
4515 let step = if remaining > step_len {
4516 step_len
4517 } else {
4518 remaining
4519 };
4520 vec.push(RowSelector {
4521 row_count: step,
4522 skip,
4523 });
4524 remaining -= step;
4525 if !skip {
4526 selected_count += step;
4527 }
4528 skip = !skip;
4529 }
4530 (vec.into(), selected_count)
4531 }
4532
    #[test]
    fn test_scan_row_with_selection() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let test_file = File::open(&path).unwrap();

        // Reference read: the whole file (7300 rows) in one batch
        let mut serial_reader =
            ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
        let data = serial_reader.next().unwrap().unwrap();

        // NOTE(review): `selection_len` is only interpolated into the failure
        // message; the selection step is actually `batch_size` (see
        // `create_test_selection(batch_size, ...)`) — confirm whether the
        // second argument was meant to drive the selection length.
        let do_test = |batch_size: usize, selection_len: usize| {
            for skip_first in [false, true] {
                let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;

                let expected = get_expected_batches(&data, &selections, batch_size);
                let skip_reader = create_skip_reader(&test_file, batch_size, selections);
                assert_eq!(
                    skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
                    expected,
                    "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
                );
            }
        };

        do_test(1000, 1000);

        do_test(20, 20);

        do_test(20, 5);

        // NOTE(review): identical to the previous call — presumably one of
        // the two was intended to exercise a different combination.
        do_test(20, 5);

        // Builds a reader over `test_file` that requires the page index and
        // applies `selections`
        fn create_skip_reader(
            test_file: &File,
            batch_size: usize,
            selections: RowSelection,
        ) -> ParquetRecordBatchReader {
            let options =
                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
            let file = test_file.try_clone().unwrap();
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                .unwrap()
                .with_batch_size(batch_size)
                .with_row_selection(selections)
                .build()
                .unwrap()
        }
    }
4587
4588 #[test]
4589 fn test_batch_size_overallocate() {
4590 let testdata = arrow::util::test_util::parquet_test_data();
4591 let path = format!("{testdata}/alltypes_plain.parquet");
4593 let test_file = File::open(path).unwrap();
4594
4595 let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4596 let num_rows = builder.metadata.file_metadata().num_rows();
4597 let reader = builder
4598 .with_batch_size(1024)
4599 .with_projection(ProjectionMask::all())
4600 .build()
4601 .unwrap();
4602 assert_ne!(1024, num_rows);
4603 assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
4604 }
4605
    #[test]
    fn test_read_with_page_index_enabled() {
        let testdata = arrow::util::test_util::parquet_test_data();

        {
            // File with a page index: the offset index must be populated and
            // the file reads back as 8 batches.
            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
            let test_file = File::open(path).unwrap();
            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
                test_file,
                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
            )
            .unwrap();
            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
            let reader = builder.build().unwrap();
            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(batches.len(), 8);
        }

        {
            // File without a page index: no offset index is loaded but the
            // data still reads back fine.
            // NOTE(review): the builder succeeds despite
            // `PageIndexPolicy::Required` — confirm the policy's intended
            // semantics for files that carry no page index at all.
            let path = format!("{testdata}/alltypes_plain.parquet");
            let test_file = File::open(path).unwrap();
            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
                test_file,
                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
            )
            .unwrap();
            assert!(builder.metadata().offset_index().is_none());
            let reader = builder.build().unwrap();
            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(batches.len(), 1);
        }
    }
4642
    #[test]
    fn test_raw_repetition() {
        // Writes a single logical row through the low-level column writers
        // (explicit definition/repetition levels) and checks that projecting
        // each leaf individually matches the unprojected read.
        const MESSAGE_TYPE: &str = "
        message Log {
          OPTIONAL INT32 eventType;
          REPEATED INT32 category;
          REPEATED group filter {
            OPTIONAL INT32 error;
          }
        }
        ";
        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
        let props = Default::default();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        // Column 0: eventType (optional) — def level 1 marks the value present
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1], Some(&[1]), None)
            .unwrap();
        col_writer.close().unwrap();
        // Column 1: category (repeated) — rep levels [0, 1] put both values
        // in the same row
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
            .unwrap();
        col_writer.close().unwrap();
        // Column 2: filter.error (optional leaf inside a repeated group)
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1], Some(&[1]), Some(&[0]))
            .unwrap();
        col_writer.close().unwrap();

        // All three columns collapse to a single logical row
        let rg_md = row_group_writer.close().unwrap();
        assert_eq!(rg_md.num_rows(), 1);
        writer.close().unwrap();

        let bytes = Bytes::from(buf);

        // Unprojected read returns all three columns
        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
        let full = no_mask.next().unwrap().unwrap();

        assert_eq!(full.num_columns(), 3);

        // Every single-leaf projection must equal the corresponding column of
        // the unprojected batch
        for idx in 0..3 {
            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
            let mut reader = b.with_projection(mask).build().unwrap();
            let projected = reader.next().unwrap().unwrap();

            assert_eq!(projected.num_columns(), 1);
            assert_eq!(full.column(idx), projected.column(0));
        }
    }
4704
4705 #[test]
4706 fn test_read_lz4_raw() {
4707 let testdata = arrow::util::test_util::parquet_test_data();
4708 let path = format!("{testdata}/lz4_raw_compressed.parquet");
4709 let file = File::open(path).unwrap();
4710
4711 let batches = ParquetRecordBatchReader::try_new(file, 1024)
4712 .unwrap()
4713 .collect::<Result<Vec<_>, _>>()
4714 .unwrap();
4715 assert_eq!(batches.len(), 1);
4716 let batch = &batches[0];
4717
4718 assert_eq!(batch.num_columns(), 3);
4719 assert_eq!(batch.num_rows(), 4);
4720
4721 let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4723 assert_eq!(
4724 a.values(),
4725 &[1593604800, 1593604800, 1593604801, 1593604801]
4726 );
4727
4728 let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4729 let a: Vec<_> = a.iter().flatten().collect();
4730 assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
4731
4732 let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4733 assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
4734 }
4735
4736 #[test]
4746 fn test_read_lz4_hadoop_fallback() {
4747 for file in [
4748 "hadoop_lz4_compressed.parquet",
4749 "non_hadoop_lz4_compressed.parquet",
4750 ] {
4751 let testdata = arrow::util::test_util::parquet_test_data();
4752 let path = format!("{testdata}/{file}");
4753 let file = File::open(path).unwrap();
4754 let expected_rows = 4;
4755
4756 let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4757 .unwrap()
4758 .collect::<Result<Vec<_>, _>>()
4759 .unwrap();
4760 assert_eq!(batches.len(), 1);
4761 let batch = &batches[0];
4762
4763 assert_eq!(batch.num_columns(), 3);
4764 assert_eq!(batch.num_rows(), expected_rows);
4765
4766 let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
4767 assert_eq!(
4768 a.values(),
4769 &[1593604800, 1593604800, 1593604801, 1593604801]
4770 );
4771
4772 let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
4773 let b: Vec<_> = b.iter().flatten().collect();
4774 assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
4775
4776 let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4777 assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
4778 }
4779 }
4780
4781 #[test]
4782 fn test_read_lz4_hadoop_large() {
4783 let testdata = arrow::util::test_util::parquet_test_data();
4784 let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
4785 let file = File::open(path).unwrap();
4786 let expected_rows = 10000;
4787
4788 let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4789 .unwrap()
4790 .collect::<Result<Vec<_>, _>>()
4791 .unwrap();
4792 assert_eq!(batches.len(), 1);
4793 let batch = &batches[0];
4794
4795 assert_eq!(batch.num_columns(), 1);
4796 assert_eq!(batch.num_rows(), expected_rows);
4797
4798 let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
4799 let a: Vec<_> = a.iter().flatten().collect();
4800 assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
4801 assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
4802 assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
4803 assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
4804 }
4805
    #[test]
    #[cfg(feature = "snap")]
    fn test_read_nested_lists() {
        // Row selection over a snappy-compressed file of nested lists:
        // selecting only the middle row must match the same slice of an
        // unselected reference read.
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_lists.snappy.parquet");
        let file = File::open(path).unwrap();

        // Reference read without any selection
        let f = file.try_clone().unwrap();
        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
        let expected = reader.next().unwrap().unwrap();
        assert_eq!(expected.num_rows(), 3);

        // Skip row 0, select row 1, skip row 2
        let selection = RowSelection::from(vec![
            RowSelector::skip(1),
            RowSelector::select(1),
            RowSelector::skip(1),
        ]);
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .with_row_selection(selection)
            .build()
            .unwrap();

        let actual = reader.next().unwrap().unwrap();
        assert_eq!(actual.num_rows(), 1);
        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
    }
4833
4834 #[test]
4835 fn test_arbitrary_decimal() {
4836 let values = [1, 2, 3, 4, 5, 6, 7, 8];
4837 let decimals_19_0 = Decimal128Array::from_iter_values(values)
4838 .with_precision_and_scale(19, 0)
4839 .unwrap();
4840 let decimals_12_0 = Decimal128Array::from_iter_values(values)
4841 .with_precision_and_scale(12, 0)
4842 .unwrap();
4843 let decimals_17_10 = Decimal128Array::from_iter_values(values)
4844 .with_precision_and_scale(17, 10)
4845 .unwrap();
4846
4847 let written = RecordBatch::try_from_iter([
4848 ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
4849 ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
4850 ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
4851 ])
4852 .unwrap();
4853
4854 let mut buffer = Vec::with_capacity(1024);
4855 let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
4856 writer.write(&written).unwrap();
4857 writer.close().unwrap();
4858
4859 let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
4860 .unwrap()
4861 .collect::<Result<Vec<_>, _>>()
4862 .unwrap();
4863
4864 assert_eq!(&written.slice(0, 8), &read[0]);
4865 }
4866
    #[test]
    fn test_list_skip() {
        // Skipping list rows across data-page boundaries: only the last of
        // three rows is selected.
        let mut list = ListBuilder::new(Int32Builder::new());
        list.append_value([Some(1), Some(2)]);
        list.append_value([Some(3)]);
        list.append_value([Some(4)]);
        let list = list.finish();
        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();

        // Force roughly one row per data page so the skip spans pages
        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(1)
            .set_write_batch_size(2)
            .build();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        // Skip the first two rows, select the third
        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
            .unwrap()
            .with_row_selection(selection.into())
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out, batch.slice(2, 1));
    }
4897
4898 fn test_decimal32_roundtrip() {
4899 let d = |values: Vec<i32>, p: u8| {
4900 let iter = values.into_iter();
4901 PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
4902 .with_precision_and_scale(p, 2)
4903 .unwrap()
4904 };
4905
4906 let d1 = d(vec![1, 2, 3, 4, 5], 9);
4907 let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();
4908
4909 let mut buffer = Vec::with_capacity(1024);
4910 let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4911 writer.write(&batch).unwrap();
4912 writer.close().unwrap();
4913
4914 let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4915 let t1 = builder.parquet_schema().columns()[0].physical_type();
4916 assert_eq!(t1, PhysicalType::INT32);
4917
4918 let mut reader = builder.build().unwrap();
4919 assert_eq!(batch.schema(), reader.schema());
4920
4921 let out = reader.next().unwrap().unwrap();
4922 assert_eq!(batch, out);
4923 }
4924
4925 fn test_decimal64_roundtrip() {
4926 let d = |values: Vec<i64>, p: u8| {
4930 let iter = values.into_iter();
4931 PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
4932 .with_precision_and_scale(p, 2)
4933 .unwrap()
4934 };
4935
4936 let d1 = d(vec![1, 2, 3, 4, 5], 9);
4937 let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
4938 let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
4939
4940 let batch = RecordBatch::try_from_iter([
4941 ("d1", Arc::new(d1) as ArrayRef),
4942 ("d2", Arc::new(d2) as ArrayRef),
4943 ("d3", Arc::new(d3) as ArrayRef),
4944 ])
4945 .unwrap();
4946
4947 let mut buffer = Vec::with_capacity(1024);
4948 let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4949 writer.write(&batch).unwrap();
4950 writer.close().unwrap();
4951
4952 let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4953 let t1 = builder.parquet_schema().columns()[0].physical_type();
4954 assert_eq!(t1, PhysicalType::INT32);
4955 let t2 = builder.parquet_schema().columns()[1].physical_type();
4956 assert_eq!(t2, PhysicalType::INT64);
4957 let t3 = builder.parquet_schema().columns()[2].physical_type();
4958 assert_eq!(t3, PhysicalType::INT64);
4959
4960 let mut reader = builder.build().unwrap();
4961 assert_eq!(batch.schema(), reader.schema());
4962
4963 let out = reader.next().unwrap().unwrap();
4964 assert_eq!(batch, out);
4965 }
4966
    /// Round-trips decimal arrays of the given arrow `DecimalType`, checking
    /// the parquet physical type chosen per precision: <= 9 -> INT32,
    /// <= 18 -> INT64, otherwise FIXED_LEN_BYTE_ARRAY.
    fn test_decimal_roundtrip<T: DecimalType>() {
        // Build a decimal array with the given precision (scale fixed at 2)
        let d = |values: Vec<usize>, p: u8| {
            let iter = values.into_iter().map(T::Native::usize_as);
            PrimitiveArray::<T>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        // Largest value per precision tier exercises each storage boundary
        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
            ("d4", Arc::new(d4) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);
        let t4 = builder.parquet_schema().columns()[3].physical_type();
        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }
5013
    #[test]
    fn test_decimal() {
        // Drive the width-specific and generic decimal round-trip helpers
        test_decimal32_roundtrip();
        test_decimal64_roundtrip();
        test_decimal_roundtrip::<Decimal128Type>();
        test_decimal_roundtrip::<Decimal256Type>();
    }
5021
5022 #[test]
5023 fn test_list_selection() {
5024 let schema = Arc::new(Schema::new(vec![Field::new_list(
5025 "list",
5026 Field::new_list_field(ArrowDataType::Utf8, true),
5027 false,
5028 )]));
5029 let mut buf = Vec::with_capacity(1024);
5030
5031 let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
5032
5033 for i in 0..2 {
5034 let mut list_a_builder = ListBuilder::new(StringBuilder::new());
5035 for j in 0..1024 {
5036 list_a_builder.values().append_value(format!("{i} {j}"));
5037 list_a_builder.append(true);
5038 }
5039 let batch =
5040 RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
5041 .unwrap();
5042 writer.write(&batch).unwrap();
5043 }
5044 let _metadata = writer.close().unwrap();
5045
5046 let buf = Bytes::from(buf);
5047 let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
5048 .unwrap()
5049 .with_row_selection(RowSelection::from(vec![
5050 RowSelector::skip(100),
5051 RowSelector::select(924),
5052 RowSelector::skip(100),
5053 RowSelector::select(924),
5054 ]))
5055 .build()
5056 .unwrap();
5057
5058 let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
5059 let batch = concat_batches(&schema, &batches).unwrap();
5060
5061 assert_eq!(batch.num_rows(), 924 * 2);
5062 let list = batch.column(0).as_list::<i32>();
5063
5064 for w in list.value_offsets().windows(2) {
5065 assert_eq!(w[0] + 1, w[1])
5066 }
5067 let mut values = list.values().as_string::<i32>().iter();
5068
5069 for i in 0..2 {
5070 for j in 100..1024 {
5071 let expected = format!("{i} {j}");
5072 assert_eq!(values.next().unwrap().unwrap(), &expected);
5073 }
5074 }
5075 }
5076
    #[test]
    fn test_list_selection_fuzz() {
        // Fuzz test: write a randomly-generated List<List<Int32>> column and
        // verify that reading it back through several RowSelections, at
        // several batch sizes, yields exactly the selected slices.
        let mut rng = rng();
        // Schema: single nullable column "list" of type List<List<Int32>>.
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list(
                Field::LIST_FIELD_DEFAULT_NAME,
                Field::new_list_field(ArrowDataType::Int32, true),
                true,
            ),
            true,
        )]));
        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));

        // Generate 2048 outer rows. At each level there is a 20% chance of a
        // null (list or value); lengths are uniform in 0..10.
        for _ in 0..2048 {
            if rng.random_bool(0.2) {
                // Null outer list.
                list_a_builder.append(false);
                continue;
            }

            let list_a_len = rng.random_range(0..10);
            let list_b_builder = list_a_builder.values();

            for _ in 0..list_a_len {
                if rng.random_bool(0.2) {
                    // Null inner list.
                    list_b_builder.append(false);
                    continue;
                }

                let list_b_len = rng.random_range(0..10);
                let int_builder = list_b_builder.values();
                for _ in 0..list_b_len {
                    match rng.random_bool(0.2) {
                        true => int_builder.append_null(),
                        false => int_builder.append_value(rng.random()),
                    }
                }
                list_b_builder.append(true)
            }
            list_a_builder.append(true);
        }

        let array = Arc::new(list_a_builder.finish());
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        writer.write(&batch).unwrap();
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);

        // Selections exercising skips/selects at the start, end, and at
        // single-row granularity around the 1024-row midpoint.
        let cases = [
            vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ],
            vec![
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
            ],
            vec![
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
            ],
            vec![
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
            ],
        ];

        for batch_size in [100, 1024, 2048] {
            for selection in &cases {
                let selection = RowSelection::from(selection.clone());
                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
                    .unwrap()
                    .with_row_selection(selection.clone())
                    .with_batch_size(batch_size)
                    .build()
                    .unwrap();

                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
                assert_eq!(actual.num_rows(), selection.row_count());

                // Walk the selection: `batch_offset` tracks position in the
                // original data, `actual_offset` tracks position in the
                // concatenated read-back. Each selected run must match the
                // corresponding slice of the written batch exactly.
                let mut batch_offset = 0;
                let mut actual_offset = 0;
                for selector in selection.iter() {
                    if selector.skip {
                        batch_offset += selector.row_count;
                        continue;
                    }

                    assert_eq!(
                        batch.slice(batch_offset, selector.row_count),
                        actual.slice(actual_offset, selector.row_count)
                    );

                    batch_offset += selector.row_count;
                    actual_offset += selector.row_count;
                }
            }
        }
    }
5190
5191 #[test]
5192 fn test_read_old_nested_list() {
5193 use arrow::datatypes::DataType;
5194 use arrow::datatypes::ToByteSlice;
5195
5196 let testdata = arrow::util::test_util::parquet_test_data();
5197 let path = format!("{testdata}/old_list_structure.parquet");
5206 let test_file = File::open(path).unwrap();
5207
5208 let a_values = Int32Array::from(vec![1, 2, 3, 4]);
5210
5211 let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());
5213
5214 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
5216 "array",
5217 DataType::Int32,
5218 false,
5219 ))))
5220 .len(2)
5221 .add_buffer(a_value_offsets)
5222 .add_child_data(a_values.into_data())
5223 .build()
5224 .unwrap();
5225 let a = ListArray::from(a_list_data);
5226
5227 let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
5228 let mut reader = builder.build().unwrap();
5229 let out = reader.next().unwrap().unwrap();
5230 assert_eq!(out.num_rows(), 1);
5231 assert_eq!(out.num_columns(), 1);
5232 let c0 = out.column(0);
5234 let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
5235 let r0 = c0arr.value(0);
5237 let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
5238 assert_eq!(r0arr, &a);
5239 }
5240
5241 #[test]
5242 fn test_map_no_value() {
5243 let testdata = arrow::util::test_util::parquet_test_data();
5263 let path = format!("{testdata}/map_no_value.parquet");
5264 let file = File::open(path).unwrap();
5265
5266 let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
5267 .unwrap()
5268 .build()
5269 .unwrap();
5270 let out = reader.next().unwrap().unwrap();
5271 assert_eq!(out.num_rows(), 3);
5272 assert_eq!(out.num_columns(), 3);
5273 let c0 = out.column(1).as_list::<i32>();
5275 let c1 = out.column(2).as_list::<i32>();
5276 assert_eq!(c0.len(), c1.len());
5277 c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
5278 }
5279
5280 #[test]
5281 fn test_read_unknown_logical_type() {
5282 let testdata = arrow::util::test_util::parquet_test_data();
5283 let path = format!("{testdata}/unknown-logical-type.parquet");
5284 let test_file = File::open(path).unwrap();
5285
5286 let builder = ParquetRecordBatchReaderBuilder::try_new(test_file)
5287 .expect("Error creating reader builder");
5288
5289 let schema = builder.metadata().file_metadata().schema_descr();
5290 assert_eq!(
5291 schema.column(0).logical_type_ref(),
5292 Some(&LogicalType::String)
5293 );
5294 assert_eq!(
5295 schema.column(1).logical_type_ref(),
5296 Some(&LogicalType::_Unknown { field_id: 2555 })
5297 );
5298 assert_eq!(schema.column(1).physical_type(), PhysicalType::BYTE_ARRAY);
5299
5300 let mut reader = builder.build().unwrap();
5301 let out = reader.next().unwrap().unwrap();
5302 assert_eq!(out.num_rows(), 3);
5303 assert_eq!(out.num_columns(), 2);
5304 }
5305
5306 #[test]
5307 fn test_read_row_numbers() {
5308 let file = write_parquet_from_iter(vec![(
5309 "value",
5310 Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
5311 )]);
5312 let supplied_fields = Fields::from(vec![Field::new("value", ArrowDataType::Int64, false)]);
5313
5314 let row_number_field = Arc::new(
5315 Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5316 );
5317
5318 let options = ArrowReaderOptions::new()
5319 .with_schema(Arc::new(Schema::new(supplied_fields)))
5320 .with_virtual_columns(vec![row_number_field.clone()])
5321 .unwrap();
5322 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
5323 file.try_clone().unwrap(),
5324 options,
5325 )
5326 .expect("reader builder with schema")
5327 .build()
5328 .expect("reader with schema");
5329
5330 let batch = arrow_reader.next().unwrap().unwrap();
5331 let schema = Arc::new(Schema::new(vec![
5332 Field::new("value", ArrowDataType::Int64, false),
5333 (*row_number_field).clone(),
5334 ]));
5335
5336 assert_eq!(batch.schema(), schema);
5337 assert_eq!(batch.num_columns(), 2);
5338 assert_eq!(batch.num_rows(), 3);
5339 assert_eq!(
5340 batch
5341 .column(0)
5342 .as_primitive::<types::Int64Type>()
5343 .iter()
5344 .collect::<Vec<_>>(),
5345 vec![Some(1), Some(2), Some(3)]
5346 );
5347 assert_eq!(
5348 batch
5349 .column(1)
5350 .as_primitive::<types::Int64Type>()
5351 .iter()
5352 .collect::<Vec<_>>(),
5353 vec![Some(0), Some(1), Some(2)]
5354 );
5355 }
5356
5357 #[test]
5358 fn test_read_only_row_numbers() {
5359 let file = write_parquet_from_iter(vec![(
5360 "value",
5361 Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
5362 )]);
5363 let row_number_field = Arc::new(
5364 Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5365 );
5366 let options = ArrowReaderOptions::new()
5367 .with_virtual_columns(vec![row_number_field.clone()])
5368 .unwrap();
5369 let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
5370 let num_columns = metadata
5371 .metadata
5372 .file_metadata()
5373 .schema_descr()
5374 .num_columns();
5375
5376 let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
5377 .with_projection(ProjectionMask::none(num_columns))
5378 .build()
5379 .expect("reader with schema");
5380
5381 let batch = arrow_reader.next().unwrap().unwrap();
5382 let schema = Arc::new(Schema::new(vec![row_number_field]));
5383
5384 assert_eq!(batch.schema(), schema);
5385 assert_eq!(batch.num_columns(), 1);
5386 assert_eq!(batch.num_rows(), 3);
5387 assert_eq!(
5388 batch
5389 .column(0)
5390 .as_primitive::<types::Int64Type>()
5391 .iter()
5392 .collect::<Vec<_>>(),
5393 vec![Some(0), Some(1), Some(2)]
5394 );
5395 }
5396
5397 #[test]
5398 fn test_read_row_numbers_row_group_order() -> Result<()> {
5399 let array = Int64Array::from_iter_values(5000..5100);
5401 let batch = RecordBatch::try_from_iter([("col", Arc::new(array) as ArrayRef)])?;
5402 let mut buffer = Vec::new();
5403 let options = WriterProperties::builder()
5404 .set_max_row_group_row_count(Some(50))
5405 .build();
5406 let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema().clone(), Some(options))?;
5407 for batch_chunk in (0..10).map(|i| batch.slice(i * 10, 10)) {
5409 writer.write(&batch_chunk)?;
5410 }
5411 writer.close()?;
5412
5413 let row_number_field = Arc::new(
5414 Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
5415 );
5416
5417 let buffer = Bytes::from(buffer);
5418
5419 let options =
5420 ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field.clone()])?;
5421
5422 let arrow_reader =
5424 ParquetRecordBatchReaderBuilder::try_new_with_options(buffer.clone(), options.clone())?
5425 .build()?;
5426
5427 assert_eq!(
5428 ValuesAndRowNumbers {
5429 values: (5000..5100).collect(),
5430 row_numbers: (0..100).collect()
5431 },
5432 ValuesAndRowNumbers::new_from_reader(arrow_reader)
5433 );
5434
5435 let arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(buffer, options)?
5437 .with_row_groups(vec![1, 0])
5438 .build()?;
5439
5440 assert_eq!(
5441 ValuesAndRowNumbers {
5442 values: (5050..5100).chain(5000..5050).collect(),
5443 row_numbers: (50..100).chain(0..50).collect(),
5444 },
5445 ValuesAndRowNumbers::new_from_reader(arrow_reader)
5446 );
5447
5448 Ok(())
5449 }
5450
5451 #[derive(Debug, PartialEq)]
5452 struct ValuesAndRowNumbers {
5453 values: Vec<i64>,
5454 row_numbers: Vec<i64>,
5455 }
5456 impl ValuesAndRowNumbers {
5457 fn new_from_reader(reader: ParquetRecordBatchReader) -> Self {
5458 let mut values = vec![];
5459 let mut row_numbers = vec![];
5460 for batch in reader {
5461 let batch = batch.expect("Could not read batch");
5462 values.extend(
5463 batch
5464 .column_by_name("col")
5465 .expect("Could not get col column")
5466 .as_primitive::<arrow::datatypes::Int64Type>()
5467 .iter()
5468 .map(|v| v.expect("Could not get value")),
5469 );
5470
5471 row_numbers.extend(
5472 batch
5473 .column_by_name("row_number")
5474 .expect("Could not get row_number column")
5475 .as_primitive::<arrow::datatypes::Int64Type>()
5476 .iter()
5477 .map(|v| v.expect("Could not get row number"))
5478 .collect::<Vec<_>>(),
5479 );
5480 }
5481 Self {
5482 values,
5483 row_numbers,
5484 }
5485 }
5486 }
5487
5488 #[test]
5489 fn test_with_virtual_columns_rejects_non_virtual_fields() {
5490 let regular_field = Arc::new(Field::new("regular_column", ArrowDataType::Int64, false));
5492 assert_eq!(
5493 ArrowReaderOptions::new()
5494 .with_virtual_columns(vec![regular_field])
5495 .unwrap_err()
5496 .to_string(),
5497 "Parquet error: Field 'regular_column' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'"
5498 );
5499 }
5500
5501 #[test]
5502 fn test_row_numbers_with_multiple_row_groups() {
5503 test_row_numbers_with_multiple_row_groups_helper(
5504 false,
5505 |path, selection, _row_filter, batch_size| {
5506 let file = File::open(path).unwrap();
5507 let row_number_field = Arc::new(
5508 Field::new("row_number", ArrowDataType::Int64, false)
5509 .with_extension_type(RowNumber),
5510 );
5511 let options = ArrowReaderOptions::new()
5512 .with_virtual_columns(vec![row_number_field])
5513 .unwrap();
5514 let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
5515 .unwrap()
5516 .with_row_selection(selection)
5517 .with_batch_size(batch_size)
5518 .build()
5519 .expect("Could not create reader");
5520 reader
5521 .collect::<Result<Vec<_>, _>>()
5522 .expect("Could not read")
5523 },
5524 );
5525 }
5526
5527 #[test]
5528 fn test_row_numbers_with_multiple_row_groups_and_filter() {
5529 test_row_numbers_with_multiple_row_groups_helper(
5530 true,
5531 |path, selection, row_filter, batch_size| {
5532 let file = File::open(path).unwrap();
5533 let row_number_field = Arc::new(
5534 Field::new("row_number", ArrowDataType::Int64, false)
5535 .with_extension_type(RowNumber),
5536 );
5537 let options = ArrowReaderOptions::new()
5538 .with_virtual_columns(vec![row_number_field])
5539 .unwrap();
5540 let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
5541 .unwrap()
5542 .with_row_selection(selection)
5543 .with_batch_size(batch_size)
5544 .with_row_filter(row_filter.expect("No filter"))
5545 .build()
5546 .expect("Could not create reader");
5547 reader
5548 .collect::<Result<Vec<_>, _>>()
5549 .expect("Could not read")
5550 },
5551 );
5552 }
5553
5554 #[test]
5555 fn test_read_row_group_indices() {
5556 let array1 = Int64Array::from(vec![1, 2]);
5558 let array2 = Int64Array::from(vec![3, 4]);
5559 let array3 = Int64Array::from(vec![5, 6]);
5560
5561 let batch1 =
5562 RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
5563 let batch2 =
5564 RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
5565 let batch3 =
5566 RecordBatch::try_from_iter(vec![("value", Arc::new(array3) as ArrayRef)]).unwrap();
5567
5568 let mut buffer = Vec::new();
5569 let options = WriterProperties::builder()
5570 .set_max_row_group_row_count(Some(2))
5571 .build();
5572 let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
5573 writer.write(&batch1).unwrap();
5574 writer.write(&batch2).unwrap();
5575 writer.write(&batch3).unwrap();
5576 writer.close().unwrap();
5577
5578 let file = Bytes::from(buffer);
5579 let row_group_index_field = Arc::new(
5580 Field::new("row_group_index", ArrowDataType::Int64, false)
5581 .with_extension_type(RowGroupIndex),
5582 );
5583
5584 let options = ArrowReaderOptions::new()
5585 .with_virtual_columns(vec![row_group_index_field.clone()])
5586 .unwrap();
5587 let mut arrow_reader =
5588 ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options)
5589 .expect("reader builder with virtual columns")
5590 .build()
5591 .expect("reader with virtual columns");
5592
5593 let batch = arrow_reader.next().unwrap().unwrap();
5594
5595 assert_eq!(batch.num_columns(), 2);
5596 assert_eq!(batch.num_rows(), 6);
5597
5598 assert_eq!(
5599 batch
5600 .column(0)
5601 .as_primitive::<types::Int64Type>()
5602 .iter()
5603 .collect::<Vec<_>>(),
5604 vec![Some(1), Some(2), Some(3), Some(4), Some(5), Some(6)]
5605 );
5606
5607 assert_eq!(
5608 batch
5609 .column(1)
5610 .as_primitive::<types::Int64Type>()
5611 .iter()
5612 .collect::<Vec<_>>(),
5613 vec![Some(0), Some(0), Some(1), Some(1), Some(2), Some(2)]
5614 );
5615 }
5616
5617 #[test]
5618 fn test_read_only_row_group_indices() {
5619 let array1 = Int64Array::from(vec![1, 2, 3]);
5620 let array2 = Int64Array::from(vec![4, 5]);
5621
5622 let batch1 =
5623 RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
5624 let batch2 =
5625 RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
5626
5627 let mut buffer = Vec::new();
5628 let options = WriterProperties::builder()
5629 .set_max_row_group_row_count(Some(3))
5630 .build();
5631 let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
5632 writer.write(&batch1).unwrap();
5633 writer.write(&batch2).unwrap();
5634 writer.close().unwrap();
5635
5636 let file = Bytes::from(buffer);
5637 let row_group_index_field = Arc::new(
5638 Field::new("row_group_index", ArrowDataType::Int64, false)
5639 .with_extension_type(RowGroupIndex),
5640 );
5641
5642 let options = ArrowReaderOptions::new()
5643 .with_virtual_columns(vec![row_group_index_field.clone()])
5644 .unwrap();
5645 let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
5646 let num_columns = metadata
5647 .metadata
5648 .file_metadata()
5649 .schema_descr()
5650 .num_columns();
5651
5652 let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
5653 .with_projection(ProjectionMask::none(num_columns))
5654 .build()
5655 .expect("reader with virtual columns only");
5656
5657 let batch = arrow_reader.next().unwrap().unwrap();
5658 let schema = Arc::new(Schema::new(vec![(*row_group_index_field).clone()]));
5659
5660 assert_eq!(batch.schema(), schema);
5661 assert_eq!(batch.num_columns(), 1);
5662 assert_eq!(batch.num_rows(), 5);
5663
5664 assert_eq!(
5665 batch
5666 .column(0)
5667 .as_primitive::<types::Int64Type>()
5668 .iter()
5669 .collect::<Vec<_>>(),
5670 vec![Some(0), Some(0), Some(0), Some(1), Some(1)]
5671 );
5672 }
5673
5674 #[test]
5675 fn test_read_row_group_indices_with_selection() -> Result<()> {
5676 let mut buffer = Vec::new();
5677 let options = WriterProperties::builder()
5678 .set_max_row_group_row_count(Some(10))
5679 .build();
5680
5681 let schema = Arc::new(Schema::new(vec![Field::new(
5682 "value",
5683 ArrowDataType::Int64,
5684 false,
5685 )]));
5686
5687 let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(options))?;
5688
5689 for i in 0..3 {
5691 let start = i * 10;
5692 let array = Int64Array::from_iter_values(start..start + 10);
5693 let batch = RecordBatch::try_from_iter(vec![("value", Arc::new(array) as ArrayRef)])?;
5694 writer.write(&batch)?;
5695 }
5696 writer.close()?;
5697
5698 let file = Bytes::from(buffer);
5699 let row_group_index_field = Arc::new(
5700 Field::new("rg_idx", ArrowDataType::Int64, false).with_extension_type(RowGroupIndex),
5701 );
5702
5703 let options =
5704 ArrowReaderOptions::new().with_virtual_columns(vec![row_group_index_field])?;
5705
5706 let arrow_reader =
5708 ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options.clone())?
5709 .with_row_groups(vec![2, 1, 0])
5710 .build()?;
5711
5712 let batches: Vec<_> = arrow_reader.collect::<Result<Vec<_>, _>>()?;
5713 let combined = concat_batches(&batches[0].schema(), &batches)?;
5714
5715 let values = combined.column(0).as_primitive::<types::Int64Type>();
5716 let first_val = values.value(0);
5717 let last_val = values.value(combined.num_rows() - 1);
5718 assert_eq!(first_val, 20);
5720 assert_eq!(last_val, 9);
5722
5723 let rg_indices = combined.column(1).as_primitive::<types::Int64Type>();
5724 assert_eq!(rg_indices.value(0), 2);
5725 assert_eq!(rg_indices.value(10), 1);
5726 assert_eq!(rg_indices.value(20), 0);
5727
5728 Ok(())
5729 }
5730
    /// Shared driver for the virtual row-number tests.
    ///
    /// Generates a random parquet file whose `value` column equals its row
    /// number plus one, builds a random [`RowSelection`] (and, when
    /// `use_filter` is set, a random [`RowFilter`]), then invokes `test_case`
    /// to produce batches and checks that every returned row's `value` is
    /// exactly its reported `row_number + 1`.
    pub(crate) fn test_row_numbers_with_multiple_row_groups_helper<F>(
        use_filter: bool,
        test_case: F,
    ) where
        F: FnOnce(PathBuf, RowSelection, Option<RowFilter>, usize) -> Vec<RecordBatch>,
    {
        // Print the seed so a failing run can be reproduced.
        let seed: u64 = random();
        println!("test_row_numbers_with_multiple_row_groups seed: {}", seed);
        let mut rng = StdRng::seed_from_u64(seed);

        use tempfile::TempDir;
        let tempdir = TempDir::new().expect("Could not create temp dir");

        let (bytes, metadata) = generate_file_with_row_numbers(&mut rng);

        let path = tempdir.path().join("test.parquet");
        std::fs::write(&path, bytes).expect("Could not write file");

        // Build a random selection covering every row exactly once, each run
        // randomly marked as skip or select.
        let mut case = vec![];
        let mut remaining = metadata.file_metadata().num_rows();
        while remaining > 0 {
            let row_count = rng.random_range(1..=remaining);
            remaining -= row_count;
            case.push(RowSelector {
                row_count: row_count as usize,
                skip: rng.random_bool(0.5),
            });
        }

        // Optional row filter: a fixed random boolean per file row (99% pass
        // rate). The predicate closure consumes the precomputed mask in
        // order, tracking its position via the captured `filter_offset`.
        let filter = use_filter.then(|| {
            let filter = (0..metadata.file_metadata().num_rows())
                .map(|_| rng.random_bool(0.99))
                .collect::<Vec<_>>();
            let mut filter_offset = 0;
            RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
                ProjectionMask::all(),
                move |b| {
                    let array = BooleanArray::from_iter(
                        filter
                            .iter()
                            .skip(filter_offset)
                            .take(b.num_rows())
                            .map(|x| Some(*x)),
                    );
                    filter_offset += b.num_rows();
                    Ok(array)
                },
            ))])
        });

        let selection = RowSelection::from(case);
        let batches = test_case(path, selection.clone(), filter, rng.random_range(1..4096));

        // If the selection skipped every row, all returned batches must be empty.
        if selection.skipped_row_count() == metadata.file_metadata().num_rows() as usize {
            assert!(batches.into_iter().all(|batch| batch.num_rows() == 0));
            return;
        }
        let actual = concat_batches(batches.first().expect("No batches").schema_ref(), &batches)
            .expect("Failed to concatenate");
        let values = actual
            .column(0)
            .as_primitive::<types::Int64Type>()
            .iter()
            .collect::<Vec<_>>();
        let row_numbers = actual
            .column(1)
            .as_primitive::<types::Int64Type>()
            .iter()
            .collect::<Vec<_>>();
        // The file was generated so that value == row_number + 1; this must
        // hold for every surviving row regardless of selection/filter.
        assert_eq!(
            row_numbers
                .into_iter()
                .map(|number| number.map(|number| number + 1))
                .collect::<Vec<_>>(),
            values
        );
    }
5809
5810 fn generate_file_with_row_numbers(rng: &mut impl Rng) -> (Bytes, ParquetMetaData) {
5811 let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
5812 "value",
5813 ArrowDataType::Int64,
5814 false,
5815 )])));
5816
5817 let mut buf = Vec::with_capacity(1024);
5818 let mut writer =
5819 ArrowWriter::try_new(&mut buf, schema.clone(), None).expect("Could not create writer");
5820
5821 let mut values = 1..=rng.random_range(1..4096);
5822 while !values.is_empty() {
5823 let batch_values = values
5824 .by_ref()
5825 .take(rng.random_range(1..4096))
5826 .collect::<Vec<_>>();
5827 let array = Arc::new(Int64Array::from(batch_values)) as ArrayRef;
5828 let batch =
5829 RecordBatch::try_from_iter([("value", array)]).expect("Could not create batch");
5830 writer.write(&batch).expect("Could not write batch");
5831 writer.flush().expect("Could not flush");
5832 }
5833 let metadata = writer.close().expect("Could not close writer");
5834
5835 (Bytes::from(buf), metadata)
5836 }
5837}