use arrow_array::cast::AsArray;
use arrow_array::{Array, RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, FieldRef, Schema, SchemaRef};
use arrow_select::filter::filter_record_batch;
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

pub use crate::arrow::array_reader::RowGroups;
use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
use crate::arrow::schema::{
    ParquetField, parquet_to_arrow_schema_and_fields, virtual_type::is_virtual_column,
};
use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels_with_virtual};
use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
use crate::bloom_filter::{
    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
};
use crate::column::page::{PageIterator, PageReader};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{
    PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
    ParquetStatisticsPolicy, RowGroupMetaData,
};
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;

use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
pub use read_plan::{ReadPlan, ReadPlanBuilder};

mod filter;
pub mod metrics;
mod read_plan;
pub(crate) mod selection;
pub mod statistics;

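/// A builder for configuring Parquet readers, generic over the input type.
///
/// Most users interact with this via the [`ParquetRecordBatchReaderBuilder`]
/// alias defined below. A minimal usage sketch (the file path is a
/// placeholder):
///
/// ```no_run
/// # use std::fs::File;
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
/// let file = File::open("data.parquet").unwrap();
/// let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
///     .unwrap()
///     .with_batch_size(8192)
///     .build()
///     .unwrap();
/// let batch = reader.next().unwrap().unwrap();
/// ```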
pub struct ArrowReaderBuilder<T> {
    pub(crate) input: T,

    pub(crate) metadata: Arc<ParquetMetaData>,

    pub(crate) schema: SchemaRef,

    pub(crate) fields: Option<Arc<ParquetField>>,

    pub(crate) batch_size: usize,

    pub(crate) row_groups: Option<Vec<usize>>,

    pub(crate) projection: ProjectionMask,

    pub(crate) filter: Option<RowFilter>,

    pub(crate) selection: Option<RowSelection>,

    pub(crate) row_selection_policy: RowSelectionPolicy,

    pub(crate) limit: Option<usize>,

    pub(crate) offset: Option<usize>,

    pub(crate) metrics: ArrowReaderMetrics,

    pub(crate) max_predicate_cache_size: usize,
}

impl<T: Debug> Debug for ArrowReaderBuilder<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowReaderBuilder<T>")
            .field("input", &self.input)
            .field("metadata", &self.metadata)
            .field("schema", &self.schema)
            .field("fields", &self.fields)
            .field("batch_size", &self.batch_size)
            .field("row_groups", &self.row_groups)
            .field("projection", &self.projection)
            .field("filter", &self.filter)
            .field("selection", &self.selection)
            .field("row_selection_policy", &self.row_selection_policy)
            .field("limit", &self.limit)
            .field("offset", &self.offset)
            .field("metrics", &self.metrics)
            .finish()
    }
}

impl<T> ArrowReaderBuilder<T> {
    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self {
            input,
            metadata: metadata.metadata,
            schema: metadata.schema,
            fields: metadata.fields,
            batch_size: 1024,
            row_groups: None,
            projection: ProjectionMask::all(),
            filter: None,
            selection: None,
            row_selection_policy: RowSelectionPolicy::default(),
            limit: None,
            offset: None,
            metrics: ArrowReaderMetrics::Disabled,
            max_predicate_cache_size: 100 * 1024 * 1024,
        }
    }

    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    /// Returns the Arrow [`SchemaRef`] of the data to be read
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Set the size of [`RecordBatch`] to produce. Defaults to 1024.
    pub fn with_batch_size(self, batch_size: usize) -> Self {
        // Cap the batch size at the file's row count to avoid allocating
        // buffers larger than necessary
        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
        Self { batch_size, ..self }
    }

    /// Only read data from the provided row group indexes
    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
        Self {
            row_groups: Some(row_groups),
            ..self
        }
    }

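    /// Only read data from the columns selected by the provided [`ProjectionMask`].
    ///
    /// A minimal sketch of projecting by leaf column index (the path and the
    /// indexes are placeholders):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::ProjectionMask;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// let file = File::open("data.parquet").unwrap();
    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    /// // Read only the first and third leaf columns
    /// let mask = ProjectionMask::leaves(builder.parquet_schema(), [0, 2]);
    /// let reader = builder.with_projection(mask).build().unwrap();
    /// ```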
    pub fn with_projection(self, mask: ProjectionMask) -> Self {
        Self {
            projection: mask,
            ..self
        }
    }

    /// Configure the strategy used to represent row selections during reading
    pub fn with_row_selection_policy(self, policy: RowSelectionPolicy) -> Self {
        Self {
            row_selection_policy: policy,
            ..self
        }
    }

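    /// Scan only the rows described by the provided [`RowSelection`].
    ///
    /// A minimal sketch of building a selection from [`RowSelector`]s (the row
    /// counts are placeholders):
    ///
    /// ```
    /// # use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
    /// // Skip the first 100 rows, then read the next 50
    /// let selection = RowSelection::from(vec![
    ///     RowSelector::skip(100),
    ///     RowSelector::select(50),
    /// ]);
    /// ```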
    pub fn with_row_selection(self, selection: RowSelection) -> Self {
        Self {
            selection: Some(selection),
            ..self
        }
    }

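    /// Provide a [`RowFilter`] to skip decoding rows whose predicates evaluate
    /// to `false`.
    ///
    /// A sketch of filtering on the first leaf column (the path, column index,
    /// and comparison value are placeholders):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use arrow_array::Int32Array;
    /// # use arrow::compute::kernels::cmp::eq;
    /// # use parquet::arrow::ProjectionMask;
    /// # use parquet::arrow::arrow_reader::{
    /// #     ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter,
    /// # };
    /// let file = File::open("data.parquet").unwrap();
    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    /// // Evaluate the predicate against the first leaf column only
    /// let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    /// let predicate = ArrowPredicateFn::new(mask, |batch| {
    ///     eq(batch.column(0), &Int32Array::new_scalar(1))
    /// });
    /// let reader = builder
    ///     .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
    ///     .build()
    ///     .unwrap();
    /// ```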
    pub fn with_row_filter(self, filter: RowFilter) -> Self {
        Self {
            filter: Some(filter),
            ..self
        }
    }

    /// Provide a limit on the number of rows to be read. The limit is applied
    /// after any [`RowSelection`] and [`RowFilter`], and after the offset
    pub fn with_limit(self, limit: usize) -> Self {
        Self {
            limit: Some(limit),
            ..self
        }
    }

    /// Skip the first `offset` rows. The offset is applied after any
    /// [`RowSelection`] and [`RowFilter`]
    pub fn with_offset(self, offset: usize) -> Self {
        Self {
            offset: Some(offset),
            ..self
        }
    }

    /// Specify the [`ArrowReaderMetrics`] to collect while reading
    pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
        Self { metrics, ..self }
    }

    /// Set the maximum size, in bytes, of the cache used to store the results
    /// of evaluating filter predicates
    pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
        Self {
            max_predicate_cache_size,
            ..self
        }
    }
}

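/// Options that control how Parquet metadata is read and how the Arrow schema
/// is derived when constructing a reader.
///
/// A small sketch of requesting the page index (the policy value is
/// illustrative):
///
/// ```
/// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
/// # use parquet::file::metadata::PageIndexPolicy;
/// let options = ArrowReaderOptions::new()
///     .with_page_index_policy(PageIndexPolicy::Optional);
/// ```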
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    skip_arrow_metadata: bool,
    supplied_schema: Option<SchemaRef>,

    pub(crate) column_index: PageIndexPolicy,
    pub(crate) offset_index: PageIndexPolicy,

    metadata_options: ParquetMetaDataOptions,
    #[cfg(feature = "encryption")]
    pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,

    virtual_columns: Vec<FieldRef>,
}

impl ArrowReaderOptions {
    /// Create a new [`ArrowReaderOptions`] with the default settings
    pub fn new() -> Self {
        Self::default()
    }

    /// Skip decoding the embedded arrow metadata (defaults to `false`)
    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

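    /// Provide the Arrow schema to use when decoding, rather than the schema
    /// inferred from the file. This also implies `skip_arrow_metadata`.
    ///
    /// A sketch of supplying a schema (the field name and type are
    /// placeholders and must be compatible with the file being read):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use std::sync::Arc;
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
    /// let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int64, true)]));
    /// let options = ArrowReaderOptions::new().with_schema(schema);
    /// let file = File::open("data.parquet").unwrap();
    /// let builder =
    ///     ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
    /// ```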
    pub fn with_schema(self, schema: SchemaRef) -> Self {
        Self {
            supplied_schema: Some(schema),
            skip_arrow_metadata: true,
            ..self
        }
    }

    #[deprecated(since = "57.2.0", note = "Use `with_page_index_policy` instead")]
    pub fn with_page_index(self, page_index: bool) -> Self {
        self.with_page_index_policy(PageIndexPolicy::from(page_index))
    }

    /// Set the [`PageIndexPolicy`] for both the column index and the offset index
    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
        self.with_column_index_policy(policy)
            .with_offset_index_policy(policy)
    }

    /// Set the [`PageIndexPolicy`] for the column index
    pub fn with_column_index_policy(mut self, policy: PageIndexPolicy) -> Self {
        self.column_index = policy;
        self
    }

    /// Set the [`PageIndexPolicy`] for the offset index
    pub fn with_offset_index_policy(mut self, policy: PageIndexPolicy) -> Self {
        self.offset_index = policy;
        self
    }

    /// Provide a [`SchemaDescriptor`] to use when parsing the metadata, rather
    /// than decoding the schema from the file footer
    pub fn with_parquet_schema(mut self, schema: Arc<SchemaDescriptor>) -> Self {
        self.metadata_options.set_schema(schema);
        self
    }

    /// If `true`, decode page encoding statistics as a compact mask rather
    /// than the full per-page list
    pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
        self.metadata_options.set_encoding_stats_as_mask(val);
        self
    }

    /// Set the [`ParquetStatisticsPolicy`] for page encoding statistics
    pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
        self.metadata_options.set_encoding_stats_policy(policy);
        self
    }

    /// Set the [`ParquetStatisticsPolicy`] for column chunk statistics
    pub fn with_column_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
        self.metadata_options.set_column_stats_policy(policy);
        self
    }

    /// Set the [`ParquetStatisticsPolicy`] for size statistics
    pub fn with_size_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
        self.metadata_options.set_size_stats_policy(policy);
        self
    }

    /// Provide the file decryption properties to use when reading encrypted files
    #[cfg(feature = "encryption")]
    pub fn with_file_decryption_properties(
        self,
        file_decryption_properties: Arc<FileDecryptionProperties>,
    ) -> Self {
        Self {
            file_decryption_properties: Some(file_decryption_properties),
            ..self
        }
    }

    /// Request the given virtual columns to be appended to the output.
    ///
    /// Returns an error if any of the supplied fields is not a virtual column,
    /// i.e. its extension type name does not start with `arrow.virtual.`
    pub fn with_virtual_columns(self, virtual_columns: Vec<FieldRef>) -> Result<Self> {
        for field in &virtual_columns {
            if !is_virtual_column(field) {
                return Err(ParquetError::General(format!(
                    "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
                    field.name()
                )));
            }
        }
        Ok(Self {
            virtual_columns,
            ..self
        })
    }

    #[deprecated(
        since = "57.2.0",
        note = "Use `column_index_policy` or `offset_index_policy` instead"
    )]
    pub fn page_index(&self) -> bool {
        self.offset_index != PageIndexPolicy::Skip && self.column_index != PageIndexPolicy::Skip
    }

    /// Return the [`PageIndexPolicy`] for the offset index
    pub fn offset_index_policy(&self) -> PageIndexPolicy {
        self.offset_index
    }

    /// Return the [`PageIndexPolicy`] for the column index
    pub fn column_index_policy(&self) -> PageIndexPolicy {
        self.column_index
    }

    /// Return the options used when parsing the metadata
    pub fn metadata_options(&self) -> &ParquetMetaDataOptions {
        &self.metadata_options
    }

    /// Return the file decryption properties, if any
    #[cfg(feature = "encryption")]
    pub fn file_decryption_properties(&self) -> Option<&Arc<FileDecryptionProperties>> {
        self.file_decryption_properties.as_ref()
    }
}

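/// The metadata required to construct an Arrow reader, decoupled from the data
/// source so it can be loaded once and shared between multiple readers.
///
/// A sketch of loading the metadata once and reusing it (the path is a
/// placeholder):
///
/// ```no_run
/// # use std::fs::File;
/// # use parquet::arrow::arrow_reader::{
/// #     ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
/// # };
/// let file = File::open("data.parquet").unwrap();
/// // Parse the footer once...
/// let metadata = ArrowReaderMetadata::load(&file, ArrowReaderOptions::new()).unwrap();
/// // ...then build any number of readers from the same metadata
/// let reader_a = ParquetRecordBatchReaderBuilder::new_with_metadata(
///     file.try_clone().unwrap(),
///     metadata.clone(),
/// )
/// .build()
/// .unwrap();
/// let reader_b = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
///     .build()
///     .unwrap();
/// ```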
#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    pub(crate) metadata: Arc<ParquetMetaData>,
    pub(crate) schema: SchemaRef,
    pub(crate) fields: Option<Arc<ParquetField>>,
}

impl ArrowReaderMetadata {
    /// Load the metadata from the provided [`ChunkReader`], applying the given options
    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ParquetMetaDataReader::new()
            .with_column_index_policy(options.column_index)
            .with_offset_index_policy(options.offset_index)
            .with_metadata_options(Some(options.metadata_options.clone()));
        #[cfg(feature = "encryption")]
        let metadata = metadata.with_decryption_properties(
            options.file_decryption_properties.as_ref().map(Arc::clone),
        );
        let metadata = metadata.parse_and_finish(reader)?;
        Self::try_new(Arc::new(metadata), options)
    }

    /// Create a new [`ArrowReaderMetadata`] from already-parsed [`ParquetMetaData`]
    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
        match options.supplied_schema {
            Some(supplied_schema) => Self::with_supplied_schema(
                metadata,
                supplied_schema.clone(),
                &options.virtual_columns,
            ),
            None => {
                let kv_metadata = match options.skip_arrow_metadata {
                    true => None,
                    false => metadata.file_metadata().key_value_metadata(),
                };

                let (schema, fields) = parquet_to_arrow_schema_and_fields(
                    metadata.file_metadata().schema_descr(),
                    ProjectionMask::all(),
                    kv_metadata,
                    &options.virtual_columns,
                )?;

                Ok(Self {
                    metadata,
                    schema: Arc::new(schema),
                    fields: fields.map(Arc::new),
                })
            }
        }
    }

    fn with_supplied_schema(
        metadata: Arc<ParquetMetaData>,
        supplied_schema: SchemaRef,
        virtual_columns: &[FieldRef],
    ) -> Result<Self> {
        let parquet_schema = metadata.file_metadata().schema_descr();
        let field_levels = parquet_to_arrow_field_levels_with_virtual(
            parquet_schema,
            ProjectionMask::all(),
            Some(supplied_schema.fields()),
            virtual_columns,
        )?;
        let fields = field_levels.fields;
        let inferred_len = fields.len();
        let supplied_len = supplied_schema.fields().len() + virtual_columns.len();
        // The supplied schema must have the same number of columns as the
        // schema inferred from the file (including any virtual columns)
        if inferred_len != supplied_len {
            return Err(arrow_err!(format!(
                "Incompatible supplied Arrow schema: expected {} columns, received {}",
                inferred_len, supplied_len
            )));
        }

        let mut errors = Vec::new();

        let field_iter = supplied_schema.fields().iter().zip(fields.iter());

        for (field1, field2) in field_iter {
            if field1.data_type() != field2.data_type() {
                errors.push(format!(
                    "data type mismatch for field {}: requested {} but found {}",
                    field1.name(),
                    field1.data_type(),
                    field2.data_type()
                ));
            }
            if field1.is_nullable() != field2.is_nullable() {
                errors.push(format!(
                    "nullability mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.is_nullable(),
                    field2.is_nullable()
                ));
            }
            if field1.metadata() != field2.metadata() {
                errors.push(format!(
                    "metadata mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.metadata(),
                    field2.metadata()
                ));
            }
        }

        if !errors.is_empty() {
            let message = errors.join(", ");
            return Err(ParquetError::ArrowError(format!(
                "Incompatible supplied Arrow schema: {message}",
            )));
        }

        Ok(Self {
            metadata,
            schema: supplied_schema,
            fields: field_levels.levels.map(Arc::new),
        })
    }

    /// Returns a reference to the [`ParquetMetaData`]
    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    /// Returns the parquet [`SchemaDescriptor`]
    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    /// Returns the Arrow [`SchemaRef`]
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}

#[doc(hidden)]
pub struct SyncReader<T: ChunkReader>(T);

impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("SyncReader").field(&self.0).finish()
    }
}

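/// A synchronous builder for [`ParquetRecordBatchReader`].
///
/// A minimal end-to-end sketch (the path and batch size are placeholders):
///
/// ```no_run
/// # use std::fs::File;
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
/// let file = File::open("data.parquet").unwrap();
/// let reader = ParquetRecordBatchReaderBuilder::try_new(file)
///     .unwrap()
///     .with_batch_size(1024)
///     .build()
///     .unwrap();
/// for batch in reader {
///     println!("{} rows", batch.unwrap().num_rows());
/// }
/// ```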
pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;

impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
    /// Create a new [`ParquetRecordBatchReaderBuilder`] with default options
    pub fn try_new(reader: T) -> Result<Self> {
        Self::try_new_with_options(reader, Default::default())
    }

    /// Create a new [`ParquetRecordBatchReaderBuilder`] with the given [`ArrowReaderOptions`]
    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load(&reader, options)?;
        Ok(Self::new_with_metadata(reader, metadata))
    }

    /// Create a new [`ParquetRecordBatchReaderBuilder`] from previously loaded [`ArrowReaderMetadata`]
    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(SyncReader(input), metadata)
    }

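    /// Read the bloom filter for the given row group and column, if present.
    ///
    /// A sketch of probing a bloom filter (the path, indexes, and probe value
    /// are placeholders):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// let file = File::open("data.parquet").unwrap();
    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    /// if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0).unwrap() {
    ///     // `false` means the value is definitely absent from this column chunk
    ///     let may_be_present = sbbf.check(&123_i64);
    /// }
    /// ```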
    pub fn get_row_group_column_bloom_filter(
        &self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> Result<Option<Sbbf>> {
        let metadata = self.metadata.row_group(row_group_idx);
        let column_metadata = metadata.column(column_idx);

        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => self.input.0.get_bytes(offset, length as usize),
            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
        }?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        match header.algorithm {
            BloomFilterAlgorithm::BLOCK => {
                // Single-variant enum: matching exhaustively ensures a compile
                // error if a new algorithm is ever added
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED => {
                // Single-variant enum: matching exhaustively ensures a compile
                // error if a new compression is ever added
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH => {
                // Single-variant enum: matching exhaustively ensures a compile
                // error if a new hash is ever added
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            Some(_) => buffer.slice(
                (TryInto::<usize>::try_into(bitset_offset).unwrap()
                    - TryInto::<usize>::try_into(offset).unwrap())..,
            ),
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                self.input.0.get_bytes(bitset_offset, bitset_length)?
            }
        };
        Ok(Some(Sbbf::new(&bitset)))
    }


    /// Build a [`ParquetRecordBatchReader`].
    ///
    /// Note: this will eagerly evaluate any `RowFilter` before returning
    pub fn build(self) -> Result<ParquetRecordBatchReader> {
        let Self {
            input,
            metadata,
            schema: _,
            fields,
            batch_size,
            row_groups,
            projection,
            mut filter,
            selection,
            row_selection_policy,
            limit,
            offset,
            metrics,
            max_predicate_cache_size: _, // not used by the sync reader
        } = self;

        // If the batch size is larger than the number of rows, cap it to avoid
        // over-allocating buffers
        let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize);

        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());

        let reader = ReaderRowGroups {
            reader: Arc::new(input.0),
            metadata,
            row_groups,
        };

        let mut plan_builder = ReadPlanBuilder::new(batch_size)
            .with_selection(selection)
            .with_row_selection_policy(row_selection_policy);

        // Update the selection based on any filters
        if let Some(filter) = filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
                // break early if all rows have already been ruled out
                if !plan_builder.selects_any() {
                    break;
                }

                let mut cache_projection = predicate.projection().clone();
                cache_projection.intersect(&projection);

                let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
                    .with_parquet_metadata(&reader.metadata)
                    .build_array_reader(fields.as_deref(), predicate.projection())?;

                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
            }
        }

        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
            .with_parquet_metadata(&reader.metadata)
            .build_array_reader(fields.as_deref(), &projection)?;

        let read_plan = plan_builder
            .limited(reader.num_rows())
            .with_offset(offset)
            .with_limit(limit)
            .build_limited()
            .build();

        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
    }
}


struct ReaderRowGroups<T: ChunkReader> {
    reader: Arc<T>,

    metadata: Arc<ParquetMetaData>,
    row_groups: Vec<usize>,
}

impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
    fn num_rows(&self) -> usize {
        let meta = self.metadata.row_groups();
        self.row_groups
            .iter()
            .map(|x| meta[*x].num_rows() as usize)
            .sum()
    }

    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        Ok(Box::new(ReaderPageIterator {
            column_idx: i,
            reader: self.reader.clone(),
            metadata: self.metadata.clone(),
            row_groups: self.row_groups.clone().into_iter(),
        }))
    }

    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
        Box::new(
            self.row_groups
                .iter()
                .map(move |i| self.metadata.row_group(*i)),
        )
    }

    fn metadata(&self) -> &ParquetMetaData {
        self.metadata.as_ref()
    }
}


struct ReaderPageIterator<T: ChunkReader> {
    reader: Arc<T>,
    column_idx: usize,
    row_groups: std::vec::IntoIter<usize>,
    metadata: Arc<ParquetMetaData>,
}

impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
    /// Return the next [`SerializedPageReader`] for the given row group
    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
        let rg = self.metadata.row_group(rg_idx);
        let column_chunk_metadata = rg.column(self.column_idx);
        let offset_index = self.metadata.offset_index();
        // The offset index may be missing, in which case `i[rg_idx]` is empty;
        // filter those out to avoid panicking on `i[rg_idx][self.column_idx]`
        let page_locations = offset_index
            .filter(|i| !i[rg_idx].is_empty())
            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
        let total_rows = rg.num_rows() as usize;
        let reader = self.reader.clone();

        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
            .add_crypto_context(
                rg_idx,
                self.column_idx,
                self.metadata.as_ref(),
                column_chunk_metadata,
            )
    }
}


impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        let rg_idx = self.row_groups.next()?;
        let page_reader = self
            .next_page_reader(rg_idx)
            .map(|page_reader| Box::new(page_reader) as _);
        Some(page_reader)
    }
}

impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}

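/// An `Iterator<Item = Result<RecordBatch, ArrowError>>` that reads Arrow
/// [`RecordBatch`]es from a Parquet data source.
///
/// A minimal sketch (the path and batch size are placeholders):
///
/// ```no_run
/// # use std::fs::File;
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
/// let file = File::open("data.parquet").unwrap();
/// let reader = ParquetRecordBatchReader::try_new(file, 1024).unwrap();
/// for batch in reader {
///     let batch = batch.unwrap();
///     println!("read {} rows", batch.num_rows());
/// }
/// ```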
pub struct ParquetRecordBatchReader {
    array_reader: Box<dyn ArrayReader>,
    schema: SchemaRef,
    read_plan: ReadPlan,
}

impl Debug for ParquetRecordBatchReader {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ParquetRecordBatchReader")
            .field("array_reader", &"...")
            .field("schema", &self.schema)
            .field("read_plan", &self.read_plan)
            .finish()
    }
}

impl Iterator for ParquetRecordBatchReader {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_inner()
            .map_err(|arrow_err| arrow_err.into())
            .transpose()
    }
}


impl ParquetRecordBatchReader {
    /// Returns the next `RecordBatch`, or `None` if the reader is exhausted
    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
        let mut read_records = 0;
        let batch_size = self.batch_size();
        if batch_size == 0 {
            return Ok(None);
        }
        match self.read_plan.row_selection_cursor_mut() {
            RowSelectionCursor::Mask(mask_cursor) => {
                // Selection is represented as a boolean mask: decode whole
                // chunks and then filter out unselected rows
                while !mask_cursor.is_empty() {
                    let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else {
                        return Ok(None);
                    };

                    if mask_chunk.initial_skip > 0 {
                        let skipped = self.array_reader.skip_records(mask_chunk.initial_skip)?;
                        if skipped != mask_chunk.initial_skip {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                mask_chunk.initial_skip,
                                skipped
                            ));
                        }
                    }

                    if mask_chunk.chunk_rows == 0 {
                        if mask_cursor.is_empty() && mask_chunk.selected_rows == 0 {
                            return Ok(None);
                        }
                        continue;
                    }

                    let mask = mask_cursor.mask_values_for(&mask_chunk)?;

                    let read = self.array_reader.read_records(mask_chunk.chunk_rows)?;
                    if read == 0 {
                        return Err(general_err!(
                            "reached end of column while expecting {} rows",
                            mask_chunk.chunk_rows
                        ));
                    }
                    if read != mask_chunk.chunk_rows {
                        return Err(general_err!(
                            "insufficient rows read from array reader - expected {}, got {}",
                            mask_chunk.chunk_rows,
                            read
                        ));
                    }

                    let array = self.array_reader.consume_batch()?;
                    let struct_array = array.as_struct_opt().ok_or_else(|| {
                        ArrowError::ParquetError(
                            "Struct array reader should return struct array".to_string(),
                        )
                    })?;

                    // Keep only the rows selected by the mask
                    let filtered_batch =
                        filter_record_batch(&RecordBatch::from(struct_array), &mask)?;

                    if filtered_batch.num_rows() != mask_chunk.selected_rows {
                        return Err(general_err!(
                            "filtered rows mismatch selection - expected {}, got {}",
                            mask_chunk.selected_rows,
                            filtered_batch.num_rows()
                        ));
                    }

                    if filtered_batch.num_rows() == 0 {
                        continue;
                    }

                    return Ok(Some(filtered_batch));
                }
            }
            RowSelectionCursor::Selectors(selectors_cursor) => {
                while read_records < batch_size && !selectors_cursor.is_empty() {
                    let front = selectors_cursor.next_selector();
                    if front.skip {
                        let skipped = self.array_reader.skip_records(front.row_count)?;

                        if skipped != front.row_count {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                front.row_count,
                                skipped
                            ));
                        }
                        continue;
                    }

                    // Skip empty selectors
                    if front.row_count == 0 {
                        continue;
                    }

                    // If the selector holds more rows than fit in this batch,
                    // split it and push the remainder back for the next batch
                    let need_read = batch_size - read_records;
                    let to_read = match front.row_count.checked_sub(need_read) {
                        Some(remaining) if remaining != 0 => {
                            selectors_cursor.return_selector(RowSelector::select(remaining));
                            need_read
                        }
                        _ => front.row_count,
                    };
                    match self.array_reader.read_records(to_read)? {
                        0 => break,
                        rec => read_records += rec,
                    };
                }
            }
            RowSelectionCursor::All => {
                self.array_reader.read_records(batch_size)?;
            }
        };

        let array = self.array_reader.consume_batch()?;
        let struct_array = array.as_struct_opt().ok_or_else(|| {
            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
        })?;

        Ok(if struct_array.len() > 0 {
            Some(RecordBatch::from(struct_array))
        } else {
            None
        })
    }
}

impl RecordBatchReader for ParquetRecordBatchReader {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl ParquetRecordBatchReader {
    /// Create a new [`ParquetRecordBatchReader`] from the provided chunk reader
    /// using default options and the given batch size
    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
        ParquetRecordBatchReaderBuilder::try_new(reader)?
            .with_batch_size(batch_size)
            .build()
    }

    /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`],
    /// reading all the columns described by `levels`, optionally restricted to
    /// the given [`RowSelection`]
    pub fn try_new_with_row_groups(
        levels: &FieldLevels,
        row_groups: &dyn RowGroups,
        batch_size: usize,
        selection: Option<RowSelection>,
    ) -> Result<Self> {
        let metrics = ArrowReaderMetrics::disabled();
        let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
            .with_parquet_metadata(row_groups.metadata())
            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;

        let read_plan = ReadPlanBuilder::new(batch_size)
            .with_selection(selection)
            .build();

        Ok(Self {
            array_reader,
            schema: Arc::new(Schema::new(levels.fields.clone())),
            read_plan,
        })
    }

    /// Create a new [`ParquetRecordBatchReader`] from the given [`ArrayReader`]
    /// and [`ReadPlan`]
    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
        let schema = match array_reader.get_data_type() {
            ArrowType::Struct(fields) => Schema::new(fields.clone()),
            _ => unreachable!("Struct array reader's data type is not struct!"),
        };

        Self {
            array_reader,
            schema: Arc::new(schema),
            read_plan,
        }
    }

    #[inline(always)]
    pub(crate) fn batch_size(&self) -> usize {
        self.read_plan.batch_size()
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use std::cmp::min;
    use std::collections::{HashMap, VecDeque};
    use std::fmt::Formatter;
    use std::fs::File;
    use std::io::Seek;
    use std::path::PathBuf;
    use std::sync::Arc;

    use rand::rngs::StdRng;
    use rand::{Rng, RngCore, SeedableRng, random, rng};
    use tempfile::tempfile;

    use crate::arrow::arrow_reader::{
        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions,
        ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection,
        RowSelectionPolicy, RowSelector,
    };
    use crate::arrow::schema::{
        add_encoded_arrow_schema_to_metadata,
        virtual_type::{RowGroupIndex, RowNumber},
    };
    use crate::arrow::{ArrowWriter, ProjectionMask};
    use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType};
    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
    use crate::data_type::{
        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
        FloatType, Int32Type, Int64Type, Int96, Int96Type,
    };
    use crate::errors::Result;
    use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetStatisticsPolicy};
    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
    use crate::file::writer::SerializedFileWriter;
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::{Type, TypePtr};
    use crate::util::test_common::rand_gen::RandGen;
    use arrow::compute::kernels::cmp::eq;
    use arrow::compute::or;
    use arrow_array::builder::*;
    use arrow_array::cast::AsArray;
    use arrow_array::types::{
        Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
        DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
        Time64MicrosecondType,
    };
    use arrow_array::*;
    use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256};
    use arrow_data::{ArrayData, ArrayDataBuilder};
    use arrow_schema::{
        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
    };
    use arrow_select::concat::concat_batches;
    use bytes::Bytes;
    use half::f16;
    use num_traits::PrimInt;

    #[test]
    fn test_arrow_reader_all_columns() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());
        let reader = builder.build().unwrap();

        // Verify that the schema was correctly parsed
        assert_eq!(original_schema.fields(), reader.schema().fields());
    }

    #[test]
    fn test_reuse_schema() {
        let file = get_test_file("parquet/alltypes-java.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
        let expected = builder.metadata;
        let schema = expected.file_metadata().schema_descr_ptr();

        let arrow_options = ArrowReaderOptions::new().with_parquet_schema(schema.clone());
        let builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();

        // Verify that the metadata parsed with a supplied schema matches
        assert_eq!(expected.as_ref(), builder.metadata.as_ref());
    }

    #[test]
    fn test_page_encoding_stats_mask() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let file = File::open(path).unwrap();

        let arrow_options = ArrowReaderOptions::new().with_encoding_stats_as_mask(true);
        let builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();

        let row_group_metadata = builder.metadata.row_group(0);

        let page_encoding_stats = row_group_metadata
            .column(0)
            .page_encoding_stats_mask()
            .unwrap();
        assert!(page_encoding_stats.is_only(Encoding::PLAIN));
        let page_encoding_stats = row_group_metadata
            .column(2)
            .page_encoding_stats_mask()
            .unwrap();
        assert!(page_encoding_stats.is_only(Encoding::PLAIN_DICTIONARY));
    }

    #[test]
    fn test_stats_stats_skipped() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let file = File::open(path).unwrap();

        let arrow_options = ArrowReaderOptions::new()
            .with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll)
            .with_column_stats_policy(ParquetStatisticsPolicy::SkipAll);
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for column in row_group_metadata.columns() {
            assert!(column.page_encoding_stats().is_none());
            assert!(column.page_encoding_stats_mask().is_none());
            assert!(column.statistics().is_none());
        }

        let arrow_options = ArrowReaderOptions::new()
            .with_encoding_stats_as_mask(true)
            .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
            .with_column_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]));
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
            assert!(column.page_encoding_stats().is_none());
            assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
            assert_eq!(column.statistics().is_some(), idx == 0);
        }
    }

    #[test]
    fn test_size_stats_stats_skipped() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/repeated_primitive_no_list.parquet");
        let file = File::open(path).unwrap();

        let arrow_options =
            ArrowReaderOptions::new().with_size_stats_policy(ParquetStatisticsPolicy::SkipAll);
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for column in row_group_metadata.columns() {
            assert!(column.repetition_level_histogram().is_none());
            assert!(column.definition_level_histogram().is_none());
            assert!(column.unencoded_byte_array_data_bytes().is_none());
        }

        let arrow_options = ArrowReaderOptions::new()
            .with_encoding_stats_as_mask(true)
            .with_size_stats_policy(ParquetStatisticsPolicy::skip_except(&[1]));
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
            assert_eq!(column.repetition_level_histogram().is_some(), idx == 1);
            assert_eq!(column.definition_level_histogram().is_some(), idx == 1);
            assert_eq!(column.unencoded_byte_array_data_bytes().is_some(), idx == 1);
        }
    }

    #[test]
    fn test_arrow_reader_single_column() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
        let reader = builder.with_projection(mask).build().unwrap();

        // Verify that the schema was correctly parsed
        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_arrow_reader_single_column_by_name() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
        let reader = builder.with_projection(mask).build().unwrap();

        // Verify that the schema was correctly parsed
        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_null_column_reader_test() {
        let mut file = tempfile::tempfile().unwrap();

        let schema = "
            message message {
                OPTIONAL INT32 int32;
            }
        ";
        let schema = Arc::new(parse_message_type(schema).unwrap());

        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
        generate_single_column_file_with_data::<Int32Type>(
            &[vec![], vec![]],
            Some(&def_levels),
            file.try_clone().unwrap(),
            schema,
            Some(Field::new("int32", ArrowDataType::Null, true)),
            &Default::default(),
        )
        .unwrap();

        file.rewind().unwrap();

        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();

        assert_eq!(batches.len(), 4);
        for batch in &batches[0..3] {
            assert_eq!(batch.num_rows(), 2);
            assert_eq!(batch.num_columns(), 1);
            assert_eq!(batch.column(0).null_count(), 2);
        }

        assert_eq!(batches[3].num_rows(), 1);
        assert_eq!(batches[3].num_columns(), 1);
        assert_eq!(batches[3].column(0).null_count(), 1);
    }

    #[test]
    fn test_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<BoolType, _, BoolType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
        );
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<FloatType, _, FloatType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
        );
    }

    #[test]
    fn test_unsigned_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::UINT_32,
            Some(ArrowDataType::UInt32),
            |vals| {
                Arc::new(UInt32Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u32)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::UINT_64,
            Some(ArrowDataType::UInt64),
            |vals| {
                Arc::new(UInt64Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u64)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
    }

    #[test]
    fn test_unsigned_roundtrip() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("uint32", ArrowDataType::UInt32, true),
            Field::new("uint64", ArrowDataType::UInt64, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt32Array::from_iter_values([
                    0,
                    i32::MAX as u32,
                    u32::MAX,
                ])),
                Arc::new(UInt64Array::from_iter_values([
                    0,
                    i64::MAX as u64,
                    u64::MAX,
                ])),
            ],
        )
        .unwrap();

        writer.write(&original).unwrap();
        writer.close().unwrap();

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
        let ret = reader.next().unwrap().unwrap();
        assert_eq!(ret, original);

        // Ensure the arrays can be downcast to the expected unsigned types
        ret.column(0)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();

        ret.column(1)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
    }

    #[test]
    fn test_float16_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("float16", ArrowDataType::Float16, false),
            Field::new("float16-nullable", ArrowDataType::Float16, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Float16Array::from_iter_values([
                    f16::EPSILON,
                    f16::MIN,
                    f16::MAX,
                    f16::NAN,
                    f16::INFINITY,
                    f16::NEG_INFINITY,
                    f16::ONE,
                    f16::NEG_ONE,
                    f16::ZERO,
                    f16::NEG_ZERO,
                    f16::E,
                    f16::PI,
                    f16::FRAC_1_PI,
                ])),
                Arc::new(Float16Array::from(vec![
                    None,
                    None,
                    None,
                    Some(f16::NAN),
                    Some(f16::INFINITY),
                    Some(f16::NEG_INFINITY),
                    None,
                    None,
                    None,
                    None,
                    None,
                    None,
                    Some(f16::FRAC_1_PI),
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        // Ensure the correct primitive arrow types were read back
        ret.column(0).as_primitive::<Float16Type>();
        ret.column(1).as_primitive::<Float16Type>();

        Ok(())
    }

    #[test]
    fn test_time_utc_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "time_millis",
                ArrowDataType::Time32(TimeUnit::Millisecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
            Field::new(
                "time_micros",
                ArrowDataType::Time64(TimeUnit::Microsecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Time32MillisecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399_000),
                    Some(86_400_000),
                    Some(86_401_000),
                    None,
                ])),
                Arc::new(Time64MicrosecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399 * 1_000_000),
                    Some(86_400 * 1_000_000),
                    Some(86_401 * 1_000_000),
                    None,
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        // Ensure the correct primitive arrow types were read back
        ret.column(0).as_primitive::<Time32MillisecondType>();
        ret.column(1).as_primitive::<Time64MicrosecondType>();

        Ok(())
    }

    #[test]
    fn test_date32_roundtrip() -> Result<()> {
        use arrow_array::Date32Array;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "date32",
            ArrowDataType::Date32,
            false,
        )]));

        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![Arc::new(Date32Array::from(vec![
                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
            ]))],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        // Ensure the correct primitive arrow type was read back
        ret.column(0).as_primitive::<Date32Type>();

        Ok(())
    }

    #[test]
    fn test_date64_roundtrip() -> Result<()> {
        use arrow_array::Date64Array;

        let schema = Arc::new(Schema::new(vec![
            Field::new("small-date64", ArrowDataType::Date64, false),
            Field::new("big-date64", ArrowDataType::Date64, false),
            Field::new("invalid-date64", ArrowDataType::Date64, false),
        ]));

        let mut default_buf = Vec::with_capacity(1024);
        let mut coerce_buf = Vec::with_capacity(1024);

        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();

        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
        let mut coerce_writer =
            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;

        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;

        let original = RecordBatch::try_new(
            schema,
            vec![
                // small-date64: whole-day values within the Date32 range
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000 * NUM_MILLISECONDS_IN_DAY,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                // big-date64: whole-day values outside the Date32 range
                Arc::new(Date64Array::from(vec![
                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                // invalid-date64: values that are not a whole number of days
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1,
                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                ])),
            ],
        )?;

        default_writer.write(&original)?;
        coerce_writer.write(&original)?;

        default_writer.close()?;
        coerce_writer.close()?;

        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;

        let default_ret = default_reader.next().unwrap()?;
        let coerce_ret = coerce_reader.next().unwrap()?;

        // Without coercion, all three columns round-trip exactly
        assert_eq!(default_ret, original);

        // With coercion to whole days, values outside the Date32 range or not
        // on a day boundary cannot round-trip
        assert_eq!(coerce_ret.column(0), original.column(0));
        assert_ne!(coerce_ret.column(1), original.column(1));
        assert_ne!(coerce_ret.column(2), original.column(2));

        // Ensure the arrays can be downcast to the expected type
        default_ret.column(0).as_primitive::<Date64Type>();
        coerce_ret.column(0).as_primitive::<Date64Type>();

        Ok(())
    }

    struct RandFixedLenGen {}

    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
        fn r#gen(len: i32) -> FixedLenByteArray {
            let mut v = vec![0u8; len as usize];
            rng().fill_bytes(&mut v);
            ByteArray::from(v).into()
        }
    }

    #[test]
    fn test_fixed_length_binary_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            20,
            ConvertedType::NONE,
            None,
            |vals| {
                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
                for val in vals {
                    match val {
                        Some(b) => builder.append_value(b).unwrap(),
                        None => builder.append_null(),
                    }
                }
                Arc::new(builder.finish())
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_interval_day_time_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            12,
            ConvertedType::INTERVAL,
            None,
            |vals| {
                Arc::new(
                    vals.iter()
                        .map(|x| {
                            x.as_ref().map(|b| IntervalDayTime {
                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
                                milliseconds: i32::from_le_bytes(
                                    b.as_ref()[8..12].try_into().unwrap(),
                                ),
                            })
                        })
                        .collect::<IntervalDayTimeArray>(),
                )
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_int96_single_column_reader_test() {
        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];

        type TypeHintAndConversionFunction =
            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);

        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
            // Without a type hint, INT96 defaults to nanosecond timestamps
            (None, |vals: &[Option<Int96>]| {
                Arc::new(TimestampNanosecondArray::from_iter(
                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
                )) as ArrayRef
            }),
            // Other time units requested via an explicit type hint
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampSecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMillisecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_millis())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMicrosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_micros())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampNanosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
                    )) as ArrayRef
                },
            ),
            // A timezone-aware type hint
            (
                Some(ArrowDataType::Timestamp(
                    TimeUnit::Second,
                    Some(Arc::from("-05:00")),
                )),
                |vals: &[Option<Int96>]| {
                    Arc::new(
                        TimestampSecondArray::from_iter(
                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
                        )
                        .with_timezone("-05:00"),
                    ) as ArrayRef
                },
            ),
        ];

        resolutions.iter().for_each(|(arrow_type, converter)| {
            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
                2,
                ConvertedType::NONE,
                arrow_type.clone(),
                converter,
                encodings,
            );
        })
    }

    #[test]
    fn test_int96_from_spark_file_with_provided_schema() {
        // Read an INT96 column from a Spark-written file, supplying a
        // microsecond timestamp schema
        use arrow_schema::DataType::Timestamp;
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            Timestamp(TimeUnit::Microsecond, None),
            true,
        )]));
        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());

        let mut record_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                .unwrap()
                .build()
                .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));

        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456),
            Some(1704070800000000),
            Some(253402225200000000),
            Some(1735599600000000),
            None,
            Some(9089380393200000000),
        ]));

        // Cast the timestamp column to Int64 to compare the raw values
        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }

    #[test]
    fn test_int96_from_spark_file_without_provided_schema() {
        // Read the same INT96 column without a supplied schema: it defaults to
        // nanosecond timestamps, which overflow for some of the values
        use arrow_schema::DataType::Timestamp;
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));

        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456000),
            Some(1704070800000000000),
            Some(-4852191831933722624), // the nanosecond value overflows i64 and wraps
            Some(1735599600000000000),
            None,
            Some(-4864435138808946688), // the nanosecond value overflows i64 and wraps
        ]));

        // Cast the timestamp column to Int64 to compare the raw values
        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }

    struct RandUtf8Gen {}

    impl RandGen<ByteArrayType> for RandUtf8Gen {
        fn r#gen(len: i32) -> ByteArray {
            Int32Type::r#gen(len).to_string().as_str().into()
        }
    }

    #[test]
    fn test_utf8_single_column_reader_test() {
        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
            })))
        }

        let encodings = &[
            Encoding::PLAIN,
            Encoding::RLE_DICTIONARY,
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            Encoding::DELTA_BYTE_ARRAY,
        ];

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::NONE,
            None,
            |vals| {
                Arc::new(BinaryArray::from_iter(
                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
                ))
            },
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            None,
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::Utf8),
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::LargeUtf8),
            string_converter::<i64>,
            encodings,
        );

        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
        for key in &small_key_types {
            for encoding in encodings {
                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
                opts.encoding = *encoding;

                let data_type =
                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
                    opts,
                    2,
                    ConvertedType::UTF8,
                    Some(data_type.clone()),
                    move |vals| {
                        let vals = string_converter::<i32>(vals);
                        arrow::compute::cast(&vals, &data_type).unwrap()
                    },
                );
            }
        }

        let key_types = [
            ArrowDataType::Int16,
            ArrowDataType::UInt16,
            ArrowDataType::Int32,
            ArrowDataType::UInt32,
            ArrowDataType::Int64,
            ArrowDataType::UInt64,
        ];

        for key in &key_types {
            let data_type =
                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let vals = string_converter::<i32>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );

            let data_type = ArrowDataType::Dictionary(
                Box::new(key.clone()),
                Box::new(ArrowDataType::LargeUtf8),
            );

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let vals = string_converter::<i64>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );
        }
    }

    #[test]
    fn test_decimal_nullable_struct() {
        let decimals = Decimal256Array::from_iter_values(
            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
        );

        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "decimals",
            decimals.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_int32_nullable_struct() {
        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "int32",
            int32.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![int32.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_decimal_list() {
        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);

        // [[], [1], [2, 3], null, [4], null, [5, 6, 7]]
        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
            decimals.data_type().clone(),
            false,
        ))))
        .len(7)
        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 1), &read[2]);
    }

    #[test]
    fn test_read_decimal_file() {
        use arrow_array::Decimal128Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let file_variants = vec![
            ("byte_array", 4),
            ("fixed_length", 25),
            ("int32", 4),
            ("int64", 10),
        ];
        for (prefix, target_precision) in file_variants {
            let path = format!("{testdata}/{prefix}_decimal.parquet");
            let file = File::open(path).unwrap();
            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

            let batch = record_reader.next().unwrap().unwrap();
            assert_eq!(batch.num_rows(), 24);
            let col = batch
                .column(0)
                .as_any()
                .downcast_ref::<Decimal128Array>()
                .unwrap();

            let expected = 1..25;

            assert_eq!(col.precision(), target_precision);
            assert_eq!(col.scale(), 2);

            for (i, v) in expected.enumerate() {
                assert_eq!(col.value(i), v * 100_i128);
            }
        }
    }

2678 #[test]
2679 fn test_read_float16_nonzeros_file() {
2680 use arrow_array::Float16Array;
2681 let testdata = arrow::util::test_util::parquet_test_data();
2682 let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
2684 let file = File::open(path).unwrap();
2685 let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2686
2687 let batch = record_reader.next().unwrap().unwrap();
2688 assert_eq!(batch.num_rows(), 8);
2689 let col = batch
2690 .column(0)
2691 .as_any()
2692 .downcast_ref::<Float16Array>()
2693 .unwrap();
2694
2695 let f16_two = f16::ONE + f16::ONE;
2696
2697 assert_eq!(col.null_count(), 1);
2698 assert!(col.is_null(0));
2699 assert_eq!(col.value(1), f16::ONE);
2700 assert_eq!(col.value(2), -f16_two);
2701 assert!(col.value(3).is_nan());
2702 assert_eq!(col.value(4), f16::ZERO);
2703 assert!(col.value(4).is_sign_positive());
2704 assert_eq!(col.value(5), f16::NEG_ONE);
2705 assert_eq!(col.value(6), f16::NEG_ZERO);
2706 assert!(col.value(6).is_sign_negative());
2707 assert_eq!(col.value(7), f16_two);
2708 }
2709
2710 #[test]
2711 fn test_read_float16_zeros_file() {
2712 use arrow_array::Float16Array;
2713 let testdata = arrow::util::test_util::parquet_test_data();
2714 let path = format!("{testdata}/float16_zeros_and_nans.parquet");
2716 let file = File::open(path).unwrap();
2717 let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2718
2719 let batch = record_reader.next().unwrap().unwrap();
2720 assert_eq!(batch.num_rows(), 3);
2721 let col = batch
2722 .column(0)
2723 .as_any()
2724 .downcast_ref::<Float16Array>()
2725 .unwrap();
2726
2727 assert_eq!(col.null_count(), 1);
2728 assert!(col.is_null(0));
2729 assert_eq!(col.value(1), f16::ZERO);
2730 assert!(col.value(1).is_sign_positive());
2731 assert!(col.value(2).is_nan());
2732 }
2733
2734 #[test]
2735 fn test_read_float32_float64_byte_stream_split() {
2736 let path = format!(
2737 "{}/byte_stream_split.zstd.parquet",
2738 arrow::util::test_util::parquet_test_data(),
2739 );
2740 let file = File::open(path).unwrap();
2741 let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2742
2743 let mut row_count = 0;
2744 for batch in record_reader {
2745 let batch = batch.unwrap();
2746 row_count += batch.num_rows();
2747 let f32_col = batch.column(0).as_primitive::<Float32Type>();
2748 let f64_col = batch.column(1).as_primitive::<Float64Type>();
2749
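            // all values in this file are expected to lie strictly within (-10, 10)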
2750 for &x in f32_col.values() {
2752 assert!(x > -10.0);
2753 assert!(x < 10.0);
2754 }
2755 for &x in f64_col.values() {
2756 assert!(x > -10.0);
2757 assert!(x < 10.0);
2758 }
2759 }
2760 assert_eq!(row_count, 300);
2761 }
2762
2763 #[test]
2764 fn test_read_extended_byte_stream_split() {
2765 let path = format!(
2766 "{}/byte_stream_split_extended.gzip.parquet",
2767 arrow::util::test_util::parquet_test_data(),
2768 );
2769 let file = File::open(path).unwrap();
2770 let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2771
2772 let mut row_count = 0;
2773 for batch in record_reader {
2774 let batch = batch.unwrap();
2775 row_count += batch.num_rows();
2776
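            // columns come in (plain, BYTE_STREAM_SPLIT) pairs; each pair must decode identically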
2777 let f16_col = batch.column(0).as_primitive::<Float16Type>();
2779 let f16_bss = batch.column(1).as_primitive::<Float16Type>();
2780 assert_eq!(f16_col.len(), f16_bss.len());
2781 f16_col
2782 .iter()
2783 .zip(f16_bss.iter())
2784 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2785
2786 let f32_col = batch.column(2).as_primitive::<Float32Type>();
2788 let f32_bss = batch.column(3).as_primitive::<Float32Type>();
2789 assert_eq!(f32_col.len(), f32_bss.len());
2790 f32_col
2791 .iter()
2792 .zip(f32_bss.iter())
2793 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2794
2795 let f64_col = batch.column(4).as_primitive::<Float64Type>();
2797 let f64_bss = batch.column(5).as_primitive::<Float64Type>();
2798 assert_eq!(f64_col.len(), f64_bss.len());
2799 f64_col
2800 .iter()
2801 .zip(f64_bss.iter())
2802 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2803
2804 let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
2806 let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
2807 assert_eq!(i32_col.len(), i32_bss.len());
2808 i32_col
2809 .iter()
2810 .zip(i32_bss.iter())
2811 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2812
2813 let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
2815 let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
2816 assert_eq!(i64_col.len(), i64_bss.len());
2817 i64_col
2818 .iter()
2819 .zip(i64_bss.iter())
2820 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2821
2822 let flba_col = batch.column(10).as_fixed_size_binary();
2824 let flba_bss = batch.column(11).as_fixed_size_binary();
2825 assert_eq!(flba_col.len(), flba_bss.len());
2826 flba_col
2827 .iter()
2828 .zip(flba_bss.iter())
2829 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2830
2831 let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
2833 let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
2834 assert_eq!(dec_col.len(), dec_bss.len());
2835 dec_col
2836 .iter()
2837 .zip(dec_bss.iter())
2838 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2839 }
2840 assert_eq!(row_count, 200);
2841 }
2842
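    /// `incorrect_map_schema.parquet` declares a map with a non-standard inner
    /// schema; the reader should still expose it using the standard
    /// key_value/key/value Arrow Map layout asserted below.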
2843 #[test]
2844 fn test_read_incorrect_map_schema_file() {
2845 let testdata = arrow::util::test_util::parquet_test_data();
2846 let path = format!("{testdata}/incorrect_map_schema.parquet");
2848 let file = File::open(path).unwrap();
2849 let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
2850
2851 let batch = record_reader.next().unwrap().unwrap();
2852 assert_eq!(batch.num_rows(), 1);
2853
2854 let expected_schema = Schema::new(vec![Field::new(
2855 "my_map",
2856 ArrowDataType::Map(
2857 Arc::new(Field::new(
2858 "key_value",
2859 ArrowDataType::Struct(Fields::from(vec![
2860 Field::new("key", ArrowDataType::Utf8, false),
2861 Field::new("value", ArrowDataType::Utf8, true),
2862 ])),
2863 false,
2864 )),
2865 false,
2866 ),
2867 true,
2868 )]);
2869 assert_eq!(batch.schema().as_ref(), &expected_schema);
2870
2871 assert_eq!(batch.num_rows(), 1);
2872 assert_eq!(batch.column(0).null_count(), 0);
2873 assert_eq!(
2874 batch.column(0).as_map().keys().as_ref(),
2875 &StringArray::from(vec!["parent", "name"])
2876 );
2877 assert_eq!(
2878 batch.column(0).as_map().values().as_ref(),
2879 &StringArray::from(vec!["another", "report"])
2880 );
2881 }
2882
2883 #[test]
2884 fn test_read_dict_fixed_size_binary() {
2885 let schema = Arc::new(Schema::new(vec![Field::new(
2886 "a",
2887 ArrowDataType::Dictionary(
2888 Box::new(ArrowDataType::UInt8),
2889 Box::new(ArrowDataType::FixedSizeBinary(8)),
2890 ),
2891 true,
2892 )]));
2893 let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
2894 let values = FixedSizeBinaryArray::try_from_iter(
2895 vec![
2896 (0u8..8u8).collect::<Vec<u8>>(),
2897 (24u8..32u8).collect::<Vec<u8>>(),
2898 ]
2899 .into_iter(),
2900 )
2901 .unwrap();
2902 let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
2903 let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();
2904
2905 let mut buffer = Vec::with_capacity(1024);
2906 let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2907 writer.write(&batch).unwrap();
2908 writer.close().unwrap();
2909 let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
2910 .unwrap()
2911 .collect::<Result<Vec<_>, _>>()
2912 .unwrap();
2913
2914 assert_eq!(read.len(), 1);
2915 assert_eq!(&batch, &read[0])
2916 }
2917
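    /// Round-trips a nullable struct whose first child is a dictionary-encoded
    /// string column, checking that struct-level nulls survive alongside the
    /// dictionary encoding.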
2918 #[test]
2919 fn test_read_nullable_structs_with_binary_dict_as_first_child_column() {
2920 let struct_fields = Fields::from(vec![
2927 Field::new(
2928 "city",
2929 ArrowDataType::Dictionary(
2930 Box::new(ArrowDataType::UInt8),
2931 Box::new(ArrowDataType::Utf8),
2932 ),
2933 true,
2934 ),
2935 Field::new("name", ArrowDataType::Utf8, true),
2936 ]);
2937 let schema = Arc::new(Schema::new(vec![Field::new(
2938 "items",
2939 ArrowDataType::Struct(struct_fields.clone()),
2940 true,
2941 )]));
2942
2943 let items_arr = StructArray::new(
2944 struct_fields,
2945 vec![
2946 Arc::new(DictionaryArray::new(
2947 UInt8Array::from_iter_values(vec![0, 1, 1, 0, 2]),
2948 Arc::new(StringArray::from_iter_values(vec![
2949 "quebec",
2950 "fredericton",
2951 "halifax",
2952 ])),
2953 )),
2954 Arc::new(StringArray::from_iter_values(vec![
2955 "albert", "terry", "lance", "", "tim",
2956 ])),
2957 ],
2958 Some(NullBuffer::from_iter(vec![true, true, true, false, true])),
2959 );
2960
2961 let batch = RecordBatch::try_new(schema, vec![Arc::new(items_arr)]).unwrap();
2962 let mut buffer = Vec::with_capacity(1024);
2963 let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
2964 writer.write(&batch).unwrap();
2965 writer.close().unwrap();
2966 let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
2967 .unwrap()
2968 .collect::<Result<Vec<_>, _>>()
2969 .unwrap();
2970
2971 assert_eq!(read.len(), 1);
2972 assert_eq!(&batch, &read[0])
2973 }
2974
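    /// Options for a single-column round-trip test (see
    /// `run_single_column_reader_tests` below); per-field notes are inline.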
2975 #[derive(Clone)]
2977 struct TestOptions {
2978         num_row_groups: usize, // number of row groups to write
2981         num_rows: usize, // rows per row group
2983         record_batch_size: usize, // batch size used when reading back
2985         null_percent: Option<usize>, // percentage of null values, or None for a required column
2987         write_batch_size: usize, // rows written to the writer at a time; bounds page granularity
2992         max_data_page_size: usize, // maximum data page size in bytes
2994         max_dict_page_size: usize, // maximum dictionary page size in bytes
2996         writer_version: WriterVersion, // parquet writer version
2998         enabled_statistics: EnabledStatistics, // statistics level written to the file
3000         encoding: Encoding, // column encoding
3002         row_selections: Option<(RowSelection, usize)>, // optional row selection and its selected row count
3004         row_filter: Option<Vec<bool>>, // optional filter mask, one entry per selected row
3006         limit: Option<usize>, // optional row limit
3008         offset: Option<usize>, // optional row offset
3010 }
3011
3012 impl std::fmt::Debug for TestOptions {
3014 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
3015 f.debug_struct("TestOptions")
3016 .field("num_row_groups", &self.num_row_groups)
3017 .field("num_rows", &self.num_rows)
3018 .field("record_batch_size", &self.record_batch_size)
3019 .field("null_percent", &self.null_percent)
3020 .field("write_batch_size", &self.write_batch_size)
3021 .field("max_data_page_size", &self.max_data_page_size)
3022 .field("max_dict_page_size", &self.max_dict_page_size)
3023 .field("writer_version", &self.writer_version)
3024 .field("enabled_statistics", &self.enabled_statistics)
3025 .field("encoding", &self.encoding)
3026 .field("row_selections", &self.row_selections.is_some())
3027 .field("row_filter", &self.row_filter.is_some())
3028 .field("limit", &self.limit)
3029 .field("offset", &self.offset)
3030 .finish()
3031 }
3032 }
3033
3034 impl Default for TestOptions {
3035 fn default() -> Self {
3036 Self {
3037 num_row_groups: 2,
3038 num_rows: 100,
3039 record_batch_size: 15,
3040 null_percent: None,
3041 write_batch_size: 64,
3042 max_data_page_size: 1024 * 1024,
3043 max_dict_page_size: 1024 * 1024,
3044 writer_version: WriterVersion::PARQUET_1_0,
3045 enabled_statistics: EnabledStatistics::Page,
3046 encoding: Encoding::PLAIN,
3047 row_selections: None,
3048 row_filter: None,
3049 limit: None,
3050 offset: None,
3051 }
3052 }
3053 }
3054
3055 impl TestOptions {
3056 fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
3057 Self {
3058 num_row_groups,
3059 num_rows,
3060 record_batch_size,
3061 ..Default::default()
3062 }
3063 }
3064
3065 fn with_null_percent(self, null_percent: usize) -> Self {
3066 Self {
3067 null_percent: Some(null_percent),
3068 ..self
3069 }
3070 }
3071
3072 fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
3073 Self {
3074 max_data_page_size,
3075 ..self
3076 }
3077 }
3078
3079 fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
3080 Self {
3081 max_dict_page_size,
3082 ..self
3083 }
3084 }
3085
3086 fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
3087 Self {
3088 enabled_statistics,
3089 ..self
3090 }
3091 }
3092
3093 fn with_row_selections(self) -> Self {
3094 assert!(self.row_filter.is_none(), "Must set row selection first");
3095
3096 let mut rng = rng();
3097 let step = rng.random_range(self.record_batch_size..self.num_rows);
3098 let row_selections = create_test_selection(
3099 step,
3100 self.num_row_groups * self.num_rows,
3101 rng.random::<bool>(),
3102 );
3103 Self {
3104 row_selections: Some(row_selections),
3105 ..self
3106 }
3107 }
3108
3109 fn with_row_filter(self) -> Self {
3110 let row_count = match &self.row_selections {
3111 Some((_, count)) => *count,
3112 None => self.num_row_groups * self.num_rows,
3113 };
3114
3115 let mut rng = rng();
3116 Self {
3117 row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
3118 ..self
3119 }
3120 }
3121
3122 fn with_limit(self, limit: usize) -> Self {
3123 Self {
3124 limit: Some(limit),
3125 ..self
3126 }
3127 }
3128
3129 fn with_offset(self, offset: usize) -> Self {
3130 Self {
3131 offset: Some(offset),
3132 ..self
3133 }
3134 }
3135
3136 fn writer_props(&self) -> WriterProperties {
3137 let builder = WriterProperties::builder()
3138 .set_data_page_size_limit(self.max_data_page_size)
3139 .set_write_batch_size(self.write_batch_size)
3140 .set_writer_version(self.writer_version)
3141 .set_statistics_enabled(self.enabled_statistics);
3142
3143 let builder = match self.encoding {
3144 Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
3145 .set_dictionary_enabled(true)
3146 .set_dictionary_page_size_limit(self.max_dict_page_size),
3147 _ => builder
3148 .set_dictionary_enabled(false)
3149 .set_encoding(self.encoding),
3150 };
3151
3152 builder.build()
3153 }
3154 }
3155
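    /// Writes a single leaf column of physical type `T` under every option set
    /// below, every writer version, and every encoding in `encodings`, then
    /// reads the file back and checks the decoded values.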
3156 fn run_single_column_reader_tests<T, F, G>(
3163 rand_max: i32,
3164 converted_type: ConvertedType,
3165 arrow_type: Option<ArrowDataType>,
3166 converter: F,
3167 encodings: &[Encoding],
3168 ) where
3169 T: DataType,
3170 G: RandGen<T>,
3171 F: Fn(&[Option<T::T>]) -> ArrayRef,
3172 {
3173 let all_options = vec![
3174 TestOptions::new(2, 100, 15),
3177 TestOptions::new(3, 25, 5),
3182 TestOptions::new(4, 100, 25),
3186 TestOptions::new(3, 256, 73).with_max_data_page_size(128),
3188 TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
3190 TestOptions::new(2, 256, 127).with_null_percent(0),
3192 TestOptions::new(2, 256, 93).with_null_percent(25),
3194 TestOptions::new(4, 100, 25).with_limit(0),
3196 TestOptions::new(4, 100, 25).with_limit(50),
3198 TestOptions::new(4, 100, 25).with_limit(10),
3200 TestOptions::new(4, 100, 25).with_limit(101),
3202 TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
3204 TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
3206 TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
3208 TestOptions::new(2, 256, 91)
3210 .with_null_percent(25)
3211 .with_enabled_statistics(EnabledStatistics::Chunk),
3212 TestOptions::new(2, 256, 91)
3214 .with_null_percent(25)
3215 .with_enabled_statistics(EnabledStatistics::None),
3216 TestOptions::new(2, 128, 91)
3218 .with_null_percent(100)
3219 .with_enabled_statistics(EnabledStatistics::None),
3220 TestOptions::new(2, 100, 15).with_row_selections(),
3225 TestOptions::new(3, 25, 5).with_row_selections(),
3230 TestOptions::new(4, 100, 25).with_row_selections(),
3234 TestOptions::new(3, 256, 73)
3236 .with_max_data_page_size(128)
3237 .with_row_selections(),
3238 TestOptions::new(3, 256, 57)
3240 .with_max_dict_page_size(128)
3241 .with_row_selections(),
3242 TestOptions::new(2, 256, 127)
3244 .with_null_percent(0)
3245 .with_row_selections(),
3246 TestOptions::new(2, 256, 93)
3248 .with_null_percent(25)
3249 .with_row_selections(),
3250 TestOptions::new(2, 256, 93)
3252 .with_null_percent(25)
3253 .with_row_selections()
3254 .with_limit(10),
3255 TestOptions::new(2, 256, 93)
3257 .with_null_percent(25)
3258 .with_row_selections()
3259 .with_offset(20)
3260 .with_limit(10),
3261 TestOptions::new(4, 100, 25).with_row_filter(),
3265 TestOptions::new(4, 100, 25)
3267 .with_row_selections()
3268 .with_row_filter(),
3269 TestOptions::new(2, 256, 93)
3271 .with_null_percent(25)
3272 .with_max_data_page_size(10)
3273 .with_row_filter(),
3274 TestOptions::new(2, 256, 93)
3276 .with_null_percent(25)
3277 .with_max_data_page_size(10)
3278 .with_row_selections()
3279 .with_row_filter(),
3280 TestOptions::new(2, 256, 93)
3282 .with_enabled_statistics(EnabledStatistics::None)
3283 .with_max_data_page_size(10)
3284 .with_row_selections(),
3285 ];
3286
3287 all_options.into_iter().for_each(|opts| {
3288 for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3289 for encoding in encodings {
3290 let opts = TestOptions {
3291 writer_version,
3292 encoding: *encoding,
3293 ..opts.clone()
3294 };
3295
3296 single_column_reader_test::<T, _, G>(
3297 opts,
3298 rand_max,
3299 converted_type,
3300 arrow_type.clone(),
3301 &converter,
3302 )
3303 }
3304 }
3305 });
3306 }
3307
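    /// Writes a single column of random `T` values according to `opts`, applies
    /// any configured selection/filter/limit/offset, reads the file back batch
    /// by batch, and asserts the data matches what `converter` produces from the
    /// expected values.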
3308 fn single_column_reader_test<T, F, G>(
3312 opts: TestOptions,
3313 rand_max: i32,
3314 converted_type: ConvertedType,
3315 arrow_type: Option<ArrowDataType>,
3316 converter: F,
3317 ) where
3318 T: DataType,
3319 G: RandGen<T>,
3320 F: Fn(&[Option<T::T>]) -> ArrayRef,
3321 {
3322 println!(
3324 "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
3325 T::get_physical_type(),
3326 converted_type,
3327 arrow_type,
3328 opts
3329 );
3330
3331 let (repetition, def_levels) = match opts.null_percent.as_ref() {
3333 Some(null_percent) => {
3334 let mut rng = rng();
3335
3336 let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
3337 .map(|_| {
3338 std::iter::from_fn(|| {
3339 Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
3340 })
3341 .take(opts.num_rows)
3342 .collect()
3343 })
3344 .collect();
3345 (Repetition::OPTIONAL, Some(def_levels))
3346 }
3347 None => (Repetition::REQUIRED, None),
3348 };
3349
3350 let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
3352 .map(|idx| {
3353 let null_count = match def_levels.as_ref() {
3354 Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
3355 None => 0,
3356 };
3357 G::gen_vec(rand_max, opts.num_rows - null_count)
3358 })
3359 .collect();
3360
3361 let len = match T::get_physical_type() {
3362 crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
3363 crate::basic::Type::INT96 => 12,
3364 _ => -1,
3365 };
3366
3367 let fields = vec![Arc::new(
3368 Type::primitive_type_builder("leaf", T::get_physical_type())
3369 .with_repetition(repetition)
3370 .with_converted_type(converted_type)
3371 .with_length(len)
3372 .build()
3373 .unwrap(),
3374 )];
3375
3376 let schema = Arc::new(
3377 Type::group_type_builder("test_schema")
3378 .with_fields(fields)
3379 .build()
3380 .unwrap(),
3381 );
3382
3383 let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
3384
3385 let mut file = tempfile::tempfile().unwrap();
3386
3387 generate_single_column_file_with_data::<T>(
3388 &values,
3389 def_levels.as_ref(),
3390             file.try_clone().unwrap(),
3391             schema,
3392 arrow_field,
3393 &opts,
3394 )
3395 .unwrap();
3396
3397 file.rewind().unwrap();
3398
3399 let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::from(
3400 opts.enabled_statistics == EnabledStatistics::Page,
3401 ));
3402
3403 let mut builder =
3404 ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3405
3406 let expected_data = match opts.row_selections {
3407 Some((selections, row_count)) => {
3408 let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
3409
3410 let mut skip_data: Vec<Option<T::T>> = vec![];
3411 let dequeue: VecDeque<RowSelector> = selections.clone().into();
3412 for select in dequeue {
3413 if select.skip {
3414 without_skip_data.drain(0..select.row_count);
3415 } else {
3416 skip_data.extend(without_skip_data.drain(0..select.row_count));
3417 }
3418 }
3419 builder = builder.with_row_selection(selections);
3420
3421 assert_eq!(skip_data.len(), row_count);
3422 skip_data
3423 }
3424 None => {
3425 let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
3427 assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
3428 expected_data
3429 }
3430 };
3431
3432 let mut expected_data = match opts.row_filter {
3433 Some(filter) => {
3434 let expected_data = expected_data
3435 .into_iter()
3436 .zip(filter.iter())
3437                     .filter_map(|(d, f)| f.then_some(d))
3438 .collect();
3439
3440 let mut filter_offset = 0;
3441 let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
3442 ProjectionMask::all(),
3443 move |b| {
3444 let array = BooleanArray::from_iter(
3445 filter
3446 .iter()
3447 .skip(filter_offset)
3448 .take(b.num_rows())
3449 .map(|x| Some(*x)),
3450 );
3451 filter_offset += b.num_rows();
3452 Ok(array)
3453 },
3454 ))]);
3455
3456 builder = builder.with_row_filter(filter);
3457 expected_data
3458 }
3459 None => expected_data,
3460 };
3461
3462 if let Some(offset) = opts.offset {
3463 builder = builder.with_offset(offset);
3464 expected_data = expected_data.into_iter().skip(offset).collect();
3465 }
3466
3467 if let Some(limit) = opts.limit {
3468 builder = builder.with_limit(limit);
3469 expected_data = expected_data.into_iter().take(limit).collect();
3470 }
3471
3472 let mut record_reader = builder
3473 .with_batch_size(opts.record_batch_size)
3474 .build()
3475 .unwrap();
3476
3477 let mut total_read = 0;
3478 loop {
3479 let maybe_batch = record_reader.next();
3480 if total_read < expected_data.len() {
3481 let end = min(total_read + opts.record_batch_size, expected_data.len());
3482 let batch = maybe_batch.unwrap().unwrap();
3483 assert_eq!(end - total_read, batch.num_rows());
3484
3485 let a = converter(&expected_data[total_read..end]);
3486 let b = batch.column(0);
3487
3488 assert_eq!(a.data_type(), b.data_type());
3489 assert_eq!(a.to_data(), b.to_data());
3490 assert_eq!(
3491 a.as_any().type_id(),
3492 b.as_any().type_id(),
3493 "incorrect type ids"
3494 );
3495
3496 total_read = end;
3497 } else {
3498 assert!(maybe_batch.is_none());
3499 break;
3500 }
3501 }
3502 }
3503
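    /// Expands per-row-group values and definition levels into a flat
    /// `Vec<Option<T>>`, with `None` wherever the definition level is 0.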
3504 fn gen_expected_data<T: DataType>(
3505 def_levels: Option<&Vec<Vec<i16>>>,
3506 values: &[Vec<T::T>],
3507 ) -> Vec<Option<T::T>> {
3508 let data: Vec<Option<T::T>> = match def_levels {
3509 Some(levels) => {
3510 let mut values_iter = values.iter().flatten();
3511 levels
3512 .iter()
3513 .flatten()
3514 .map(|d| match d {
3515 1 => Some(values_iter.next().cloned().unwrap()),
3516 0 => None,
3517 _ => unreachable!(),
3518 })
3519 .collect()
3520 }
3521 None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
3522 };
3523 data
3524 }
3525
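    /// Writes one row group per entry in `values` to `file` using writer
    /// properties derived from `opts`, returning the metadata from `close()`.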
3526 fn generate_single_column_file_with_data<T: DataType>(
3527 values: &[Vec<T::T>],
3528 def_levels: Option<&Vec<Vec<i16>>>,
3529 file: File,
3530 schema: TypePtr,
3531 field: Option<Field>,
3532 opts: &TestOptions,
3533 ) -> Result<ParquetMetaData> {
3534 let mut writer_props = opts.writer_props();
3535 if let Some(field) = field {
3536 let arrow_schema = Schema::new(vec![field]);
3537 add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
3538 }
3539
3540 let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
3541
3542 for (idx, v) in values.iter().enumerate() {
3543 let def_levels = def_levels.map(|d| d[idx].as_slice());
3544 let mut row_group_writer = writer.next_row_group()?;
3545 {
3546 let mut column_writer = row_group_writer
3547 .next_column()?
3548 .expect("Column writer is none!");
3549
3550 column_writer
3551 .typed::<T>()
3552 .write_batch(v, def_levels, None)?;
3553
3554 column_writer.close()?;
3555 }
3556 row_group_writer.close()?;
3557 }
3558
3559 writer.close()
3560 }
3561
3562 fn get_test_file(file_name: &str) -> File {
3563 let mut path = PathBuf::new();
3564 path.push(arrow::util::test_util::arrow_test_data());
3565 path.push(file_name);
3566
3567 File::open(path.as_path()).expect("File not found!")
3568 }
3569
3570 #[test]
3571 fn test_read_structs() {
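        // Read the whole nested-structs fixture first, then re-read it with a
        // leaf projection and check the projected schema.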
3572 let testdata = arrow::util::test_util::parquet_test_data();
3576 let path = format!("{testdata}/nested_structs.rust.parquet");
3577 let file = File::open(&path).unwrap();
3578 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3579
3580 for batch in record_batch_reader {
3581 batch.unwrap();
3582 }
3583
3584 let file = File::open(&path).unwrap();
3585 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3586
3587 let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
3588 let projected_reader = builder
3589 .with_projection(mask)
3590 .with_batch_size(60)
3591 .build()
3592 .unwrap();
3593
3594 let expected_schema = Schema::new(vec![
3595 Field::new(
3596 "roll_num",
3597 ArrowDataType::Struct(Fields::from(vec![Field::new(
3598 "count",
3599 ArrowDataType::UInt64,
3600 false,
3601 )])),
3602 false,
3603 ),
3604 Field::new(
3605 "PC_CUR",
3606 ArrowDataType::Struct(Fields::from(vec![
3607 Field::new("mean", ArrowDataType::Int64, false),
3608 Field::new("sum", ArrowDataType::Int64, false),
3609 ])),
3610 false,
3611 ),
3612 ]);
3613
3614 assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3616
3617 for batch in projected_reader {
3618 let batch = batch.unwrap();
3619 assert_eq!(batch.schema().as_ref(), &expected_schema);
3620 }
3621 }
3622
3623 #[test]
3624 fn test_read_structs_by_name() {
3626 let testdata = arrow::util::test_util::parquet_test_data();
3627 let path = format!("{testdata}/nested_structs.rust.parquet");
3628 let file = File::open(&path).unwrap();
3629 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3630
3631 for batch in record_batch_reader {
3632 batch.unwrap();
3633 }
3634
3635 let file = File::open(&path).unwrap();
3636 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3637
3638 let mask = ProjectionMask::columns(
3639 builder.parquet_schema(),
3640 ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
3641 );
3642 let projected_reader = builder
3643 .with_projection(mask)
3644 .with_batch_size(60)
3645 .build()
3646 .unwrap();
3647
3648 let expected_schema = Schema::new(vec![
3649 Field::new(
3650 "roll_num",
3651 ArrowDataType::Struct(Fields::from(vec![Field::new(
3652 "count",
3653 ArrowDataType::UInt64,
3654 false,
3655 )])),
3656 false,
3657 ),
3658 Field::new(
3659 "PC_CUR",
3660 ArrowDataType::Struct(Fields::from(vec![
3661 Field::new("mean", ArrowDataType::Int64, false),
3662 Field::new("sum", ArrowDataType::Int64, false),
3663 ])),
3664 false,
3665 ),
3666 ]);
3667
3668 assert_eq!(&expected_schema, projected_reader.schema().as_ref());
3669
3670 for batch in projected_reader {
3671 let batch = batch.unwrap();
3672 assert_eq!(batch.schema().as_ref(), &expected_schema);
3673 }
3674 }
3675
3676 #[test]
3677 fn test_read_maps() {
3678 let testdata = arrow::util::test_util::parquet_test_data();
3679 let path = format!("{testdata}/nested_maps.snappy.parquet");
3680 let file = File::open(path).unwrap();
3681 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3682
3683 for batch in record_batch_reader {
3684 batch.unwrap();
3685 }
3686 }
3687
3688 #[test]
3689 fn test_nested_nullability() {
3690 let message_type = "message nested {
3691 OPTIONAL Group group {
3692 REQUIRED INT32 leaf;
3693 }
3694 }";
3695
3696 let file = tempfile::tempfile().unwrap();
3697 let schema = Arc::new(parse_message_type(message_type).unwrap());
3698
3699 {
3700 let mut writer =
3702 SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
3703 .unwrap();
3704
3705 {
3706 let mut row_group_writer = writer.next_row_group().unwrap();
3707 let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
3708
3709 column_writer
3710 .typed::<Int32Type>()
3711 .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
3712 .unwrap();
3713
3714 column_writer.close().unwrap();
3715 row_group_writer.close().unwrap();
3716 }
3717
3718 writer.close().unwrap();
3719 }
3720
3721 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3722 let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
3723
3724 let reader = builder.with_projection(mask).build().unwrap();
3725
3726 let expected_schema = Schema::new(vec![Field::new(
3727 "group",
3728 ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
3729 true,
3730 )]);
3731
3732 let batch = reader.into_iter().next().unwrap().unwrap();
3733 assert_eq!(batch.schema().as_ref(), &expected_schema);
3734 assert_eq!(batch.num_rows(), 4);
3735 assert_eq!(batch.column(0).null_count(), 2);
3736 }
3737
3738 #[test]
3739 fn test_invalid_utf8() {
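        // Raw bytes of a parquet file whose single string column contains
        // invalid UTF-8 (a 0xff byte embedded in "hello").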
3740 let data = vec![
3742 80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
3743 18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
3744 3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
3745 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
3746 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
3747 21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
3748 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
3749 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
3750 0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
3751 118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
3752 116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
3753 82, 49,
3754 ];
3755
3756 let file = Bytes::from(data);
3757 let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();
3758
3759 let error = record_batch_reader.next().unwrap().unwrap_err();
3760
3761 assert!(
3762 error.to_string().contains("invalid utf-8 sequence"),
3763 "{}",
3764 error
3765 );
3766 }
3767
3768 #[test]
3769 fn test_invalid_utf8_string_array() {
3770 test_invalid_utf8_string_array_inner::<i32>();
3771 }
3772
3773 #[test]
3774 fn test_invalid_utf8_large_string_array() {
3775 test_invalid_utf8_string_array_inner::<i64>();
3776 }
3777
3778 fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
3779 let cases = [
3780 invalid_utf8_first_char::<O>(),
3781 invalid_utf8_first_char_long_strings::<O>(),
3782 invalid_utf8_later_char::<O>(),
3783 invalid_utf8_later_char_long_strings::<O>(),
3784 invalid_utf8_later_char_really_long_strings::<O>(),
3785 invalid_utf8_later_char_really_long_strings2::<O>(),
3786 ];
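        // The arrays above hold invalid UTF-8, so the safe constructors would
        // reject them; `new_unchecked` below deliberately bypasses validation
        // so the bytes reach the parquet writer.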
3787 for array in &cases {
3788 for encoding in STRING_ENCODINGS {
3789 let array = unsafe {
3792 GenericStringArray::<O>::new_unchecked(
3793 array.offsets().clone(),
3794 array.values().clone(),
3795 array.nulls().cloned(),
3796 )
3797 };
3798 let data_type = array.data_type().clone();
3799 let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3800 let err = read_from_parquet(data).unwrap_err();
3801 let expected_err =
3802 "Parquet argument error: Parquet error: encountered non UTF-8 data";
3803 assert!(
3804 err.to_string().contains(expected_err),
3805 "data type: {data_type}, expected: {expected_err}, got: {err}"
3806 );
3807 }
3808 }
3809 }
3810
3811 #[test]
3812 fn test_invalid_utf8_string_view_array() {
3813 let cases = [
3814 invalid_utf8_first_char::<i32>(),
3815 invalid_utf8_first_char_long_strings::<i32>(),
3816 invalid_utf8_later_char::<i32>(),
3817 invalid_utf8_later_char_long_strings::<i32>(),
3818 invalid_utf8_later_char_really_long_strings::<i32>(),
3819 invalid_utf8_later_char_really_long_strings2::<i32>(),
3820 ];
3821
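        // As above, `new_unchecked` deliberately bypasses UTF-8 validation so
        // the invalid bytes reach the parquet writer.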
3822 for encoding in STRING_ENCODINGS {
3823 for array in &cases {
3824 let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
3825 let array = array.as_binary_view();
3826
3827 let array = unsafe {
3830 StringViewArray::new_unchecked(
3831 array.views().clone(),
3832 array.data_buffers().to_vec(),
3833 array.nulls().cloned(),
3834 )
3835 };
3836
3837 let data_type = array.data_type().clone();
3838 let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
3839 let err = read_from_parquet(data).unwrap_err();
3840 let expected_err =
3841 "Parquet argument error: Parquet error: encountered non UTF-8 data";
3842 assert!(
3843 err.to_string().contains(expected_err),
3844 "data type: {data_type}, expected: {expected_err}, got: {err}"
3845 );
3846 }
3847 }
3848 }
3849
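    /// Encodings exercised by the invalid UTF-8 tests; `None` means default
    /// writer properties (dictionary encoding enabled).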
3850 const STRING_ENCODINGS: &[Option<Encoding>] = &[
3852 None,
3853 Some(Encoding::PLAIN),
3854 Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
3855 Some(Encoding::DELTA_BYTE_ARRAY),
3856 ];
3857
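    /// Invalid UTF-8 sequence in the first character.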
3858 const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3861
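    /// Invalid UTF-8 sequence in a character after the first.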
3862 const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];
3865
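    /// Binary array whose invalid value starts with an invalid byte.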
3866 fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3868 let valid: &[u8] = b" ";
3869 let invalid = INVALID_UTF8_FIRST_CHAR;
3870 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3871 }
3872
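    /// Invalid first byte preceded by a prefix longer than 12 bytes, so view
    /// types cannot keep the value inline.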
3873 fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3877 let valid: &[u8] = b" ";
3878 let mut invalid = vec![];
3879 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3880 invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3881 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3882 }
3883
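    /// Binary array where the invalid byte occurs after some valid characters.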
3884 fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3887 let valid: &[u8] = b" ";
3888 let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
3889 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
3890 }
3891
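    /// Invalid later byte preceded by a prefix longer than 12 bytes.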
3892 fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3896 let valid: &[u8] = b" ";
3897 let mut invalid = vec![];
3898 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3899 invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3900 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3901 }
3902
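    /// Invalid later byte at the end of a value several hundred bytes long.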
3903 fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3907 let valid: &[u8] = b" ";
3908 let mut invalid = vec![];
3909 for _ in 0..10 {
3910 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3912 }
3913 invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
3914 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
3915 }
3916
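    /// Mix of a short invalid value and a valid value several hundred bytes long.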
3917 fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
3920 let valid: &[u8] = b" ";
3921 let mut valid_long = vec![];
3922 for _ in 0..10 {
3923 valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3925 }
3926 let invalid = INVALID_UTF8_LATER_CHAR;
3927 GenericBinaryArray::<O>::from_iter(vec![
3928 None,
3929 Some(valid),
3930 Some(invalid),
3931 None,
3932 Some(&valid_long),
3933 Some(valid),
3934 ])
3935 }
3936
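    /// Writes `array` as a single-column parquet file in memory; when an
    /// explicit `encoding` is given, dictionary encoding is disabled so the raw
    /// bytes are stored with that encoding.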
3937 fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
3942 let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
3943 let mut data = vec![];
3944 let schema = batch.schema();
3945 let props = encoding.map(|encoding| {
3946 WriterProperties::builder()
3947 .set_dictionary_enabled(false)
3949 .set_encoding(encoding)
3950 .build()
3951 });
3952
3953 {
3954 let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
3955 writer.write(&batch).unwrap();
3956 writer.flush().unwrap();
3957 writer.close().unwrap();
3958 };
3959 data
3960 }
3961
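    /// Reads all record batches back from an in-memory parquet file.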
3962 fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
3964 let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
3965 .unwrap()
3966 .build()
3967 .unwrap();
3968
3969 reader.collect()
3970 }
3971
3972 #[test]
3973 fn test_dictionary_preservation() {
3974 let fields = vec![Arc::new(
3975 Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
3976 .with_repetition(Repetition::OPTIONAL)
3977 .with_converted_type(ConvertedType::UTF8)
3978 .build()
3979 .unwrap(),
3980 )];
3981
3982 let schema = Arc::new(
3983 Type::group_type_builder("test_schema")
3984 .with_fields(fields)
3985 .build()
3986 .unwrap(),
3987 );
3988
3989 let dict_type = ArrowDataType::Dictionary(
3990 Box::new(ArrowDataType::Int32),
3991 Box::new(ArrowDataType::Utf8),
3992 );
3993
3994 let arrow_field = Field::new("leaf", dict_type, true);
3995
3996 let mut file = tempfile::tempfile().unwrap();
3997
3998 let values = vec![
3999 vec![
4000 ByteArray::from("hello"),
4001 ByteArray::from("a"),
4002 ByteArray::from("b"),
4003 ByteArray::from("d"),
4004 ],
4005 vec![
4006 ByteArray::from("c"),
4007 ByteArray::from("a"),
4008 ByteArray::from("b"),
4009 ],
4010 ];
4011
4012 let def_levels = vec![
4013 vec![1, 0, 0, 1, 0, 0, 1, 1],
4014 vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
4015 ];
4016
4017 let opts = TestOptions {
4018 encoding: Encoding::RLE_DICTIONARY,
4019 ..Default::default()
4020 };
4021
4022 generate_single_column_file_with_data::<ByteArrayType>(
4023 &values,
4024 Some(&def_levels),
4025             file.try_clone().unwrap(),
4026             schema,
4027 Some(arrow_field),
4028 &opts,
4029 )
4030 .unwrap();
4031
4032 file.rewind().unwrap();
4033
4034 let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();
4035
4036 let batches = record_reader
4037 .collect::<Result<Vec<RecordBatch>, _>>()
4038 .unwrap();
4039
4040 assert_eq!(batches.len(), 6);
4041 assert!(batches.iter().all(|x| x.num_columns() == 1));
4042
4043 let row_counts = batches
4044 .iter()
4045 .map(|x| (x.num_rows(), x.column(0).null_count()))
4046 .collect::<Vec<_>>();
4047
4048 assert_eq!(
4049 row_counts,
4050 vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
4051 );
4052
4053 let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();
4054
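        // batches 0 and 1 are served entirely from the first row group and
        // share its dictionary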
4055 assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
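        // batch 2 straddles the row-group boundary, so a new dictionary is
        // computed; batch 3 then uses the second row group's dictionary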
4057 assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
4059 assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
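        // batches 3, 4 and 5 are served entirely from the second row group and
        // share its dictionary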
4060 assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
4062 assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
4063 }
4064
4065 #[test]
4066 fn test_read_null_list() {
4067 let testdata = arrow::util::test_util::parquet_test_data();
4068 let path = format!("{testdata}/null_list.parquet");
4069 let file = File::open(path).unwrap();
4070 let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
4071
4072 let batch = record_batch_reader.next().unwrap().unwrap();
4073 assert_eq!(batch.num_rows(), 1);
4074 assert_eq!(batch.num_columns(), 1);
4075 assert_eq!(batch.column(0).len(), 1);
4076
4077 let list = batch
4078 .column(0)
4079 .as_any()
4080 .downcast_ref::<ListArray>()
4081 .unwrap();
4082 assert_eq!(list.len(), 1);
4083 assert!(list.is_valid(0));
4084
4085 let val = list.value(0);
4086 assert_eq!(val.len(), 0);
4087 }
4088
4089 #[test]
4090 fn test_null_schema_inference() {
4091 let testdata = arrow::util::test_util::parquet_test_data();
4092 let path = format!("{testdata}/null_list.parquet");
4093 let file = File::open(path).unwrap();
4094
4095 let arrow_field = Field::new(
4096 "emptylist",
4097 ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
4098 true,
4099 );
4100
4101 let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
4102 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
4103 let schema = builder.schema();
4104 assert_eq!(schema.fields().len(), 1);
4105 assert_eq!(schema.field(0), &arrow_field);
4106 }
4107
4108 #[test]
4109 fn test_skip_metadata() {
4110 let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
4111 let field = Field::new("col", col.data_type().clone(), true);
4112
4113 let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
4114
4115 let metadata = [("key".to_string(), "value".to_string())]
4116 .into_iter()
4117 .collect();
4118
4119 let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
4120
4121 assert_ne!(schema_with_metadata, schema_without_metadata);
4122
4123 let batch =
4124 RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
4125
4126 let file = |version: WriterVersion| {
4127 let props = WriterProperties::builder()
4128 .set_writer_version(version)
4129 .build();
4130
4131 let file = tempfile().unwrap();
4132 let mut writer =
4133 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
4134 .unwrap();
4135 writer.write(&batch).unwrap();
4136 writer.close().unwrap();
4137 file
4138 };
4139
4140 let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
4141
4142 let v1_reader = file(WriterVersion::PARQUET_1_0);
4143 let v2_reader = file(WriterVersion::PARQUET_2_0);
4144
4145 let arrow_reader =
4146 ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
4147 assert_eq!(arrow_reader.schema(), schema_with_metadata);
4148
4149 let reader =
4150 ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
4151 .unwrap()
4152 .build()
4153 .unwrap();
4154 assert_eq!(reader.schema(), schema_without_metadata);
4155
4156 let arrow_reader =
4157 ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
4158 assert_eq!(arrow_reader.schema(), schema_with_metadata);
4159
4160 let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
4161 .unwrap()
4162 .build()
4163 .unwrap();
4164 assert_eq!(reader.schema(), schema_without_metadata);
4165 }
4166
4167 fn write_parquet_from_iter<I, F>(value: I) -> File
4168 where
4169 I: IntoIterator<Item = (F, ArrayRef)>,
4170 F: AsRef<str>,
4171 {
4172 let batch = RecordBatch::try_from_iter(value).unwrap();
4173 let file = tempfile().unwrap();
4174 let mut writer =
4175             ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), None).unwrap();
4176 writer.write(&batch).unwrap();
4177 writer.close().unwrap();
4178 file
4179 }
4180
4181 fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
4182 where
4183 I: IntoIterator<Item = (F, ArrayRef)>,
4184 F: AsRef<str>,
4185 {
4186 let file = write_parquet_from_iter(value);
4187 let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
4188 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
4189 file.try_clone().unwrap(),
4190 options_with_schema,
4191 );
4192 assert_eq!(builder.err().unwrap().to_string(), expected_error);
4193 }
4194
4195 #[test]
4196 fn test_schema_too_few_columns() {
4197 run_schema_test_with_error(
4198 vec![
4199 ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
4200 ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
4201 ],
4202 Arc::new(Schema::new(vec![Field::new(
4203 "int64",
4204 ArrowDataType::Int64,
4205 false,
4206 )])),
4207 "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
4208 );
4209 }
4210
4211 #[test]
4212 fn test_schema_too_many_columns() {
4213 run_schema_test_with_error(
4214 vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
4215 Arc::new(Schema::new(vec![
4216 Field::new("int64", ArrowDataType::Int64, false),
4217 Field::new("int32", ArrowDataType::Int32, false),
4218 ])),
4219 "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
4220 );
4221 }
4222
4223 #[test]
4224 fn test_schema_mismatched_column_names() {
4225 run_schema_test_with_error(
4226 vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
4227 Arc::new(Schema::new(vec![Field::new(
4228 "other",
4229 ArrowDataType::Int64,
4230 false,
4231 )])),
4232 "Arrow: incompatible arrow schema, expected field named int64 got other",
4233 );
4234 }
4235
4236 #[test]
4237 fn test_schema_incompatible_columns() {
4238 run_schema_test_with_error(
4239 vec![
4240 (
4241 "col1_invalid",
4242 Arc::new(Int64Array::from(vec![0])) as ArrayRef,
4243 ),
4244 (
4245 "col2_valid",
4246 Arc::new(Int32Array::from(vec![0])) as ArrayRef,
4247 ),
4248 (
4249 "col3_invalid",
4250 Arc::new(Date64Array::from(vec![0])) as ArrayRef,
4251 ),
4252 ],
4253 Arc::new(Schema::new(vec![
4254 Field::new("col1_invalid", ArrowDataType::Int32, false),
4255 Field::new("col2_valid", ArrowDataType::Int32, false),
4256 Field::new("col3_invalid", ArrowDataType::Int32, false),
4257 ])),
4258 "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
4259 );
4260 }
4261
4262 #[test]
4263 fn test_one_incompatible_nested_column() {
4264 let nested_fields = Fields::from(vec![
4265 Field::new("nested1_valid", ArrowDataType::Utf8, false),
4266 Field::new("nested1_invalid", ArrowDataType::Int64, false),
4267 ]);
4268 let nested = StructArray::try_new(
4269 nested_fields,
4270 vec![
4271 Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
4272 Arc::new(Int64Array::from(vec![0])) as ArrayRef,
4273 ],
4274 None,
4275 )
4276 .expect("struct array");
4277 let supplied_nested_fields = Fields::from(vec![
4278 Field::new("nested1_valid", ArrowDataType::Utf8, false),
4279 Field::new("nested1_invalid", ArrowDataType::Int32, false),
4280 ]);
4281 run_schema_test_with_error(
4282 vec![
4283 ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
4284 ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
4285 ("nested", Arc::new(nested) as ArrayRef),
4286 ],
4287 Arc::new(Schema::new(vec![
4288 Field::new("col1", ArrowDataType::Int64, false),
4289 Field::new("col2", ArrowDataType::Int32, false),
4290 Field::new(
4291 "nested",
4292 ArrowDataType::Struct(supplied_nested_fields),
4293 false,
4294 ),
4295 ])),
4296 "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
4297 requested Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int32) \
4298 but found Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int64)",
4299 );
4300 }
4301
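    /// Writes a small single-column Utf8 parquet file entirely in memory.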
4302 fn utf8_parquet() -> Bytes {
4304 let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
4305 let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
4306 let props = None;
4307 let mut parquet_data = vec![];
4309 let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
4310 writer.write(&batch).unwrap();
4311 writer.close().unwrap();
4312 Bytes::from(parquet_data)
4313 }
4314
4315 #[test]
4316 fn test_schema_error_bad_types() {
4317 let parquet_data = utf8_parquet();
4319
4320 let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
4322 "column1",
4323 arrow::datatypes::DataType::Int32,
4324 false,
4325 )]));
4326
4327 let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
4329 let err =
4330 ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
4331 .unwrap_err();
4332 assert_eq!(
4333 err.to_string(),
4334 "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8"
4335 )
4336 }
4337
4338 #[test]
4339 fn test_schema_error_bad_nullability() {
4340 let parquet_data = utf8_parquet();
4342
4343 let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
4345 "column1",
4346 arrow::datatypes::DataType::Utf8,
4347 true,
4348 )]));
4349
4350 let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
4352 let err =
4353 ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
4354 .unwrap_err();
4355 assert_eq!(
4356 err.to_string(),
4357 "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false"
4358 )
4359 }
4360
4361 #[test]
4362 fn test_read_binary_as_utf8() {
4363 let file = write_parquet_from_iter(vec![
4364 (
4365 "binary_to_utf8",
4366 Arc::new(BinaryArray::from(vec![
4367 b"one".as_ref(),
4368 b"two".as_ref(),
4369 b"three".as_ref(),
4370 ])) as ArrayRef,
4371 ),
4372 (
4373 "large_binary_to_large_utf8",
4374 Arc::new(LargeBinaryArray::from(vec![
4375 b"one".as_ref(),
4376 b"two".as_ref(),
4377 b"three".as_ref(),
4378 ])) as ArrayRef,
4379 ),
4380 (
4381 "binary_view_to_utf8_view",
4382 Arc::new(BinaryViewArray::from(vec![
4383 b"one".as_ref(),
4384 b"two".as_ref(),
4385 b"three".as_ref(),
4386 ])) as ArrayRef,
4387 ),
4388 ]);
4389 let supplied_fields = Fields::from(vec![
4390 Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
4391 Field::new(
4392 "large_binary_to_large_utf8",
4393 ArrowDataType::LargeUtf8,
4394 false,
4395 ),
4396 Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
4397 ]);
4398
4399 let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
4400 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4401 file.try_clone().unwrap(),
4402 options,
4403 )
4404 .expect("reader builder with schema")
4405 .build()
4406 .expect("reader with schema");
4407
4408 let batch = arrow_reader.next().unwrap().unwrap();
4409 assert_eq!(batch.num_columns(), 3);
4410 assert_eq!(batch.num_rows(), 3);
4411 assert_eq!(
4412 batch
4413 .column(0)
4414 .as_string::<i32>()
4415 .iter()
4416 .collect::<Vec<_>>(),
4417 vec![Some("one"), Some("two"), Some("three")]
4418 );
4419
4420 assert_eq!(
4421 batch
4422 .column(1)
4423 .as_string::<i64>()
4424 .iter()
4425 .collect::<Vec<_>>(),
4426 vec![Some("one"), Some("two"), Some("three")]
4427 );
4428
4429 assert_eq!(
4430 batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
4431 vec![Some("one"), Some("two"), Some("three")]
4432 );
4433 }
4434
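    // Reading non-UTF-8 binary as Utf8 panics inside Arrow's string validation
    // (rather than surfacing as an `Err`), hence the `should_panic` below.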
4435 #[test]
4436 #[should_panic(expected = "Invalid UTF8 sequence at")]
4437 fn test_read_non_utf8_binary_as_utf8() {
4438 let file = write_parquet_from_iter(vec![(
4439 "non_utf8_binary",
4440 Arc::new(BinaryArray::from(vec![
4441 b"\xDE\x00\xFF".as_ref(),
4442 b"\xDE\x01\xAA".as_ref(),
4443 b"\xDE\x02\xFF".as_ref(),
4444 ])) as ArrayRef,
4445 )]);
4446 let supplied_fields = Fields::from(vec![Field::new(
4447 "non_utf8_binary",
4448 ArrowDataType::Utf8,
4449 false,
4450 )]);
4451
4452 let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
4453 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4454 file.try_clone().unwrap(),
4455 options,
4456 )
4457 .expect("reader builder with schema")
4458 .build()
4459 .expect("reader with schema");
4460 arrow_reader.next().unwrap().unwrap_err();
4461 }
4462
4463 #[test]
4464 fn test_with_schema() {
4465 let nested_fields = Fields::from(vec![
4466 Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
4467 Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
4468 ]);
4469
4470 let nested_arrays: Vec<ArrayRef> = vec![
4471 Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
4472 Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
4473 ];
4474
4475 let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();
4476
4477 let file = write_parquet_from_iter(vec![
4478 (
4479 "int32_to_ts_second",
4480 Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
4481 ),
4482 (
4483 "date32_to_date64",
4484 Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
4485 ),
4486 ("nested", Arc::new(nested) as ArrayRef),
4487 ]);
4488
4489 let supplied_nested_fields = Fields::from(vec![
4490 Field::new(
4491 "utf8_to_dict",
4492 ArrowDataType::Dictionary(
4493 Box::new(ArrowDataType::Int32),
4494 Box::new(ArrowDataType::Utf8),
4495 ),
4496 false,
4497 ),
4498 Field::new(
4499 "int64_to_ts_nano",
4500 ArrowDataType::Timestamp(
4501 arrow::datatypes::TimeUnit::Nanosecond,
4502 Some("+10:00".into()),
4503 ),
4504 false,
4505 ),
4506 ]);
4507
4508 let supplied_schema = Arc::new(Schema::new(vec![
4509 Field::new(
4510 "int32_to_ts_second",
4511 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
4512 false,
4513 ),
4514 Field::new("date32_to_date64", ArrowDataType::Date64, false),
4515 Field::new(
4516 "nested",
4517 ArrowDataType::Struct(supplied_nested_fields),
4518 false,
4519 ),
4520 ]));
4521
4522 let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
4523 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
4524 file.try_clone().unwrap(),
4525 options,
4526 )
4527 .expect("reader builder with schema")
4528 .build()
4529 .expect("reader with schema");
4530
4531 assert_eq!(arrow_reader.schema(), supplied_schema);
4532 let batch = arrow_reader.next().unwrap().unwrap();
4533 assert_eq!(batch.num_columns(), 3);
4534 assert_eq!(batch.num_rows(), 4);
4535 assert_eq!(
4536 batch
4537 .column(0)
4538 .as_any()
4539 .downcast_ref::<TimestampSecondArray>()
4540 .expect("downcast to timestamp second")
4541 .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
4542 .map(|v| v.to_string())
4543 .expect("value as datetime"),
4544 "1970-01-01 01:00:00 +01:00"
4545 );
4546 assert_eq!(
4547 batch
4548 .column(1)
4549 .as_any()
4550 .downcast_ref::<Date64Array>()
4551 .expect("downcast to date64")
4552 .value_as_date(0)
4553 .map(|v| v.to_string())
4554 .expect("value as date"),
4555 "1970-01-01"
4556 );
4557
4558 let nested = batch
4559 .column(2)
4560 .as_any()
4561 .downcast_ref::<StructArray>()
4562 .expect("downcast to struct");
4563
4564 let nested_dict = nested
4565 .column(0)
4566 .as_any()
4567 .downcast_ref::<Int32DictionaryArray>()
4568 .expect("downcast to dictionary");
4569
4570 assert_eq!(
4571 nested_dict
4572 .values()
4573 .as_any()
4574 .downcast_ref::<StringArray>()
4575 .expect("downcast to string")
4576 .iter()
4577 .collect::<Vec<_>>(),
4578 vec![Some("a"), Some("b")]
4579 );
4580
4581 assert_eq!(
4582 nested_dict.keys().iter().collect::<Vec<_>>(),
4583 vec![Some(0), Some(0), Some(0), Some(1)]
4584 );
4585
4586 assert_eq!(
4587 nested
4588 .column(1)
4589 .as_any()
4590 .downcast_ref::<TimestampNanosecondArray>()
4591 .expect("downcast to timestamp nanosecond")
4592 .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
4593 .map(|v| v.to_string())
4594 .expect("value as datetime"),
4595 "1970-01-01 10:00:00.000000001 +10:00"
4596 );
4597 }
4598
4599 #[test]
4600 fn test_empty_projection() {
4601 let testdata = arrow::util::test_util::parquet_test_data();
4602 let path = format!("{testdata}/alltypes_plain.parquet");
4603 let file = File::open(path).unwrap();
4604
4605 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4606 let file_metadata = builder.metadata().file_metadata();
4607 let expected_rows = file_metadata.num_rows() as usize;
4608
4609 let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
4610 let batch_reader = builder
4611 .with_projection(mask)
4612 .with_batch_size(2)
4613 .build()
4614 .unwrap();
4615
4616 let mut total_rows = 0;
4617 for maybe_batch in batch_reader {
4618 let batch = maybe_batch.unwrap();
4619 total_rows += batch.num_rows();
4620 assert_eq!(batch.num_columns(), 0);
4621 assert!(batch.num_rows() <= 2);
4622 }
4623
4624 assert_eq!(total_rows, expected_rows);
4625 }
4626
    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "list",
            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
            true,
        )]));

        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(
            &mut buf,
            schema.clone(),
            Some(
                WriterProperties::builder()
                    .set_max_row_group_size(row_group_size)
                    .build(),
            ),
        )
        .unwrap();
        for _ in 0..2 {
            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
            for _ in 0..batch_size {
                list_builder.append(true);
            }
            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
                .unwrap();
            writer.write(&batch).unwrap();
        }
        writer.close().unwrap();

        let mut record_reader =
            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
        assert_eq!(
            batch_size,
            record_reader.next().unwrap().unwrap().num_rows()
        );
        assert_eq!(
            batch_size,
            record_reader.next().unwrap().unwrap().num_rows()
        );
    }

    #[test]
    fn test_row_group_exact_multiple() {
        const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
        test_row_group_batch(8, 8);
        test_row_group_batch(10, 8);
        test_row_group_batch(8, 10);
        test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
        test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
        test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
        test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
        test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
    }

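    // Computes the batches a reader should emit for `column` under `selection`
    // with the given batch size, by slicing out the selected row ranges.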
    fn get_expected_batches(
        column: &RecordBatch,
        selection: &RowSelection,
        batch_size: usize,
    ) -> Vec<RecordBatch> {
        let mut expected_batches = vec![];

        let mut selection: VecDeque<_> = selection.clone().into();
        let mut row_offset = 0;
        let mut last_start = None;
        while row_offset < column.num_rows() && !selection.is_empty() {
            let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
            while batch_remaining > 0 && !selection.is_empty() {
                let (to_read, skip) = match selection.front_mut() {
                    Some(selection) if selection.row_count > batch_remaining => {
                        selection.row_count -= batch_remaining;
                        (batch_remaining, selection.skip)
                    }
                    Some(_) => {
                        let select = selection.pop_front().unwrap();
                        (select.row_count, select.skip)
                    }
                    None => break,
                };

                batch_remaining -= to_read;

                match skip {
                    true => {
                        if let Some(last_start) = last_start.take() {
                            expected_batches.push(column.slice(last_start, row_offset - last_start))
                        }
                        row_offset += to_read
                    }
                    false => {
                        last_start.get_or_insert(row_offset);
                        row_offset += to_read
                    }
                }
            }
        }

        if let Some(last_start) = last_start.take() {
            expected_batches.push(column.slice(last_start, row_offset - last_start))
        }

        // Sanity check: every batch except the final one should be full
        for batch in &expected_batches[..expected_batches.len() - 1] {
            assert_eq!(batch.num_rows(), batch_size);
        }

        expected_batches
    }

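    // Builds a RowSelection that alternates between selecting and skipping
    // runs of up to `step_len` rows across `total_len` rows, returning the
    // selection and the number of selected rows.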
    fn create_test_selection(
        step_len: usize,
        total_len: usize,
        skip_first: bool,
    ) -> (RowSelection, usize) {
        let mut remaining = total_len;
        let mut skip = skip_first;
        let mut vec = vec![];
        let mut selected_count = 0;
        while remaining != 0 {
            let step = if remaining > step_len {
                step_len
            } else {
                remaining
            };
            vec.push(RowSelector {
                row_count: step,
                skip,
            });
            remaining -= step;
            if !skip {
                selected_count += step;
            }
            skip = !skip;
        }
        (vec.into(), selected_count)
    }

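    // Compares selective reads of alltypes_tiny_pages_plain.parquet against
    // slices of a full serial read, for several batch sizes and selections.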
    #[test]
    fn test_scan_row_with_selection() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let test_file = File::open(&path).unwrap();

        let mut serial_reader =
            ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
        let data = serial_reader.next().unwrap().unwrap();

        let do_test = |batch_size: usize, selection_len: usize| {
            for skip_first in [false, true] {
                let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;

                let expected = get_expected_batches(&data, &selections, batch_size);
                let skip_reader = create_skip_reader(&test_file, batch_size, selections);
                assert_eq!(
                    skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
                    expected,
                    "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
                );
            }
        };

        do_test(1000, 1000);

        do_test(20, 20);

        do_test(20, 5);

        do_test(20, 5);

        fn create_skip_reader(
            test_file: &File,
            batch_size: usize,
            selections: RowSelection,
        ) -> ParquetRecordBatchReader {
            let options =
                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
            let file = test_file.try_clone().unwrap();
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                .unwrap()
                .with_batch_size(batch_size)
                .with_row_selection(selections)
                .build()
                .unwrap()
        }
    }

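    // A batch size larger than the file's row count should be clamped to the
    // number of rows rather than over-allocating.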
    #[test]
    fn test_batch_size_overallocate() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let test_file = File::open(path).unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
        let num_rows = builder.metadata.file_metadata().num_rows();
        let reader = builder
            .with_batch_size(1024)
            .with_projection(ProjectionMask::all())
            .build()
            .unwrap();
        assert_ne!(1024, num_rows);
        assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
    }

    #[test]
    fn test_read_with_page_index_enabled() {
        let testdata = arrow::util::test_util::parquet_test_data();

        {
            // alltypes_tiny_pages.parquet has a page index
            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
            let test_file = File::open(path).unwrap();
            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
                test_file,
                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
            )
            .unwrap();
            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
            let reader = builder.build().unwrap();
            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(batches.len(), 8);
        }

        {
            // alltypes_plain.parquet does not have a page index
            let path = format!("{testdata}/alltypes_plain.parquet");
            let test_file = File::open(path).unwrap();
            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
                test_file,
                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
            )
            .unwrap();
            assert!(builder.metadata().offset_index().is_none());
            let reader = builder.build().unwrap();
            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(batches.len(), 1);
        }
    }

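    // Writes a single-row file with optional and repeated fields through the
    // low-level SerializedFileWriter, then checks that each leaf read through
    // a single-column projection matches the corresponding column of a full
    // read.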
    #[test]
    fn test_raw_repetition() {
        const MESSAGE_TYPE: &str = "
            message Log {
              OPTIONAL INT32 eventType;
              REPEATED INT32 category;
              REPEATED group filter {
                OPTIONAL INT32 error;
              }
            }
        ";
        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
        let props = Default::default();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        // column 0: eventType
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1], Some(&[1]), None)
            .unwrap();
        col_writer.close().unwrap();
        // column 1: category
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
            .unwrap();
        col_writer.close().unwrap();
        // column 2: filter.error
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1], Some(&[1]), Some(&[0]))
            .unwrap();
        col_writer.close().unwrap();

        let rg_md = row_group_writer.close().unwrap();
        assert_eq!(rg_md.num_rows(), 1);
        writer.close().unwrap();

        let bytes = Bytes::from(buf);

        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
        let full = no_mask.next().unwrap().unwrap();

        assert_eq!(full.num_columns(), 3);

        for idx in 0..3 {
            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
            let mut reader = b.with_projection(mask).build().unwrap();
            let projected = reader.next().unwrap().unwrap();

            assert_eq!(projected.num_columns(), 1);
            assert_eq!(full.column(idx), projected.column(0));
        }
    }

    #[test]
    fn test_read_lz4_raw() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/lz4_raw_compressed.parquet");
        let file = File::open(path).unwrap();

        let batches = ParquetRecordBatchReader::try_new(file, 1024)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];

        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 4);

        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
        assert_eq!(
            a.values(),
            &[1593604800, 1593604800, 1593604801, 1593604801]
        );

        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
        let a: Vec<_> = a.iter().flatten().collect();
        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);

        let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
        assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
    }

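    // Both files appear to contain the same four rows in different LZ4
    // framings; reading either should succeed and yield identical data.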
    #[test]
    fn test_read_lz4_hadoop_fallback() {
        for file in [
            "hadoop_lz4_compressed.parquet",
            "non_hadoop_lz4_compressed.parquet",
        ] {
            let testdata = arrow::util::test_util::parquet_test_data();
            let path = format!("{testdata}/{file}");
            let file = File::open(path).unwrap();
            let expected_rows = 4;

            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert_eq!(batches.len(), 1);
            let batch = &batches[0];

            assert_eq!(batch.num_columns(), 3);
            assert_eq!(batch.num_rows(), expected_rows);

            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
            assert_eq!(
                a.values(),
                &[1593604800, 1593604800, 1593604801, 1593604801]
            );

            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
            let b: Vec<_> = b.iter().flatten().collect();
            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);

            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
        }
    }

    #[test]
    fn test_read_lz4_hadoop_large() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
        let file = File::open(path).unwrap();
        let expected_rows = 10000;

        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];

        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.num_rows(), expected_rows);

        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
        let a: Vec<_> = a.iter().flatten().collect();
        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
    }

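    // Selecting only the middle row of nested_lists.snappy.parquet should
    // match the corresponding slice of a full read.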
    #[test]
    #[cfg(feature = "snap")]
    fn test_read_nested_lists() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_lists.snappy.parquet");
        let file = File::open(path).unwrap();

        let f = file.try_clone().unwrap();
        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
        let expected = reader.next().unwrap().unwrap();
        assert_eq!(expected.num_rows(), 3);

        let selection = RowSelection::from(vec![
            RowSelector::skip(1),
            RowSelector::select(1),
            RowSelector::skip(1),
        ]);
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .with_row_selection(selection)
            .build()
            .unwrap();

        let actual = reader.next().unwrap().unwrap();
        assert_eq!(actual.num_rows(), 1);
        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
    }

    #[test]
    fn test_arbitrary_decimal() {
        let values = [1, 2, 3, 4, 5, 6, 7, 8];
        let decimals_19_0 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(19, 0)
            .unwrap();
        let decimals_12_0 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(12, 0)
            .unwrap();
        let decimals_17_10 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(17, 10)
            .unwrap();

        let written = RecordBatch::try_from_iter([
            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 8), &read[0]);
    }

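    // Skipping list rows across data page boundaries should land on the
    // correct remaining row.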
    #[test]
    fn test_list_skip() {
        let mut list = ListBuilder::new(Int32Builder::new());
        list.append_value([Some(1), Some(2)]);
        list.append_value([Some(3)]);
        list.append_value([Some(4)]);
        let list = list.finish();
        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();

        // Force a data page per row so the skip spans page boundaries
        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(1)
            .set_write_batch_size(2)
            .build();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
            .unwrap()
            .with_row_selection(selection.into())
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out, batch.slice(2, 1));
    }

    #[test]
    fn test_row_selection_interleaved_skip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "v",
            ArrowDataType::Int32,
            false,
        )]));

        let values = Int32Array::from(vec![0, 1, 2, 3, 4]);
        let batch = RecordBatch::try_from_iter([("v", Arc::new(values) as ArrayRef)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None).unwrap();
        writer.write(&batch)?;
        writer.close()?;

        let selection = RowSelection::from(vec![
            RowSelector::select(1),
            RowSelector::skip(2),
            RowSelector::select(2),
        ]);

        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))?
            .with_batch_size(4)
            .with_row_selection(selection)
            .build()?;

        let out = reader.next().unwrap()?;
        assert_eq!(out.num_rows(), 3);
        let values = out
            .column(0)
            .as_primitive::<arrow_array::types::Int32Type>()
            .values();
        assert_eq!(values, &[0, 3, 4]);
        assert!(reader.next().is_none());
        Ok(())
    }

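    // An every-other-row selection decomposes into many tiny selectors (see
    // the assert below), the shape where a bitmask representation pays off;
    // the values read must be unaffected either way.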
    #[test]
    fn test_row_selection_mask_sparse_rows() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "v",
            ArrowDataType::Int32,
            false,
        )]));

        let values = Int32Array::from((0..30).collect::<Vec<i32>>());
        let batch = RecordBatch::try_from_iter([("v", Arc::new(values) as ArrayRef)])?;

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None)?;
        writer.write(&batch)?;
        writer.close()?;

        let total_rows = batch.num_rows();
        let ranges = (1..total_rows)
            .step_by(2)
            .map(|i| i..i + 1)
            .collect::<Vec<_>>();
        let selection = RowSelection::from_consecutive_ranges(ranges.into_iter(), total_rows);

        let selectors: Vec<RowSelector> = selection.clone().into();
        assert!(total_rows < selectors.len() * 8);

        let bytes = Bytes::from(buffer);

        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes.clone())?
            .with_batch_size(7)
            .with_row_selection(selection)
            .build()?;

        let mut collected = Vec::new();
        for batch in reader {
            let batch = batch?;
            collected.extend_from_slice(
                batch
                    .column(0)
                    .as_primitive::<arrow_array::types::Int32Type>()
                    .values(),
            );
        }

        let expected: Vec<i32> = (1..total_rows).step_by(2).map(|i| i as i32).collect();
        assert_eq!(collected, expected);
        Ok(())
    }

    fn test_decimal32_roundtrip() {
        let d = |values: Vec<i32>, p: u8| {
            let iter = values.into_iter();
            PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    fn test_decimal64_roundtrip() {
        let d = |values: Vec<i64>, p: u8| {
            let iter = values.into_iter();
            PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    fn test_decimal_roundtrip<T: DecimalType>() {
        let d = |values: Vec<usize>, p: u8| {
            let iter = values.into_iter().map(T::Native::usize_as);
            PrimitiveArray::<T>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
            ("d4", Arc::new(d4) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);
        let t4 = builder.parquet_schema().columns()[3].physical_type();
        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    #[test]
    fn test_decimal() {
        test_decimal32_roundtrip();
        test_decimal64_roundtrip();
        test_decimal_roundtrip::<Decimal128Type>();
        test_decimal_roundtrip::<Decimal256Type>();
    }

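    // Applies interleaved skips to a large list column and verifies both the
    // offsets and the values of the selected rows.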
    #[test]
    fn test_list_selection() {
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list_field(ArrowDataType::Utf8, true),
            false,
        )]));
        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        for i in 0..2 {
            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
            for j in 0..1024 {
                list_a_builder.values().append_value(format!("{i} {j}"));
                list_a_builder.append(true);
            }
            let batch =
                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
                    .unwrap();
            writer.write(&batch).unwrap();
        }
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);
        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
            .unwrap()
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ]))
            .build()
            .unwrap();

        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        let batch = concat_batches(&schema, &batches).unwrap();

        assert_eq!(batch.num_rows(), 924 * 2);
        let list = batch.column(0).as_list::<i32>();

        for w in list.value_offsets().windows(2) {
            assert_eq!(w[0] + 1, w[1])
        }
        let mut values = list.values().as_string::<i32>().iter();

        for i in 0..2 {
            for j in 100..1024 {
                let expected = format!("{i} {j}");
                assert_eq!(values.next().unwrap().unwrap(), &expected);
            }
        }
    }

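    // Fuzz test: random nested lists with nulls, read back under several
    // fixed selections and batch sizes, comparing every selected slice to the
    // written batch.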
    #[test]
    fn test_list_selection_fuzz() {
        let mut rng = rng();
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list(
                Field::LIST_FIELD_DEFAULT_NAME,
                Field::new_list_field(ArrowDataType::Int32, true),
                true,
            ),
            true,
        )]));
        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));

        for _ in 0..2048 {
            if rng.random_bool(0.2) {
                list_a_builder.append(false);
                continue;
            }

            let list_a_len = rng.random_range(0..10);
            let list_b_builder = list_a_builder.values();

            for _ in 0..list_a_len {
                if rng.random_bool(0.2) {
                    list_b_builder.append(false);
                    continue;
                }

                let list_b_len = rng.random_range(0..10);
                let int_builder = list_b_builder.values();
                for _ in 0..list_b_len {
                    match rng.random_bool(0.2) {
                        true => int_builder.append_null(),
                        false => int_builder.append_value(rng.random()),
                    }
                }
                list_b_builder.append(true)
            }
            list_a_builder.append(true);
        }

        let array = Arc::new(list_a_builder.finish());
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        writer.write(&batch).unwrap();
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);

        let cases = [
            vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ],
            vec![
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
            ],
            vec![
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
            ],
            vec![
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
            ],
        ];

        for batch_size in [100, 1024, 2048] {
            for selection in &cases {
                let selection = RowSelection::from(selection.clone());
                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
                    .unwrap()
                    .with_row_selection(selection.clone())
                    .with_batch_size(batch_size)
                    .build()
                    .unwrap();

                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
                assert_eq!(actual.num_rows(), selection.row_count());

                let mut batch_offset = 0;
                let mut actual_offset = 0;
                for selector in selection.iter() {
                    if selector.skip {
                        batch_offset += selector.row_count;
                        continue;
                    }

                    assert_eq!(
                        batch.slice(batch_offset, selector.row_count),
                        actual.slice(actual_offset, selector.row_count)
                    );

                    batch_offset += selector.row_count;
                    actual_offset += selector.row_count;
                }
            }
        }
    }

    #[test]
    fn test_read_old_nested_list() {
        use arrow::datatypes::DataType;
        use arrow::datatypes::ToByteSlice;

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/old_list_structure.parquet");
        let test_file = File::open(path).unwrap();

        // Build the expected inner list: [[1, 2], [3, 4]]
        let a_values = Int32Array::from(vec![1, 2, 3, 4]);

        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());

        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
            "array",
            DataType::Int32,
            false,
        ))))
        .len(2)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
        let mut reader = builder.build().unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out.num_columns(), 1);
        let c0 = out.column(0);
        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
        let r0 = c0arr.value(0);
        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
        assert_eq!(r0arr, &a);
    }

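    // map_no_value.parquet stores a map whose entries have no value field;
    // the two list-typed columns it decodes to should match row for row.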
    #[test]
    fn test_map_no_value() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/map_no_value.parquet");
        let file = File::open(path).unwrap();

        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 3);
        assert_eq!(out.num_columns(), 3);
        let c0 = out.column(1).as_list::<i32>();
        let c1 = out.column(2).as_list::<i32>();
        assert_eq!(c0.len(), c1.len());
        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
    }

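    // A filter that matches only the first and last rows forces whole
    // interior pages to be skipped; decoding must cope with fully skipped
    // pages.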
    #[test]
    fn test_row_filter_full_page_skip_is_handled() {
        let first_value: i64 = 1111;
        let last_value: i64 = 9999;
        let num_rows: usize = 12;

        let schema = Arc::new(Schema::new(vec![
            Field::new("key", arrow_schema::DataType::Int64, false),
            Field::new("value", arrow_schema::DataType::Int64, false),
        ]));

        // Only the first and last rows match the predicate below
        let mut int_values: Vec<i64> = (0..num_rows as i64).collect();
        int_values[0] = first_value;
        int_values[num_rows - 1] = last_value;
        let keys = Int64Array::from(int_values.clone());
        let values = Int64Array::from(int_values.clone());
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
        )
        .unwrap();

        // Write small pages so the interior pages contain no matching rows
        let props = WriterProperties::builder()
            .set_write_batch_size(2)
            .set_data_page_row_count_limit(2)
            .build();

        let mut buffer = Vec::new();
        let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let data = Bytes::from(buffer);

        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
        let builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options).unwrap();
        let schema = builder.parquet_schema().clone();
        let filter_mask = ProjectionMask::leaves(&schema, [0]);

        let make_predicate = |mask: ProjectionMask| {
            ArrowPredicateFn::new(mask, move |batch: RecordBatch| {
                let column = batch.column(0);
                let match_first = eq(column, &Int64Array::new_scalar(first_value))?;
                let match_second = eq(column, &Int64Array::new_scalar(last_value))?;
                or(&match_first, &match_second)
            })
        };

        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
        let predicate = make_predicate(filter_mask.clone());

        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options)
            .unwrap()
            .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
            .with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 32 })
            .with_batch_size(12)
            .build()
            .unwrap();

        let schema = reader.schema().clone();
        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        let result = concat_batches(&schema, &batches).unwrap();
        assert_eq!(result.num_rows(), 2);
    }

    #[test]
    fn test_get_row_group_column_bloom_filter_with_length() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
        let file = File::open(path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let schema = builder.schema().clone();
        let reader = builder.build().unwrap();

        let mut parquet_data = Vec::new();
        let props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build();
        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
        for batch in reader {
            let batch = batch.unwrap();
            writer.write(&batch).unwrap();
        }
        writer.close().unwrap();

        test_get_row_group_column_bloom_filter(parquet_data.into(), true);
    }

    #[test]
    fn test_get_row_group_column_bloom_filter_without_length() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());
        test_get_row_group_column_bloom_filter(data, false);
    }

    fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
        let builder = ParquetRecordBatchReaderBuilder::try_new(data.clone()).unwrap();

        let metadata = builder.metadata();
        assert_eq!(metadata.num_row_groups(), 1);
        let row_group = metadata.row_group(0);
        let column = row_group.column(0);
        assert_eq!(column.bloom_filter_length().is_some(), with_length);

        let sbbf = builder
            .get_row_group_column_bloom_filter(0, 0)
            .unwrap()
            .unwrap();
        assert!(sbbf.check(&"Hello"));
        assert!(!sbbf.check(&"Hello_Not_Exists"));
    }

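    // A column annotated with a logical type unknown to this reader should
    // still load and be readable via its physical type.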
    #[test]
    fn test_read_unknown_logical_type() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/unknown-logical-type.parquet");
        let test_file = File::open(path).unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file)
            .expect("Error creating reader builder");

        let schema = builder.metadata().file_metadata().schema_descr();
        assert_eq!(
            schema.column(0).logical_type_ref(),
            Some(&LogicalType::String)
        );
        assert_eq!(
            schema.column(1).logical_type_ref(),
            Some(&LogicalType::_Unknown { field_id: 2555 })
        );
        assert_eq!(schema.column(1).physical_type(), PhysicalType::BYTE_ARRAY);

        let mut reader = builder.build().unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 3);
        assert_eq!(out.num_columns(), 2);
    }

    #[test]
    fn test_read_row_numbers() {
        let file = write_parquet_from_iter(vec![(
            "value",
            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
        )]);
        let supplied_fields = Fields::from(vec![Field::new("value", ArrowDataType::Int64, false)]);

        let row_number_field = Arc::new(
            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
        );

        let options = ArrowReaderOptions::new()
            .with_schema(Arc::new(Schema::new(supplied_fields)))
            .with_virtual_columns(vec![row_number_field.clone()])
            .unwrap();
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");

        let batch = arrow_reader.next().unwrap().unwrap();
        let schema = Arc::new(Schema::new(vec![
            Field::new("value", ArrowDataType::Int64, false),
            (*row_number_field).clone(),
        ]));

        assert_eq!(batch.schema(), schema);
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(1), Some(2), Some(3)]
        );
        assert_eq!(
            batch
                .column(1)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(0), Some(1), Some(2)]
        );
    }

    #[test]
    fn test_read_only_row_numbers() {
        let file = write_parquet_from_iter(vec![(
            "value",
            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
        )]);
        let row_number_field = Arc::new(
            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
        );
        let options = ArrowReaderOptions::new()
            .with_virtual_columns(vec![row_number_field.clone()])
            .unwrap();
        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
        let num_columns = metadata
            .metadata
            .file_metadata()
            .schema_descr()
            .num_columns();

        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
            .with_projection(ProjectionMask::none(num_columns))
            .build()
            .expect("reader with schema");

        let batch = arrow_reader.next().unwrap().unwrap();
        let schema = Arc::new(Schema::new(vec![row_number_field]));

        assert_eq!(batch.schema(), schema);
        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(0), Some(1), Some(2)]
        );
    }

    #[test]
    fn test_read_row_numbers_row_group_order() -> Result<()> {
        // 100 values split across two row groups of 50 rows each
        let array = Int64Array::from_iter_values(5000..5100);
        let batch = RecordBatch::try_from_iter([("col", Arc::new(array) as ArrayRef)])?;
        let mut buffer = Vec::new();
        let options = WriterProperties::builder()
            .set_max_row_group_size(50)
            .build();
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema().clone(), Some(options))?;
        for batch_chunk in (0..10).map(|i| batch.slice(i * 10, 10)) {
            writer.write(&batch_chunk)?;
        }
        writer.close()?;

        let row_number_field = Arc::new(
            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
        );

        let buffer = Bytes::from(buffer);

        let options =
            ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field.clone()])?;

        let arrow_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(buffer.clone(), options.clone())?
                .build()?;

        assert_eq!(
            ValuesAndRowNumbers {
                values: (5000..5100).collect(),
                row_numbers: (0..100).collect()
            },
            ValuesAndRowNumbers::new_from_reader(arrow_reader)
        );

        // Reading the row groups out of order must reorder the row numbers too
        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(buffer, options)?
            .with_row_groups(vec![1, 0])
            .build()?;

        assert_eq!(
            ValuesAndRowNumbers {
                values: (5050..5100).chain(5000..5050).collect(),
                row_numbers: (50..100).chain(0..50).collect(),
            },
            ValuesAndRowNumbers::new_from_reader(arrow_reader)
        );

        Ok(())
    }

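    // Collects the "col" values and "row_number" columns from a reader so
    // both can be compared in a single assertion.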
    #[derive(Debug, PartialEq)]
    struct ValuesAndRowNumbers {
        values: Vec<i64>,
        row_numbers: Vec<i64>,
    }

    impl ValuesAndRowNumbers {
        fn new_from_reader(reader: ParquetRecordBatchReader) -> Self {
            let mut values = vec![];
            let mut row_numbers = vec![];
            for batch in reader {
                let batch = batch.expect("Could not read batch");
                values.extend(
                    batch
                        .column_by_name("col")
                        .expect("Could not get col column")
                        .as_primitive::<arrow::datatypes::Int64Type>()
                        .iter()
                        .map(|v| v.expect("Could not get value")),
                );

                row_numbers.extend(
                    batch
                        .column_by_name("row_number")
                        .expect("Could not get row_number column")
                        .as_primitive::<arrow::datatypes::Int64Type>()
                        .iter()
                        .map(|v| v.expect("Could not get row number")),
                );
            }
            Self {
                values,
                row_numbers,
            }
        }
    }

    #[test]
    fn test_with_virtual_columns_rejects_non_virtual_fields() {
        let regular_field = Arc::new(Field::new("regular_column", ArrowDataType::Int64, false));
        assert_eq!(
            ArrowReaderOptions::new()
                .with_virtual_columns(vec![regular_field])
                .unwrap_err()
                .to_string(),
            "Parquet error: Field 'regular_column' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'"
        );
    }

    #[test]
    fn test_row_numbers_with_multiple_row_groups() {
        test_row_numbers_with_multiple_row_groups_helper(
            false,
            |path, selection, _row_filter, batch_size| {
                let file = File::open(path).unwrap();
                let row_number_field = Arc::new(
                    Field::new("row_number", ArrowDataType::Int64, false)
                        .with_extension_type(RowNumber),
                );
                let options = ArrowReaderOptions::new()
                    .with_virtual_columns(vec![row_number_field])
                    .unwrap();
                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                    .unwrap()
                    .with_row_selection(selection)
                    .with_batch_size(batch_size)
                    .build()
                    .expect("Could not create reader");
                reader
                    .collect::<Result<Vec<_>, _>>()
                    .expect("Could not read")
            },
        );
    }

    #[test]
    fn test_row_numbers_with_multiple_row_groups_and_filter() {
        test_row_numbers_with_multiple_row_groups_helper(
            true,
            |path, selection, row_filter, batch_size| {
                let file = File::open(path).unwrap();
                let row_number_field = Arc::new(
                    Field::new("row_number", ArrowDataType::Int64, false)
                        .with_extension_type(RowNumber),
                );
                let options = ArrowReaderOptions::new()
                    .with_virtual_columns(vec![row_number_field])
                    .unwrap();
                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                    .unwrap()
                    .with_row_selection(selection)
                    .with_batch_size(batch_size)
                    .with_row_filter(row_filter.expect("No filter"))
                    .build()
                    .expect("Could not create reader");
                reader
                    .collect::<Result<Vec<_>, _>>()
                    .expect("Could not read")
            },
        );
    }

    #[test]
    fn test_read_row_group_indices() {
        let array1 = Int64Array::from(vec![1, 2]);
        let array2 = Int64Array::from(vec![3, 4]);
        let array3 = Int64Array::from(vec![5, 6]);

        let batch1 =
            RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
        let batch2 =
            RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
        let batch3 =
            RecordBatch::try_from_iter(vec![("value", Arc::new(array3) as ArrayRef)]).unwrap();

        let mut buffer = Vec::new();
        let options = WriterProperties::builder()
            .set_max_row_group_size(2)
            .build();
        let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
        writer.write(&batch1).unwrap();
        writer.write(&batch2).unwrap();
        writer.write(&batch3).unwrap();
        writer.close().unwrap();

        let file = Bytes::from(buffer);
        let row_group_index_field = Arc::new(
            Field::new("row_group_index", ArrowDataType::Int64, false)
                .with_extension_type(RowGroupIndex),
        );

        let options = ArrowReaderOptions::new()
            .with_virtual_columns(vec![row_group_index_field.clone()])
            .unwrap();
        let mut arrow_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options)
                .expect("reader builder with virtual columns")
                .build()
                .expect("reader with virtual columns");

        let batch = arrow_reader.next().unwrap().unwrap();

        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 6);

        assert_eq!(
            batch
                .column(0)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(1), Some(2), Some(3), Some(4), Some(5), Some(6)]
        );

        assert_eq!(
            batch
                .column(1)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(0), Some(0), Some(1), Some(1), Some(2), Some(2)]
        );
    }

    #[test]
    fn test_read_only_row_group_indices() {
        let array1 = Int64Array::from(vec![1, 2, 3]);
        let array2 = Int64Array::from(vec![4, 5]);

        let batch1 =
            RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
        let batch2 =
            RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();

        let mut buffer = Vec::new();
        let options = WriterProperties::builder()
            .set_max_row_group_size(3)
            .build();
        let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
        writer.write(&batch1).unwrap();
        writer.write(&batch2).unwrap();
        writer.close().unwrap();

        let file = Bytes::from(buffer);
        let row_group_index_field = Arc::new(
            Field::new("row_group_index", ArrowDataType::Int64, false)
                .with_extension_type(RowGroupIndex),
        );

        let options = ArrowReaderOptions::new()
            .with_virtual_columns(vec![row_group_index_field.clone()])
            .unwrap();
        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
        let num_columns = metadata
            .metadata
            .file_metadata()
            .schema_descr()
            .num_columns();

        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
            .with_projection(ProjectionMask::none(num_columns))
            .build()
            .expect("reader with virtual columns only");

        let batch = arrow_reader.next().unwrap().unwrap();
        let schema = Arc::new(Schema::new(vec![(*row_group_index_field).clone()]));

        assert_eq!(batch.schema(), schema);
        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.num_rows(), 5);

        assert_eq!(
            batch
                .column(0)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(0), Some(0), Some(0), Some(1), Some(1)]
        );
    }

    #[test]
    fn test_read_row_group_indices_with_selection() -> Result<()> {
        let mut buffer = Vec::new();
        let options = WriterProperties::builder()
            .set_max_row_group_size(10)
            .build();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            ArrowDataType::Int64,
            false,
        )]));

        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(options))?;

        // Write three row groups of 10 rows each
        for i in 0..3 {
            let start = i * 10;
            let array = Int64Array::from_iter_values(start..start + 10);
            let batch = RecordBatch::try_from_iter(vec![("value", Arc::new(array) as ArrayRef)])?;
            writer.write(&batch)?;
        }
        writer.close()?;

        let file = Bytes::from(buffer);
        let row_group_index_field = Arc::new(
            Field::new("rg_idx", ArrowDataType::Int64, false).with_extension_type(RowGroupIndex),
        );

        let options =
            ArrowReaderOptions::new().with_virtual_columns(vec![row_group_index_field])?;

        // Read the row groups in reverse order
        let arrow_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options.clone())?
                .with_row_groups(vec![2, 1, 0])
                .build()?;

        let batches: Vec<_> = arrow_reader.collect::<Result<Vec<_>, _>>()?;
        let combined = concat_batches(&batches[0].schema(), &batches)?;

        let values = combined.column(0).as_primitive::<types::Int64Type>();
        let first_val = values.value(0);
        let last_val = values.value(combined.num_rows() - 1);
        // first row comes from row group 2, last row from row group 0
        assert_eq!(first_val, 20);
        assert_eq!(last_val, 9);

        let rg_indices = combined.column(1).as_primitive::<types::Int64Type>();
        assert_eq!(rg_indices.value(0), 2);
        assert_eq!(rg_indices.value(10), 1);
        assert_eq!(rg_indices.value(20), 0);

        Ok(())
    }

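    // Shared harness: writes a random file, builds a random selection (and
    // optionally a random filter), lets `test_case` perform the read, then
    // checks every returned row number matches its value column.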
    pub(crate) fn test_row_numbers_with_multiple_row_groups_helper<F>(
        use_filter: bool,
        test_case: F,
    ) where
        F: FnOnce(PathBuf, RowSelection, Option<RowFilter>, usize) -> Vec<RecordBatch>,
    {
        let seed: u64 = random();
        println!("test_row_numbers_with_multiple_row_groups seed: {seed}");
        let mut rng = StdRng::seed_from_u64(seed);

        use tempfile::TempDir;
        let tempdir = TempDir::new().expect("Could not create temp dir");

        let (bytes, metadata) = generate_file_with_row_numbers(&mut rng);

        let path = tempdir.path().join("test.parquet");
        std::fs::write(&path, bytes).expect("Could not write file");

        let mut case = vec![];
        let mut remaining = metadata.file_metadata().num_rows();
        while remaining > 0 {
            let row_count = rng.random_range(1..=remaining);
            remaining -= row_count;
            case.push(RowSelector {
                row_count: row_count as usize,
                skip: rng.random_bool(0.5),
            });
        }

        let filter = use_filter.then(|| {
            let filter = (0..metadata.file_metadata().num_rows())
                .map(|_| rng.random_bool(0.99))
                .collect::<Vec<_>>();
            let mut filter_offset = 0;
            RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
                ProjectionMask::all(),
                move |b| {
                    let array = BooleanArray::from_iter(
                        filter
                            .iter()
                            .skip(filter_offset)
                            .take(b.num_rows())
                            .map(|x| Some(*x)),
                    );
                    filter_offset += b.num_rows();
                    Ok(array)
                },
            ))])
        });

        let selection = RowSelection::from(case);
        let batches = test_case(path, selection.clone(), filter, rng.random_range(1..4096));

        if selection.skipped_row_count() == metadata.file_metadata().num_rows() as usize {
            assert!(batches.into_iter().all(|batch| batch.num_rows() == 0));
            return;
        }
        let actual = concat_batches(batches.first().expect("No batches").schema_ref(), &batches)
            .expect("Failed to concatenate");

        let values = actual
            .column(0)
            .as_primitive::<types::Int64Type>()
            .iter()
            .collect::<Vec<_>>();
        let row_numbers = actual
            .column(1)
            .as_primitive::<types::Int64Type>()
            .iter()
            .collect::<Vec<_>>();
        // Values are 1-based where row numbers are 0-based
        assert_eq!(
            row_numbers
                .into_iter()
                .map(|number| number.map(|number| number + 1))
                .collect::<Vec<_>>(),
            values
        );
    }

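    // Writes a single Int64 column holding 1..=N (N random) in random-sized
    // batches, flushing after each so the file spans multiple row groups.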
    fn generate_file_with_row_numbers(rng: &mut impl Rng) -> (Bytes, ParquetMetaData) {
        let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
            "value",
            ArrowDataType::Int64,
            false,
        )])));

        let mut buf = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new(&mut buf, schema.clone(), None).expect("Could not create writer");

        let mut values = 1..=rng.random_range(1..4096);
        while !values.is_empty() {
            let batch_values = values
                .by_ref()
                .take(rng.random_range(1..4096))
                .collect::<Vec<_>>();
            let array = Arc::new(Int64Array::from(batch_values)) as ArrayRef;
            let batch =
                RecordBatch::try_from_iter([("value", array)]).expect("Could not create batch");
            writer.write(&batch).expect("Could not write batch");
            writer.flush().expect("Could not flush");
        }
        let metadata = writer.close().expect("Could not close writer");

        (Bytes::from(buf), metadata)
    }
}