use arrow_array::cast::AsArray;
use arrow_array::{Array, RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, FieldRef, Schema, SchemaRef};
use arrow_select::filter::filter_record_batch;
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

pub use crate::arrow::array_reader::RowGroups;
use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
use crate::arrow::schema::{
    ParquetField, parquet_to_arrow_schema_and_fields, virtual_type::is_virtual_column,
};
use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels_with_virtual};
use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
use crate::bloom_filter::{
    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
};
use crate::column::page::{PageIterator, PageReader};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{
    PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
    ParquetStatisticsPolicy, RowGroupMetaData,
};
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;

use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
pub use read_plan::{ReadPlan, ReadPlanBuilder};

mod filter;
pub mod metrics;
mod read_plan;
pub(crate) mod selection;
pub mod statistics;

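/// Shared builder for configuring how Parquet data is read into Arrow
/// [`RecordBatch`]es.
///
/// A minimal usage sketch via the synchronous
/// [`ParquetRecordBatchReaderBuilder`] alias defined later in this module;
/// the file path and batch size below are illustrative only:
///
/// ```no_run
/// # use std::fs::File;
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
/// let file = File::open("data.parquet").unwrap();
/// let reader = ParquetRecordBatchReaderBuilder::try_new(file)
///     .unwrap()
///     .with_batch_size(8192)
///     .build()
///     .unwrap();
/// ```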
pub struct ArrowReaderBuilder<T> {
    pub(crate) input: T,

    pub(crate) metadata: Arc<ParquetMetaData>,

    pub(crate) schema: SchemaRef,

    pub(crate) fields: Option<Arc<ParquetField>>,

    pub(crate) batch_size: usize,

    pub(crate) row_groups: Option<Vec<usize>>,

    pub(crate) projection: ProjectionMask,

    pub(crate) filter: Option<RowFilter>,

    pub(crate) selection: Option<RowSelection>,

    pub(crate) row_selection_policy: RowSelectionPolicy,

    pub(crate) limit: Option<usize>,

    pub(crate) offset: Option<usize>,

    pub(crate) metrics: ArrowReaderMetrics,

    pub(crate) max_predicate_cache_size: usize,
}

impl<T: Debug> Debug for ArrowReaderBuilder<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowReaderBuilder<T>")
            .field("input", &self.input)
            .field("metadata", &self.metadata)
            .field("schema", &self.schema)
            .field("fields", &self.fields)
            .field("batch_size", &self.batch_size)
            .field("row_groups", &self.row_groups)
            .field("projection", &self.projection)
            .field("filter", &self.filter)
            .field("selection", &self.selection)
            .field("row_selection_policy", &self.row_selection_policy)
            .field("limit", &self.limit)
            .field("offset", &self.offset)
            .field("metrics", &self.metrics)
            .finish()
    }
}

impl<T> ArrowReaderBuilder<T> {
    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self {
            input,
            metadata: metadata.metadata,
            schema: metadata.schema,
            fields: metadata.fields,
            batch_size: 1024,
            row_groups: None,
            projection: ProjectionMask::all(),
            filter: None,
            selection: None,
            row_selection_policy: RowSelectionPolicy::default(),
            limit: None,
            offset: None,
            metrics: ArrowReaderMetrics::Disabled,
            max_predicate_cache_size: 100 * 1024 * 1024,
        }
    }

    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    pub fn with_batch_size(self, batch_size: usize) -> Self {
        // Clamp the batch size to the number of rows in the file to avoid
        // allocating buffers larger than the data they will hold.
        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
        Self { batch_size, ..self }
    }

    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
        Self {
            row_groups: Some(row_groups),
            ..self
        }
    }

    pub fn with_projection(self, mask: ProjectionMask) -> Self {
        Self {
            projection: mask,
            ..self
        }
    }

    pub fn with_row_selection_policy(self, policy: RowSelectionPolicy) -> Self {
        Self {
            row_selection_policy: policy,
            ..self
        }
    }

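    /// A hedged sketch of constructing a [`RowSelection`] to pass here: skip
    /// the first 100 rows, then read the next 200 (the counts are
    /// illustrative only).
    ///
    /// ```no_run
    /// # use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
    /// let selection = RowSelection::from(vec![
    ///     RowSelector::skip(100),
    ///     RowSelector::select(200),
    /// ]);
    /// ```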
    pub fn with_row_selection(self, selection: RowSelection) -> Self {
        Self {
            selection: Some(selection),
            ..self
        }
    }

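    /// A hedged sketch of building a [`RowFilter`] to pass here: the
    /// predicate keeps rows where the first projected column equals 42. The
    /// column index and comparison value are illustrative only.
    ///
    /// ```no_run
    /// # use arrow_array::{Int32Array, RecordBatch};
    /// # use parquet::arrow::ProjectionMask;
    /// # use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
    /// # fn example(mask: ProjectionMask) -> RowFilter {
    /// let predicate = ArrowPredicateFn::new(mask, |batch: RecordBatch| {
    ///     arrow::compute::kernels::cmp::eq(batch.column(0), &Int32Array::new_scalar(42))
    /// });
    /// RowFilter::new(vec![Box::new(predicate)])
    /// # }
    /// ```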
    pub fn with_row_filter(self, filter: RowFilter) -> Self {
        Self {
            filter: Some(filter),
            ..self
        }
    }

    pub fn with_limit(self, limit: usize) -> Self {
        Self {
            limit: Some(limit),
            ..self
        }
    }

    pub fn with_offset(self, offset: usize) -> Self {
        Self {
            offset: Some(offset),
            ..self
        }
    }

    pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
        Self { metrics, ..self }
    }

    pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
        Self {
            max_predicate_cache_size,
            ..self
        }
    }
}

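/// Options that control how Parquet file metadata is read and interpreted.
///
/// A minimal configuration sketch (the chosen policy values are
/// illustrative, not recommendations):
///
/// ```no_run
/// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
/// # use parquet::file::metadata::PageIndexPolicy;
/// let options = ArrowReaderOptions::new()
///     .with_skip_arrow_metadata(true)
///     .with_page_index_policy(PageIndexPolicy::Optional);
/// ```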
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    skip_arrow_metadata: bool,
    supplied_schema: Option<SchemaRef>,

    pub(crate) column_index: PageIndexPolicy,
    pub(crate) offset_index: PageIndexPolicy,

    metadata_options: ParquetMetaDataOptions,
    #[cfg(feature = "encryption")]
    pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,

    virtual_columns: Vec<FieldRef>,
}

impl ArrowReaderOptions {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

    pub fn with_schema(self, schema: SchemaRef) -> Self {
        Self {
            supplied_schema: Some(schema),
            skip_arrow_metadata: true,
            ..self
        }
    }

    #[deprecated(since = "57.2.0", note = "Use `with_page_index_policy` instead")]
    pub fn with_page_index(self, page_index: bool) -> Self {
        self.with_page_index_policy(PageIndexPolicy::from(page_index))
    }

    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
        self.with_column_index_policy(policy)
            .with_offset_index_policy(policy)
    }

    pub fn with_column_index_policy(mut self, policy: PageIndexPolicy) -> Self {
        self.column_index = policy;
        self
    }

    pub fn with_offset_index_policy(mut self, policy: PageIndexPolicy) -> Self {
        self.offset_index = policy;
        self
    }

    pub fn with_parquet_schema(mut self, schema: Arc<SchemaDescriptor>) -> Self {
        self.metadata_options.set_schema(schema);
        self
    }

    pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
        self.metadata_options.set_encoding_stats_as_mask(val);
        self
    }

    pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
        self.metadata_options.set_encoding_stats_policy(policy);
        self
    }

    pub fn with_column_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
        self.metadata_options.set_column_stats_policy(policy);
        self
    }

    pub fn with_size_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
        self.metadata_options.set_size_stats_policy(policy);
        self
    }

    #[cfg(feature = "encryption")]
    pub fn with_file_decryption_properties(
        self,
        file_decryption_properties: Arc<FileDecryptionProperties>,
    ) -> Self {
        Self {
            file_decryption_properties: Some(file_decryption_properties),
            ..self
        }
    }

    pub fn with_virtual_columns(self, virtual_columns: Vec<FieldRef>) -> Result<Self> {
        for field in &virtual_columns {
            if !is_virtual_column(field) {
                return Err(ParquetError::General(format!(
                    "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
                    field.name()
                )));
            }
        }
        Ok(Self {
            virtual_columns,
            ..self
        })
    }

    #[deprecated(
        since = "57.2.0",
        note = "Use `column_index_policy` or `offset_index_policy` instead"
    )]
    pub fn page_index(&self) -> bool {
        self.offset_index != PageIndexPolicy::Skip && self.column_index != PageIndexPolicy::Skip
    }

    pub fn offset_index_policy(&self) -> PageIndexPolicy {
        self.offset_index
    }

    pub fn column_index_policy(&self) -> PageIndexPolicy {
        self.column_index
    }

    pub fn metadata_options(&self) -> &ParquetMetaDataOptions {
        &self.metadata_options
    }

    #[cfg(feature = "encryption")]
    pub fn file_decryption_properties(&self) -> Option<&Arc<FileDecryptionProperties>> {
        self.file_decryption_properties.as_ref()
    }
}

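/// The decoded metadata for a Parquet file, decoupled from the reader that
/// produced it. Loading it once and cloning it allows several readers to be
/// built over the same input without re-parsing the footer.
///
/// A hedged sketch (assumes `data` is an in-memory buffer holding a complete
/// Parquet file):
///
/// ```no_run
/// # use bytes::Bytes;
/// # use parquet::arrow::arrow_reader::{
/// #     ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
/// # };
/// # fn example(data: Bytes) {
/// let metadata = ArrowReaderMetadata::load(&data, ArrowReaderOptions::new()).unwrap();
/// let builder_a =
///     ParquetRecordBatchReaderBuilder::new_with_metadata(data.clone(), metadata.clone());
/// let builder_b = ParquetRecordBatchReaderBuilder::new_with_metadata(data, metadata);
/// # }
/// ```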
#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    pub(crate) metadata: Arc<ParquetMetaData>,
    pub(crate) schema: SchemaRef,
    pub(crate) fields: Option<Arc<ParquetField>>,
}

impl ArrowReaderMetadata {
    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ParquetMetaDataReader::new()
            .with_column_index_policy(options.column_index)
            .with_offset_index_policy(options.offset_index)
            .with_metadata_options(Some(options.metadata_options.clone()));
        #[cfg(feature = "encryption")]
        let metadata = metadata.with_decryption_properties(
            options.file_decryption_properties.as_ref().map(Arc::clone),
        );
        let metadata = metadata.parse_and_finish(reader)?;
        Self::try_new(Arc::new(metadata), options)
    }

    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
        match options.supplied_schema {
            Some(supplied_schema) => Self::with_supplied_schema(
                metadata,
                supplied_schema.clone(),
                &options.virtual_columns,
            ),
            None => {
                let kv_metadata = match options.skip_arrow_metadata {
                    true => None,
                    false => metadata.file_metadata().key_value_metadata(),
                };

                let (schema, fields) = parquet_to_arrow_schema_and_fields(
                    metadata.file_metadata().schema_descr(),
                    ProjectionMask::all(),
                    kv_metadata,
                    &options.virtual_columns,
                )?;

                Ok(Self {
                    metadata,
                    schema: Arc::new(schema),
                    fields: fields.map(Arc::new),
                })
            }
        }
    }

    fn with_supplied_schema(
        metadata: Arc<ParquetMetaData>,
        supplied_schema: SchemaRef,
        virtual_columns: &[FieldRef],
    ) -> Result<Self> {
        let parquet_schema = metadata.file_metadata().schema_descr();
        let field_levels = parquet_to_arrow_field_levels_with_virtual(
            parquet_schema,
            ProjectionMask::all(),
            Some(supplied_schema.fields()),
            virtual_columns,
        )?;
        let fields = field_levels.fields;
        let inferred_len = fields.len();
        let supplied_len = supplied_schema.fields().len() + virtual_columns.len();
        if inferred_len != supplied_len {
            return Err(arrow_err!(format!(
                "Incompatible supplied Arrow schema: expected {} columns, received {}",
                inferred_len, supplied_len
            )));
        }

        let mut errors = Vec::new();

        let field_iter = supplied_schema.fields().iter().zip(fields.iter());

        for (field1, field2) in field_iter {
            if field1.data_type() != field2.data_type() {
                errors.push(format!(
                    "data type mismatch for field {}: requested {} but found {}",
                    field1.name(),
                    field1.data_type(),
                    field2.data_type()
                ));
            }
            if field1.is_nullable() != field2.is_nullable() {
                errors.push(format!(
                    "nullability mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.is_nullable(),
                    field2.is_nullable()
                ));
            }
            if field1.metadata() != field2.metadata() {
                errors.push(format!(
                    "metadata mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.metadata(),
                    field2.metadata()
                ));
            }
        }

        if !errors.is_empty() {
            let message = errors.join(", ");
            return Err(ParquetError::ArrowError(format!(
                "Incompatible supplied Arrow schema: {message}",
            )));
        }

        Ok(Self {
            metadata,
            schema: supplied_schema,
            fields: field_levels.levels.map(Arc::new),
        })
    }

    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}

#[doc(hidden)]
pub struct SyncReader<T: ChunkReader>(T);

impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("SyncReader").field(&self.0).finish()
    }
}

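/// A synchronous builder for [`ParquetRecordBatchReader`].
///
/// A hedged end-to-end sketch with an in-memory buffer (assumes `buffer`
/// holds the bytes of a valid Parquet file):
///
/// ```no_run
/// # use bytes::Bytes;
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
/// # fn example(buffer: Bytes) {
/// let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
///     .unwrap()
///     .build()
///     .unwrap();
/// let batch = reader.next().unwrap().unwrap();
/// # }
/// ```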
pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;

impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
    pub fn try_new(reader: T) -> Result<Self> {
        Self::try_new_with_options(reader, Default::default())
    }

    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load(&reader, options)?;
        Ok(Self::new_with_metadata(reader, metadata))
    }

    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(SyncReader(input), metadata)
    }

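    /// A hedged sketch of probing a column's bloom filter. The row group and
    /// column indices, and the probed value, are illustrative; `None` is
    /// returned when the column has no bloom filter.
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// let file = File::open("data.parquet").unwrap();
    /// let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    /// if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0).unwrap() {
    ///     let may_contain = sbbf.check(&"example");
    /// }
    /// ```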
    pub fn get_row_group_column_bloom_filter(
        &self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> Result<Option<Sbbf>> {
        let metadata = self.metadata.row_group(row_group_idx);
        let column_metadata = metadata.column(column_idx);

        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => self.input.0.get_bytes(offset, length as usize),
            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
        }?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        match header.algorithm {
            BloomFilterAlgorithm::BLOCK => {
                // split-block is the only algorithm; this match exists to
                // stay exhaustive should new variants be added
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED => {
                // uncompressed is the only variant; matched for the same reason
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH => {
                // xxHash is the only variant; matched for the same reason
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            Some(_) => buffer.slice(
                (TryInto::<usize>::try_into(bitset_offset).unwrap()
                    - TryInto::<usize>::try_into(offset).unwrap())..,
            ),
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                self.input.0.get_bytes(bitset_offset, bitset_length)?
            }
        };
        Ok(Some(Sbbf::new(&bitset)))
    }

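    /// Builds a [`ParquetRecordBatchReader`].
    ///
    /// Any [`RowFilter`] predicates are evaluated first, in order, to narrow
    /// the row selection; the array reader for the final projection is then
    /// built over the surviving rows (see the body below).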
    pub fn build(self) -> Result<ParquetRecordBatchReader> {
        let Self {
            input,
            metadata,
            schema: _,
            fields,
            batch_size,
            row_groups,
            projection,
            mut filter,
            selection,
            row_selection_policy,
            limit,
            offset,
            metrics,
            max_predicate_cache_size: _,
        } = self;

        let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize);

        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());

        let reader = ReaderRowGroups {
            reader: Arc::new(input.0),
            metadata,
            row_groups,
        };

        let mut plan_builder = ReadPlanBuilder::new(batch_size)
            .with_selection(selection)
            .with_row_selection_policy(row_selection_policy);

        // Narrow the row selection by evaluating each predicate in turn
        if let Some(filter) = filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
                if !plan_builder.selects_any() {
                    break;
                }

                let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
                    .with_parquet_metadata(&reader.metadata)
                    .build_array_reader(fields.as_deref(), predicate.projection())?;

                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
            }
        }

        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
            .with_parquet_metadata(&reader.metadata)
            .build_array_reader(fields.as_deref(), &projection)?;

        let read_plan = plan_builder
            .limited(reader.num_rows())
            .with_offset(offset)
            .with_limit(limit)
            .build_limited()
            .build();

        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
    }
}

struct ReaderRowGroups<T: ChunkReader> {
    reader: Arc<T>,
    metadata: Arc<ParquetMetaData>,
    row_groups: Vec<usize>,
}

impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
    fn num_rows(&self) -> usize {
        let meta = self.metadata.row_groups();
        self.row_groups
            .iter()
            .map(|x| meta[*x].num_rows() as usize)
            .sum()
    }

    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        Ok(Box::new(ReaderPageIterator {
            column_idx: i,
            reader: self.reader.clone(),
            metadata: self.metadata.clone(),
            row_groups: self.row_groups.clone().into_iter(),
        }))
    }

    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
        Box::new(
            self.row_groups
                .iter()
                .map(move |i| self.metadata.row_group(*i)),
        )
    }

    fn metadata(&self) -> &ParquetMetaData {
        self.metadata.as_ref()
    }
}

struct ReaderPageIterator<T: ChunkReader> {
    reader: Arc<T>,
    column_idx: usize,
    row_groups: std::vec::IntoIter<usize>,
    metadata: Arc<ParquetMetaData>,
}

impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
        let rg = self.metadata.row_group(rg_idx);
        let column_chunk_metadata = rg.column(self.column_idx);
        let offset_index = self.metadata.offset_index();
        // The offset index may be absent or empty for this row group; guard
        // the nested index before cloning the page locations
        let page_locations = offset_index
            .filter(|i| !i[rg_idx].is_empty())
            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
        let total_rows = rg.num_rows() as usize;
        let reader = self.reader.clone();

        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
            .add_crypto_context(
                rg_idx,
                self.column_idx,
                self.metadata.as_ref(),
                column_chunk_metadata,
            )
    }
}

impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        let rg_idx = self.row_groups.next()?;
        let page_reader = self
            .next_page_reader(rg_idx)
            .map(|page_reader| Box::new(page_reader) as _);
        Some(page_reader)
    }
}

impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}

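/// An `Iterator<Item = Result<RecordBatch, ArrowError>>` that yields
/// [`RecordBatch`]es according to its [`ReadPlan`].
///
/// A hedged usage sketch (file path and batch size illustrative):
///
/// ```no_run
/// # use std::fs::File;
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
/// let file = File::open("data.parquet").unwrap();
/// let reader = ParquetRecordBatchReader::try_new(file, 1024).unwrap();
/// for batch in reader {
///     println!("read {} rows", batch.unwrap().num_rows());
/// }
/// ```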
pub struct ParquetRecordBatchReader {
    array_reader: Box<dyn ArrayReader>,
    schema: SchemaRef,
    read_plan: ReadPlan,
}

impl Debug for ParquetRecordBatchReader {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ParquetRecordBatchReader")
            .field("array_reader", &"...")
            .field("schema", &self.schema)
            .field("read_plan", &self.read_plan)
            .finish()
    }
}

impl Iterator for ParquetRecordBatchReader {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_inner()
            .map_err(|arrow_err| arrow_err.into())
            .transpose()
    }
}

impl ParquetRecordBatchReader {
    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
        let mut read_records = 0;
        let batch_size = self.batch_size();
        if batch_size == 0 {
            return Ok(None);
        }
        match self.read_plan.row_selection_cursor_mut() {
            // Bitmap-based selection: read whole chunks of rows and then
            // apply the boolean mask with a filter kernel
            RowSelectionCursor::Mask(mask_cursor) => {
                while !mask_cursor.is_empty() {
                    let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else {
                        return Ok(None);
                    };

                    if mask_chunk.initial_skip > 0 {
                        let skipped = self.array_reader.skip_records(mask_chunk.initial_skip)?;
                        if skipped != mask_chunk.initial_skip {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                mask_chunk.initial_skip,
                                skipped
                            ));
                        }
                    }

                    if mask_chunk.chunk_rows == 0 {
                        if mask_cursor.is_empty() && mask_chunk.selected_rows == 0 {
                            return Ok(None);
                        }
                        continue;
                    }

                    let mask = mask_cursor.mask_values_for(&mask_chunk)?;

                    let read = self.array_reader.read_records(mask_chunk.chunk_rows)?;
                    if read == 0 {
                        return Err(general_err!(
                            "reached end of column while expecting {} rows",
                            mask_chunk.chunk_rows
                        ));
                    }
                    if read != mask_chunk.chunk_rows {
                        return Err(general_err!(
                            "insufficient rows read from array reader - expected {}, got {}",
                            mask_chunk.chunk_rows,
                            read
                        ));
                    }

                    let array = self.array_reader.consume_batch()?;
                    let struct_array = array.as_struct_opt().ok_or_else(|| {
                        ArrowError::ParquetError(
                            "Struct array reader should return struct array".to_string(),
                        )
                    })?;

                    let filtered_batch =
                        filter_record_batch(&RecordBatch::from(struct_array), &mask)?;

                    if filtered_batch.num_rows() != mask_chunk.selected_rows {
                        return Err(general_err!(
                            "filtered rows mismatch selection - expected {}, got {}",
                            mask_chunk.selected_rows,
                            filtered_batch.num_rows()
                        ));
                    }

                    if filtered_batch.num_rows() == 0 {
                        continue;
                    }

                    return Ok(Some(filtered_batch));
                }
            }
            // Run-length selectors: alternately skip and read runs of rows
            RowSelectionCursor::Selectors(selectors_cursor) => {
                while read_records < batch_size && !selectors_cursor.is_empty() {
                    let front = selectors_cursor.next_selector();
                    if front.skip {
                        let skipped = self.array_reader.skip_records(front.row_count)?;

                        if skipped != front.row_count {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                front.row_count,
                                skipped
                            ));
                        }
                        continue;
                    }

                    if front.row_count == 0 {
                        continue;
                    }

                    // Read only enough rows to fill the batch, returning any
                    // remainder of the selector for the next call
                    let need_read = batch_size - read_records;
                    let to_read = match front.row_count.checked_sub(need_read) {
                        Some(remaining) if remaining != 0 => {
                            selectors_cursor.return_selector(RowSelector::select(remaining));
                            need_read
                        }
                        _ => front.row_count,
                    };
                    match self.array_reader.read_records(to_read)? {
                        0 => break,
                        rec => read_records += rec,
                    };
                }
            }
            RowSelectionCursor::All => {
                self.array_reader.read_records(batch_size)?;
            }
        };

        let array = self.array_reader.consume_batch()?;
        let struct_array = array.as_struct_opt().ok_or_else(|| {
            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
        })?;

        Ok(if struct_array.len() > 0 {
            Some(RecordBatch::from(struct_array))
        } else {
            None
        })
    }
}

impl RecordBatchReader for ParquetRecordBatchReader {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl ParquetRecordBatchReader {
    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
        ParquetRecordBatchReaderBuilder::try_new(reader)?
            .with_batch_size(batch_size)
            .build()
    }

    pub fn try_new_with_row_groups(
        levels: &FieldLevels,
        row_groups: &dyn RowGroups,
        batch_size: usize,
        selection: Option<RowSelection>,
    ) -> Result<Self> {
        let metrics = ArrowReaderMetrics::disabled();
        let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
            .with_parquet_metadata(row_groups.metadata())
            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;

        let read_plan = ReadPlanBuilder::new(batch_size)
            .with_selection(selection)
            .build();

        Ok(Self {
            array_reader,
            schema: Arc::new(Schema::new(levels.fields.clone())),
            read_plan,
        })
    }

    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
        let schema = match array_reader.get_data_type() {
            ArrowType::Struct(fields) => Schema::new(fields.clone()),
            _ => unreachable!("Struct array reader's data type is not struct!"),
        };

        Self {
            array_reader,
            schema: Arc::new(schema),
            read_plan,
        }
    }

    #[inline(always)]
    pub(crate) fn batch_size(&self) -> usize {
        self.read_plan.batch_size()
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use std::cmp::min;
    use std::collections::{HashMap, VecDeque};
    use std::fmt::Formatter;
    use std::fs::File;
    use std::io::Seek;
    use std::path::PathBuf;
    use std::sync::Arc;

    use rand::rngs::StdRng;
    use rand::{Rng, RngCore, SeedableRng, random, rng};
    use tempfile::tempfile;

    use crate::arrow::arrow_reader::{
        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions,
        ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection,
        RowSelectionPolicy, RowSelector,
    };
    use crate::arrow::schema::{
        add_encoded_arrow_schema_to_metadata,
        virtual_type::{RowGroupIndex, RowNumber},
    };
    use crate::arrow::{ArrowWriter, ProjectionMask};
    use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType};
    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
    use crate::data_type::{
        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
        FloatType, Int32Type, Int64Type, Int96, Int96Type,
    };
    use crate::errors::Result;
    use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetStatisticsPolicy};
    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
    use crate::file::writer::SerializedFileWriter;
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::{Type, TypePtr};
    use crate::util::test_common::rand_gen::RandGen;
    use arrow::compute::kernels::cmp::eq;
    use arrow::compute::or;
    use arrow_array::builder::*;
    use arrow_array::cast::AsArray;
    use arrow_array::types::{
        Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
        DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
        Time64MicrosecondType,
    };
    use arrow_array::*;
    use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256};
    use arrow_data::{ArrayData, ArrayDataBuilder};
    use arrow_schema::{
        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
    };
    use arrow_select::concat::concat_batches;
    use bytes::Bytes;
    use half::f16;
    use num_traits::PrimInt;

    #[test]
    fn test_arrow_reader_all_columns() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());
        let reader = builder.build().unwrap();

        assert_eq!(original_schema.fields(), reader.schema().fields());
    }

    #[test]
    fn test_reuse_schema() {
        let file = get_test_file("parquet/alltypes-java.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
        let expected = builder.metadata;
        let schema = expected.file_metadata().schema_descr_ptr();

        let arrow_options = ArrowReaderOptions::new().with_parquet_schema(schema.clone());
        let builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();

        assert_eq!(expected.as_ref(), builder.metadata.as_ref());
    }

    #[test]
    fn test_page_encoding_stats_mask() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let file = File::open(path).unwrap();

        let arrow_options = ArrowReaderOptions::new().with_encoding_stats_as_mask(true);
        let builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();

        let row_group_metadata = builder.metadata.row_group(0);

        let page_encoding_stats = row_group_metadata
            .column(0)
            .page_encoding_stats_mask()
            .unwrap();
        assert!(page_encoding_stats.is_only(Encoding::PLAIN));
        let page_encoding_stats = row_group_metadata
            .column(2)
            .page_encoding_stats_mask()
            .unwrap();
        assert!(page_encoding_stats.is_only(Encoding::PLAIN_DICTIONARY));
    }

    #[test]
    fn test_stats_stats_skipped() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let file = File::open(path).unwrap();

        let arrow_options = ArrowReaderOptions::new()
            .with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll)
            .with_column_stats_policy(ParquetStatisticsPolicy::SkipAll);
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for column in row_group_metadata.columns() {
            assert!(column.page_encoding_stats().is_none());
            assert!(column.page_encoding_stats_mask().is_none());
            assert!(column.statistics().is_none());
        }

        let arrow_options = ArrowReaderOptions::new()
            .with_encoding_stats_as_mask(true)
            .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
            .with_column_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]));
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
            assert!(column.page_encoding_stats().is_none());
            assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
            assert_eq!(column.statistics().is_some(), idx == 0);
        }
    }

    #[test]
    fn test_size_stats_stats_skipped() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/repeated_primitive_no_list.parquet");
        let file = File::open(path).unwrap();

        let arrow_options =
            ArrowReaderOptions::new().with_size_stats_policy(ParquetStatisticsPolicy::SkipAll);
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for column in row_group_metadata.columns() {
            assert!(column.repetition_level_histogram().is_none());
            assert!(column.definition_level_histogram().is_none());
            assert!(column.unencoded_byte_array_data_bytes().is_none());
        }

        let arrow_options = ArrowReaderOptions::new()
            .with_encoding_stats_as_mask(true)
            .with_size_stats_policy(ParquetStatisticsPolicy::skip_except(&[1]));
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
            assert_eq!(column.repetition_level_histogram().is_some(), idx == 1);
            assert_eq!(column.definition_level_histogram().is_some(), idx == 1);
            assert_eq!(column.unencoded_byte_array_data_bytes().is_some(), idx == 1);
        }
    }

    #[test]
    fn test_arrow_reader_single_column() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
        let reader = builder.with_projection(mask).build().unwrap();

        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_arrow_reader_single_column_by_name() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
        let reader = builder.with_projection(mask).build().unwrap();

        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_null_column_reader_test() {
        let mut file = tempfile::tempfile().unwrap();

        let schema = "
            message message {
                OPTIONAL INT32 int32;
            }
        ";
        let schema = Arc::new(parse_message_type(schema).unwrap());

        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
        generate_single_column_file_with_data::<Int32Type>(
            &[vec![], vec![]],
            Some(&def_levels),
            file.try_clone().unwrap(),
            schema,
            Some(Field::new("int32", ArrowDataType::Null, true)),
            &Default::default(),
        )
        .unwrap();

        file.rewind().unwrap();

        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();

        assert_eq!(batches.len(), 4);
        for batch in &batches[0..3] {
            assert_eq!(batch.num_rows(), 2);
            assert_eq!(batch.num_columns(), 1);
            assert_eq!(batch.column(0).null_count(), 2);
        }

        assert_eq!(batches[3].num_rows(), 1);
        assert_eq!(batches[3].num_columns(), 1);
        assert_eq!(batches[3].column(0).null_count(), 1);
    }

    #[test]
    fn test_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<BoolType, _, BoolType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
        );
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<FloatType, _, FloatType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
        );
    }

    #[test]
    fn test_unsigned_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::UINT_32,
            Some(ArrowDataType::UInt32),
            |vals| {
                Arc::new(UInt32Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u32)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::UINT_64,
            Some(ArrowDataType::UInt64),
            |vals| {
                Arc::new(UInt64Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u64)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
    }

    #[test]
    fn test_unsigned_roundtrip() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("uint32", ArrowDataType::UInt32, true),
            Field::new("uint64", ArrowDataType::UInt64, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt32Array::from_iter_values([
                    0,
                    i32::MAX as u32,
                    u32::MAX,
                ])),
                Arc::new(UInt64Array::from_iter_values([
                    0,
                    i64::MAX as u64,
                    u64::MAX,
                ])),
            ],
        )
        .unwrap();

        writer.write(&original).unwrap();
        writer.close().unwrap();

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
        let ret = reader.next().unwrap().unwrap();
        assert_eq!(ret, original);

        ret.column(0)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();

        ret.column(1)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
    }

    #[test]
    fn test_float16_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("float16", ArrowDataType::Float16, false),
            Field::new("float16-nullable", ArrowDataType::Float16, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Float16Array::from_iter_values([
                    f16::EPSILON,
                    f16::MIN,
                    f16::MAX,
                    f16::NAN,
                    f16::INFINITY,
                    f16::NEG_INFINITY,
                    f16::ONE,
                    f16::NEG_ONE,
                    f16::ZERO,
                    f16::NEG_ZERO,
                    f16::E,
                    f16::PI,
                    f16::FRAC_1_PI,
                ])),
                Arc::new(Float16Array::from(vec![
                    None,
                    None,
                    None,
                    Some(f16::NAN),
                    Some(f16::INFINITY),
                    Some(f16::NEG_INFINITY),
                    None,
                    None,
                    None,
                    None,
                    None,
                    None,
                    Some(f16::FRAC_1_PI),
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Float16Type>();
        ret.column(1).as_primitive::<Float16Type>();

        Ok(())
    }

    #[test]
    fn test_time_utc_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "time_millis",
                ArrowDataType::Time32(TimeUnit::Millisecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
            Field::new(
                "time_micros",
                ArrowDataType::Time64(TimeUnit::Microsecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Time32MillisecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399_000),
                    Some(86_400_000),
                    Some(86_401_000),
                    None,
                ])),
                Arc::new(Time64MicrosecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399 * 1_000_000),
                    Some(86_400 * 1_000_000),
                    Some(86_401 * 1_000_000),
                    None,
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Time32MillisecondType>();
        ret.column(1).as_primitive::<Time64MicrosecondType>();

        Ok(())
    }

    #[test]
    fn test_date32_roundtrip() -> Result<()> {
        use arrow_array::Date32Array;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "date32",
            ArrowDataType::Date32,
            false,
        )]));

        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![Arc::new(Date32Array::from(vec![
                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
            ]))],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Date32Type>();

        Ok(())
    }

    #[test]
    fn test_date64_roundtrip() -> Result<()> {
        use arrow_array::Date64Array;

        let schema = Arc::new(Schema::new(vec![
            Field::new("small-date64", ArrowDataType::Date64, false),
            Field::new("big-date64", ArrowDataType::Date64, false),
            Field::new("invalid-date64", ArrowDataType::Date64, false),
        ]));

        let mut default_buf = Vec::with_capacity(1024);
        let mut coerce_buf = Vec::with_capacity(1024);

        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();

        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
        let mut coerce_writer =
            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;

        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;

        let original = RecordBatch::try_new(
            schema,
            vec![
                // whole-day values that survive coercion to 32-bit days
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000 * NUM_MILLISECONDS_IN_DAY,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                // day counts too large for a 32-bit representation
                Arc::new(Date64Array::from(vec![
                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                // values that are not a whole number of days
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1,
                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                ])),
            ],
        )?;

        default_writer.write(&original)?;
        coerce_writer.write(&original)?;

        default_writer.close()?;
        coerce_writer.close()?;

        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;

        let default_ret = default_reader.next().unwrap()?;
        let coerce_ret = coerce_reader.next().unwrap()?;

        // without coercion all three columns roundtrip unchanged
        assert_eq!(default_ret, original);

        // with coercion, only the first column survives losslessly
        assert_eq!(coerce_ret.column(0), original.column(0));
        assert_ne!(coerce_ret.column(1), original.column(1));
        assert_ne!(coerce_ret.column(2), original.column(2));

        default_ret.column(0).as_primitive::<Date64Type>();
        coerce_ret.column(0).as_primitive::<Date64Type>();

        Ok(())
    }

    struct RandFixedLenGen {}

    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
        fn r#gen(len: i32) -> FixedLenByteArray {
            let mut v = vec![0u8; len as usize];
            rng().fill_bytes(&mut v);
            ByteArray::from(v).into()
        }
    }

    #[test]
    fn test_fixed_length_binary_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            20,
            ConvertedType::NONE,
            None,
            |vals| {
                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
                for val in vals {
                    match val {
                        Some(b) => builder.append_value(b).unwrap(),
                        None => builder.append_null(),
                    }
                }
                Arc::new(builder.finish())
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_interval_day_time_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            12,
            ConvertedType::INTERVAL,
            None,
            |vals| {
                Arc::new(
                    vals.iter()
                        .map(|x| {
                            x.as_ref().map(|b| IntervalDayTime {
                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
                                milliseconds: i32::from_le_bytes(
                                    b.as_ref()[8..12].try_into().unwrap(),
                                ),
                            })
                        })
                        .collect::<IntervalDayTimeArray>(),
                )
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_int96_single_column_reader_test() {
        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];

        type TypeHintAndConversionFunction =
            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);

        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
            (None, |vals: &[Option<Int96>]| {
                Arc::new(TimestampNanosecondArray::from_iter(
                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
                )) as ArrayRef
            }),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampSecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMillisecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_millis())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMicrosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_micros())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampNanosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(
                    TimeUnit::Second,
                    Some(Arc::from("-05:00")),
                )),
                |vals: &[Option<Int96>]| {
                    Arc::new(
                        TimestampSecondArray::from_iter(
                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
                        )
                        .with_timezone("-05:00"),
                    ) as ArrayRef
                },
            ),
        ];

        resolutions.iter().for_each(|(arrow_type, converter)| {
            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
                2,
                ConvertedType::NONE,
                arrow_type.clone(),
                converter,
                encodings,
            );
        })
    }

    #[test]
    fn test_int96_from_spark_file_with_provided_schema() {
        use arrow_schema::DataType::Timestamp;
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            Timestamp(TimeUnit::Microsecond, None),
            true,
        )]));
        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());

        let mut record_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                .unwrap()
                .build()
                .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));

        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456),
            Some(1704070800000000),
            Some(253402225200000000),
            Some(1735599600000000),
            None,
            Some(9089380393200000000),
        ]));

        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }

    #[test]
    fn test_int96_from_spark_file_without_provided_schema() {
        use arrow_schema::DataType::Timestamp;
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));

        // Without a provided schema, INT96 defaults to nanoseconds; values
        // outside the representable nanosecond range overflow on conversion,
        // hence the negative expected values below.
        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456000),
            Some(1704070800000000000),
            Some(-4852191831933722624),
            Some(1735599600000000000),
            None,
            Some(-4864435138808946688),
        ]));

        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }

    struct RandUtf8Gen {}

    impl RandGen<ByteArrayType> for RandUtf8Gen {
        fn r#gen(len: i32) -> ByteArray {
            Int32Type::r#gen(len).to_string().as_str().into()
        }
    }

    #[test]
    fn test_utf8_single_column_reader_test() {
        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
            })))
        }

        let encodings = &[
            Encoding::PLAIN,
            Encoding::RLE_DICTIONARY,
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            Encoding::DELTA_BYTE_ARRAY,
        ];

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::NONE,
            None,
            |vals| {
                Arc::new(BinaryArray::from_iter(
                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
                ))
            },
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            None,
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::Utf8),
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::LargeUtf8),
            string_converter::<i64>,
            encodings,
        );

        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
        for key in &small_key_types {
            for encoding in encodings {
                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
                opts.encoding = *encoding;

                let data_type =
                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
                    opts,
                    2,
                    ConvertedType::UTF8,
                    Some(data_type.clone()),
                    move |vals| {
                        let vals = string_converter::<i32>(vals);
                        arrow::compute::cast(&vals, &data_type).unwrap()
                    },
                );
            }
        }

        let key_types = [
            ArrowDataType::Int16,
            ArrowDataType::UInt16,
            ArrowDataType::Int32,
            ArrowDataType::UInt32,
            ArrowDataType::Int64,
            ArrowDataType::UInt64,
        ];

        for key in &key_types {
            let data_type =
                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let vals = string_converter::<i32>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );

            let data_type = ArrowDataType::Dictionary(
                Box::new(key.clone()),
                Box::new(ArrowDataType::LargeUtf8),
            );

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let vals = string_converter::<i64>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );
        }
    }

    #[test]
    fn test_decimal_nullable_struct() {
        let decimals = Decimal256Array::from_iter_values(
            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
        );

        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "decimals",
            decimals.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_int32_nullable_struct() {
        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "int32",
            int32.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![int32.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_decimal_list() {
        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);

        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
            decimals.data_type().clone(),
            false,
        ))))
        .len(7)
        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 1), &read[2]);
    }

    #[test]
    fn test_read_decimal_file() {
        use arrow_array::Decimal128Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let file_variants = vec![
            ("byte_array", 4),
            ("fixed_length", 25),
            ("int32", 4),
            ("int64", 10),
        ];
        for (prefix, target_precision) in file_variants {
            let path = format!("{testdata}/{prefix}_decimal.parquet");
            let file = File::open(path).unwrap();
            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

            let batch = record_reader.next().unwrap().unwrap();
            assert_eq!(batch.num_rows(), 24);
            let col = batch
                .column(0)
                .as_any()
                .downcast_ref::<Decimal128Array>()
                .unwrap();

            let expected = 1..25;

            assert_eq!(col.precision(), target_precision);
            assert_eq!(col.scale(), 2);

            for (i, v) in expected.enumerate() {
                assert_eq!(col.value(i), v * 100_i128);
            }
        }
    }

#[test]
fn test_read_float16_nonzeros_file() {
    use arrow_array::Float16Array;
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
    let file = File::open(path).unwrap();
    let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

    let batch = record_reader.next().unwrap().unwrap();
    assert_eq!(batch.num_rows(), 8);
    let col = batch
        .column(0)
        .as_any()
        .downcast_ref::<Float16Array>()
        .unwrap();

    let f16_two = f16::ONE + f16::ONE;

    assert_eq!(col.null_count(), 1);
    assert!(col.is_null(0));
    assert_eq!(col.value(1), f16::ONE);
    assert_eq!(col.value(2), -f16_two);
    assert!(col.value(3).is_nan());
    assert_eq!(col.value(4), f16::ZERO);
    assert!(col.value(4).is_sign_positive());
    assert_eq!(col.value(5), f16::NEG_ONE);
    assert_eq!(col.value(6), f16::NEG_ZERO);
    assert!(col.value(6).is_sign_negative());
    assert_eq!(col.value(7), f16_two);
}

#[test]
fn test_read_float16_zeros_file() {
    use arrow_array::Float16Array;
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/float16_zeros_and_nans.parquet");
    let file = File::open(path).unwrap();
    let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

    let batch = record_reader.next().unwrap().unwrap();
    assert_eq!(batch.num_rows(), 3);
    let col = batch
        .column(0)
        .as_any()
        .downcast_ref::<Float16Array>()
        .unwrap();

    assert_eq!(col.null_count(), 1);
    assert!(col.is_null(0));
    assert_eq!(col.value(1), f16::ZERO);
    assert!(col.value(1).is_sign_positive());
    assert!(col.value(2).is_nan());
}

#[test]
fn test_read_float32_float64_byte_stream_split() {
    let path = format!(
        "{}/byte_stream_split.zstd.parquet",
        arrow::util::test_util::parquet_test_data(),
    );
    let file = File::open(path).unwrap();
    let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

    let mut row_count = 0;
    for batch in record_reader {
        let batch = batch.unwrap();
        row_count += batch.num_rows();
        let f32_col = batch.column(0).as_primitive::<Float32Type>();
        let f64_col = batch.column(1).as_primitive::<Float64Type>();

        for &x in f32_col.values() {
            assert!(x > -10.0);
            assert!(x < 10.0);
        }
        for &x in f64_col.values() {
            assert!(x > -10.0);
            assert!(x < 10.0);
        }
    }
    assert_eq!(row_count, 300);
}

#[test]
fn test_read_extended_byte_stream_split() {
    let path = format!(
        "{}/byte_stream_split_extended.gzip.parquet",
        arrow::util::test_util::parquet_test_data(),
    );
    let file = File::open(path).unwrap();
    let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

    let mut row_count = 0;
    for batch in record_reader {
        let batch = batch.unwrap();
        row_count += batch.num_rows();

        // Columns come in pairs holding the same values; compare each plain
        // column against its BYTE_STREAM_SPLIT counterpart.
        let f16_col = batch.column(0).as_primitive::<Float16Type>();
        let f16_bss = batch.column(1).as_primitive::<Float16Type>();
        assert_eq!(f16_col.len(), f16_bss.len());
        f16_col
            .iter()
            .zip(f16_bss.iter())
            .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

        let f32_col = batch.column(2).as_primitive::<Float32Type>();
        let f32_bss = batch.column(3).as_primitive::<Float32Type>();
        assert_eq!(f32_col.len(), f32_bss.len());
        f32_col
            .iter()
            .zip(f32_bss.iter())
            .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

        let f64_col = batch.column(4).as_primitive::<Float64Type>();
        let f64_bss = batch.column(5).as_primitive::<Float64Type>();
        assert_eq!(f64_col.len(), f64_bss.len());
        f64_col
            .iter()
            .zip(f64_bss.iter())
            .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

        let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
        let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
        assert_eq!(i32_col.len(), i32_bss.len());
        i32_col
            .iter()
            .zip(i32_bss.iter())
            .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

        let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
        let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
        assert_eq!(i64_col.len(), i64_bss.len());
        i64_col
            .iter()
            .zip(i64_bss.iter())
            .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

        let flba_col = batch.column(10).as_fixed_size_binary();
        let flba_bss = batch.column(11).as_fixed_size_binary();
        assert_eq!(flba_col.len(), flba_bss.len());
        flba_col
            .iter()
            .zip(flba_bss.iter())
            .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

        let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
        let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
        assert_eq!(dec_col.len(), dec_bss.len());
        dec_col
            .iter()
            .zip(dec_bss.iter())
            .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
    }
    assert_eq!(row_count, 200);
}

#[test]
fn test_read_incorrect_map_schema_file() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/incorrect_map_schema.parquet");
    let file = File::open(path).unwrap();
    let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

    let batch = record_reader.next().unwrap().unwrap();
    assert_eq!(batch.num_rows(), 1);

    let expected_schema = Schema::new(vec![Field::new(
        "my_map",
        ArrowDataType::Map(
            Arc::new(Field::new(
                "key_value",
                ArrowDataType::Struct(Fields::from(vec![
                    Field::new("key", ArrowDataType::Utf8, false),
                    Field::new("value", ArrowDataType::Utf8, true),
                ])),
                false,
            )),
            false,
        ),
        true,
    )]);
    assert_eq!(batch.schema().as_ref(), &expected_schema);

    assert_eq!(batch.num_rows(), 1);
    assert_eq!(batch.column(0).null_count(), 0);
    assert_eq!(
        batch.column(0).as_map().keys().as_ref(),
        &StringArray::from(vec!["parent", "name"])
    );
    assert_eq!(
        batch.column(0).as_map().values().as_ref(),
        &StringArray::from(vec!["another", "report"])
    );
}

#[test]
fn test_read_dict_fixed_size_binary() {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        ArrowDataType::Dictionary(
            Box::new(ArrowDataType::UInt8),
            Box::new(ArrowDataType::FixedSizeBinary(8)),
        ),
        true,
    )]));
    let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
    let values = FixedSizeBinaryArray::try_from_iter(
        vec![
            (0u8..8u8).collect::<Vec<u8>>(),
            (24u8..32u8).collect::<Vec<u8>>(),
        ]
        .into_iter(),
    )
    .unwrap();
    let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();

    let mut buffer = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();
    let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();

    assert_eq!(read.len(), 1);
    assert_eq!(&batch, &read[0])
}

#[test]
fn test_read_nullable_structs_with_binary_dict_as_first_child_column() {
    let struct_fields = Fields::from(vec![
        Field::new(
            "city",
            ArrowDataType::Dictionary(
                Box::new(ArrowDataType::UInt8),
                Box::new(ArrowDataType::Utf8),
            ),
            true,
        ),
        Field::new("name", ArrowDataType::Utf8, true),
    ]);
    let schema = Arc::new(Schema::new(vec![Field::new(
        "items",
        ArrowDataType::Struct(struct_fields.clone()),
        true,
    )]));

    let items_arr = StructArray::new(
        struct_fields,
        vec![
            Arc::new(DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0, 1, 1, 0, 2]),
                Arc::new(StringArray::from_iter_values(vec![
                    "quebec",
                    "fredericton",
                    "halifax",
                ])),
            )),
            Arc::new(StringArray::from_iter_values(vec![
                "albert", "terry", "lance", "", "tim",
            ])),
        ],
        Some(NullBuffer::from_iter(vec![true, true, true, false, true])),
    );

    let batch = RecordBatch::try_new(schema, vec![Arc::new(items_arr)]).unwrap();
    let mut buffer = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();
    let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();

    assert_eq!(read.len(), 1);
    assert_eq!(&batch, &read[0])
}

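/// Options for a single-column Parquet roundtrip test, controlling both how
/// the data is written (row group count, page sizes, encoding, statistics)
/// and how it is read back (batch size, row selections, filters, limit,
/// offset).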
#[derive(Clone)]
struct TestOptions {
    /// Number of row groups to write
    num_row_groups: usize,
    /// Number of rows in each row group
    num_rows: usize,
    /// Size of batches to read back
    record_batch_size: usize,
    /// Percentage of nulls in the column, or `None` if the column is required
    null_percent: Option<usize>,
    /// Number of rows written at once, bounding the page granularity of a row group
    write_batch_size: usize,
    /// Maximum size of a data page in bytes
    max_data_page_size: usize,
    /// Maximum size of a dictionary page in bytes
    max_dict_page_size: usize,
    /// Writer version
    writer_version: WriterVersion,
    /// Which statistics to write
    enabled_statistics: EnabledStatistics,
    /// Encoding to use for the column
    encoding: Encoding,
    /// Row selection to apply when reading, with the selected row count
    row_selections: Option<(RowSelection, usize)>,
    /// Row filter mask to apply when reading
    row_filter: Option<Vec<bool>>,
    /// Limit to apply when reading
    limit: Option<usize>,
    /// Offset to apply when reading
    offset: Option<usize>,
}

impl std::fmt::Debug for TestOptions {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TestOptions")
            .field("num_row_groups", &self.num_row_groups)
            .field("num_rows", &self.num_rows)
            .field("record_batch_size", &self.record_batch_size)
            .field("null_percent", &self.null_percent)
            .field("write_batch_size", &self.write_batch_size)
            .field("max_data_page_size", &self.max_data_page_size)
            .field("max_dict_page_size", &self.max_dict_page_size)
            .field("writer_version", &self.writer_version)
            .field("enabled_statistics", &self.enabled_statistics)
            .field("encoding", &self.encoding)
            .field("row_selections", &self.row_selections.is_some())
            .field("row_filter", &self.row_filter.is_some())
            .field("limit", &self.limit)
            .field("offset", &self.offset)
            .finish()
    }
}

impl Default for TestOptions {
    fn default() -> Self {
        Self {
            num_row_groups: 2,
            num_rows: 100,
            record_batch_size: 15,
            null_percent: None,
            write_batch_size: 64,
            max_data_page_size: 1024 * 1024,
            max_dict_page_size: 1024 * 1024,
            writer_version: WriterVersion::PARQUET_1_0,
            enabled_statistics: EnabledStatistics::Page,
            encoding: Encoding::PLAIN,
            row_selections: None,
            row_filter: None,
            limit: None,
            offset: None,
        }
    }
}

impl TestOptions {
    fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
        Self {
            num_row_groups,
            num_rows,
            record_batch_size,
            ..Default::default()
        }
    }

    fn with_null_percent(self, null_percent: usize) -> Self {
        Self {
            null_percent: Some(null_percent),
            ..self
        }
    }

    fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
        Self {
            max_data_page_size,
            ..self
        }
    }

    fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
        Self {
            max_dict_page_size,
            ..self
        }
    }

    fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
        Self {
            enabled_statistics,
            ..self
        }
    }

    fn with_row_selections(self) -> Self {
        assert!(self.row_filter.is_none(), "Must set row selection first");

        let mut rng = rng();
        let step = rng.random_range(self.record_batch_size..self.num_rows);
        let row_selections = create_test_selection(
            step,
            self.num_row_groups * self.num_rows,
            rng.random::<bool>(),
        );
        Self {
            row_selections: Some(row_selections),
            ..self
        }
    }

    fn with_row_filter(self) -> Self {
        let row_count = match &self.row_selections {
            Some((_, count)) => *count,
            None => self.num_row_groups * self.num_rows,
        };

        let mut rng = rng();
        Self {
            row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
            ..self
        }
    }

    fn with_limit(self, limit: usize) -> Self {
        Self {
            limit: Some(limit),
            ..self
        }
    }

    fn with_offset(self, offset: usize) -> Self {
        Self {
            offset: Some(offset),
            ..self
        }
    }

    fn writer_props(&self) -> WriterProperties {
        let builder = WriterProperties::builder()
            .set_data_page_size_limit(self.max_data_page_size)
            .set_write_batch_size(self.write_batch_size)
            .set_writer_version(self.writer_version)
            .set_statistics_enabled(self.enabled_statistics);

        let builder = match self.encoding {
            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
                .set_dictionary_enabled(true)
                .set_dictionary_page_size_limit(self.max_dict_page_size),
            _ => builder
                .set_dictionary_enabled(false)
                .set_encoding(self.encoding),
        };

        builder.build()
    }
}

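/// Tests reading a single column by writing the data with every combination
/// of the supplied `encodings`, both writer versions, and each of the
/// `TestOptions` below, then reading it back and comparing against the
/// expected values produced by `converter`.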
fn run_single_column_reader_tests<T, F, G>(
    rand_max: i32,
    converted_type: ConvertedType,
    arrow_type: Option<ArrowDataType>,
    converter: F,
    encodings: &[Encoding],
) where
    T: DataType,
    G: RandGen<T>,
    F: Fn(&[Option<T::T>]) -> ArrayRef,
{
    let all_options = vec![
        // Basic roundtrips at a variety of row group and batch sizes
        TestOptions::new(2, 100, 15),
        TestOptions::new(3, 25, 5),
        TestOptions::new(4, 100, 25),
        // Small data and dictionary pages
        TestOptions::new(3, 256, 73).with_max_data_page_size(128),
        TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
        // Nullable columns
        TestOptions::new(2, 256, 127).with_null_percent(0),
        TestOptions::new(2, 256, 93).with_null_percent(25),
        // Limits and offsets
        TestOptions::new(4, 100, 25).with_limit(0),
        TestOptions::new(4, 100, 25).with_limit(50),
        TestOptions::new(4, 100, 25).with_limit(10),
        TestOptions::new(4, 100, 25).with_limit(101),
        TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
        TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
        TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
        // Statistics variations
        TestOptions::new(2, 256, 91)
            .with_null_percent(25)
            .with_enabled_statistics(EnabledStatistics::Chunk),
        TestOptions::new(2, 256, 91)
            .with_null_percent(25)
            .with_enabled_statistics(EnabledStatistics::None),
        TestOptions::new(2, 128, 91)
            .with_null_percent(100)
            .with_enabled_statistics(EnabledStatistics::None),
        // Row selections
        TestOptions::new(2, 100, 15).with_row_selections(),
        TestOptions::new(3, 25, 5).with_row_selections(),
        TestOptions::new(4, 100, 25).with_row_selections(),
        TestOptions::new(3, 256, 73)
            .with_max_data_page_size(128)
            .with_row_selections(),
        TestOptions::new(3, 256, 57)
            .with_max_dict_page_size(128)
            .with_row_selections(),
        TestOptions::new(2, 256, 127)
            .with_null_percent(0)
            .with_row_selections(),
        TestOptions::new(2, 256, 93)
            .with_null_percent(25)
            .with_row_selections(),
        TestOptions::new(2, 256, 93)
            .with_null_percent(25)
            .with_row_selections()
            .with_limit(10),
        TestOptions::new(2, 256, 93)
            .with_null_percent(25)
            .with_row_selections()
            .with_offset(20)
            .with_limit(10),
        // Row filters, optionally combined with selections
        TestOptions::new(4, 100, 25).with_row_filter(),
        TestOptions::new(4, 100, 25)
            .with_row_selections()
            .with_row_filter(),
        TestOptions::new(2, 256, 93)
            .with_null_percent(25)
            .with_max_data_page_size(10)
            .with_row_filter(),
        TestOptions::new(2, 256, 93)
            .with_null_percent(25)
            .with_max_data_page_size(10)
            .with_row_selections()
            .with_row_filter(),
        TestOptions::new(2, 256, 93)
            .with_enabled_statistics(EnabledStatistics::None)
            .with_max_data_page_size(10)
            .with_row_selections(),
    ];

    all_options.into_iter().for_each(|opts| {
        for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
            for encoding in encodings {
                let opts = TestOptions {
                    writer_version,
                    encoding: *encoding,
                    ..opts.clone()
                };

                single_column_reader_test::<T, _, G>(
                    opts,
                    rand_max,
                    converted_type,
                    arrow_type.clone(),
                    &converter,
                )
            }
        }
    });
}

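/// Writes randomly generated data for a single leaf column to a temporary
/// Parquet file according to `opts`, reads it back with any configured row
/// selection, filter, limit and offset applied, and asserts the decoded
/// batches match the expected values.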
fn single_column_reader_test<T, F, G>(
    opts: TestOptions,
    rand_max: i32,
    converted_type: ConvertedType,
    arrow_type: Option<ArrowDataType>,
    converter: F,
) where
    T: DataType,
    G: RandGen<T>,
    F: Fn(&[Option<T::T>]) -> ArrayRef,
{
    // Print the test parameters so failures can be reproduced
    println!(
        "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
        T::get_physical_type(),
        converted_type,
        arrow_type,
        opts
    );

    let (repetition, def_levels) = match opts.null_percent.as_ref() {
        Some(null_percent) => {
            let mut rng = rng();

            let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
                .map(|_| {
                    std::iter::from_fn(|| {
                        Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
                    })
                    .take(opts.num_rows)
                    .collect()
                })
                .collect();
            (Repetition::OPTIONAL, Some(def_levels))
        }
        None => (Repetition::REQUIRED, None),
    };

    let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
        .map(|idx| {
            let null_count = match def_levels.as_ref() {
                Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
                None => 0,
            };
            G::gen_vec(rand_max, opts.num_rows - null_count)
        })
        .collect();

    let len = match T::get_physical_type() {
        crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
        crate::basic::Type::INT96 => 12,
        _ => -1,
    };

    let fields = vec![Arc::new(
        Type::primitive_type_builder("leaf", T::get_physical_type())
            .with_repetition(repetition)
            .with_converted_type(converted_type)
            .with_length(len)
            .build()
            .unwrap(),
    )];

    let schema = Arc::new(
        Type::group_type_builder("test_schema")
            .with_fields(fields)
            .build()
            .unwrap(),
    );

    let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));

    let mut file = tempfile::tempfile().unwrap();

    generate_single_column_file_with_data::<T>(
        &values,
        def_levels.as_ref(),
        file.try_clone().unwrap(),
        schema,
        arrow_field,
        &opts,
    )
    .unwrap();

    file.rewind().unwrap();

    let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::from(
        opts.enabled_statistics == EnabledStatistics::Page,
    ));

    let mut builder =
        ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();

    // Compute the expected rows after applying any row selection: skipped
    // runs are dropped, read runs are kept
    let expected_data = match opts.row_selections {
        Some((selections, row_count)) => {
            let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);

            let mut skip_data: Vec<Option<T::T>> = vec![];
            let dequeue: VecDeque<RowSelector> = selections.clone().into();
            for select in dequeue {
                if select.skip {
                    without_skip_data.drain(0..select.row_count);
                } else {
                    skip_data.extend(without_skip_data.drain(0..select.row_count));
                }
            }
            builder = builder.with_row_selection(selections);

            assert_eq!(skip_data.len(), row_count);
            skip_data
        }
        None => {
            let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
            assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
            expected_data
        }
    };

    let mut expected_data = match opts.row_filter {
        Some(filter) => {
            let expected_data = expected_data
                .into_iter()
                .zip(filter.iter())
                .filter_map(|(d, f)| f.then(|| d))
                .collect();

            let mut filter_offset = 0;
            let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
                ProjectionMask::all(),
                move |b| {
                    let array = BooleanArray::from_iter(
                        filter
                            .iter()
                            .skip(filter_offset)
                            .take(b.num_rows())
                            .map(|x| Some(*x)),
                    );
                    filter_offset += b.num_rows();
                    Ok(array)
                },
            ))]);

            builder = builder.with_row_filter(filter);
            expected_data
        }
        None => expected_data,
    };

    if let Some(offset) = opts.offset {
        builder = builder.with_offset(offset);
        expected_data = expected_data.into_iter().skip(offset).collect();
    }

    if let Some(limit) = opts.limit {
        builder = builder.with_limit(limit);
        expected_data = expected_data.into_iter().take(limit).collect();
    }

    let mut record_reader = builder
        .with_batch_size(opts.record_batch_size)
        .build()
        .unwrap();

    let mut total_read = 0;
    loop {
        let maybe_batch = record_reader.next();
        if total_read < expected_data.len() {
            let end = min(total_read + opts.record_batch_size, expected_data.len());
            let batch = maybe_batch.unwrap().unwrap();
            assert_eq!(end - total_read, batch.num_rows());

            let a = converter(&expected_data[total_read..end]);
            let b = batch.column(0);

            assert_eq!(a.data_type(), b.data_type());
            assert_eq!(a.to_data(), b.to_data());
            assert_eq!(
                a.as_any().type_id(),
                b.as_any().type_id(),
                "incorrect type ids"
            );

            total_read = end;
        } else {
            assert!(maybe_batch.is_none());
            break;
        }
    }
}

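/// Builds the expected column values by interleaving `values` with nulls
/// according to the definition levels (level 1 = value, level 0 = null).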
fn gen_expected_data<T: DataType>(
    def_levels: Option<&Vec<Vec<i16>>>,
    values: &[Vec<T::T>],
) -> Vec<Option<T::T>> {
    let data: Vec<Option<T::T>> = match def_levels {
        Some(levels) => {
            let mut values_iter = values.iter().flatten();
            levels
                .iter()
                .flatten()
                .map(|d| match d {
                    1 => Some(values_iter.next().cloned().unwrap()),
                    0 => None,
                    _ => unreachable!(),
                })
                .collect()
        }
        None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
    };
    data
}

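/// Writes `values` to a Parquet file using the low-level
/// `SerializedFileWriter`, one row group per element of `values`, optionally
/// embedding an Arrow schema in the file metadata.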
fn generate_single_column_file_with_data<T: DataType>(
    values: &[Vec<T::T>],
    def_levels: Option<&Vec<Vec<i16>>>,
    file: File,
    schema: TypePtr,
    field: Option<Field>,
    opts: &TestOptions,
) -> Result<ParquetMetaData> {
    let mut writer_props = opts.writer_props();
    if let Some(field) = field {
        let arrow_schema = Schema::new(vec![field]);
        add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
    }

    let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;

    for (idx, v) in values.iter().enumerate() {
        let def_levels = def_levels.map(|d| d[idx].as_slice());
        let mut row_group_writer = writer.next_row_group()?;
        {
            let mut column_writer = row_group_writer
                .next_column()?
                .expect("Column writer is none!");

            column_writer
                .typed::<T>()
                .write_batch(v, def_levels, None)?;

            column_writer.close()?;
        }
        row_group_writer.close()?;
    }

    writer.close()
}

fn get_test_file(file_name: &str) -> File {
    let mut path = PathBuf::new();
    path.push(arrow::util::test_util::arrow_test_data());
    path.push(file_name);

    File::open(path.as_path()).expect("File not found!")
}

#[test]
fn test_read_structs() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/nested_structs.rust.parquet");
    let file = File::open(&path).unwrap();
    let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

    for batch in record_batch_reader {
        batch.unwrap();
    }

    let file = File::open(&path).unwrap();
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

    let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
    let projected_reader = builder
        .with_projection(mask)
        .with_batch_size(60)
        .build()
        .unwrap();

    let expected_schema = Schema::new(vec![
        Field::new(
            "roll_num",
            ArrowDataType::Struct(Fields::from(vec![Field::new(
                "count",
                ArrowDataType::UInt64,
                false,
            )])),
            false,
        ),
        Field::new(
            "PC_CUR",
            ArrowDataType::Struct(Fields::from(vec![
                Field::new("mean", ArrowDataType::Int64, false),
                Field::new("sum", ArrowDataType::Int64, false),
            ])),
            false,
        ),
    ]);

    assert_eq!(&expected_schema, projected_reader.schema().as_ref());

    for batch in projected_reader {
        let batch = batch.unwrap();
        assert_eq!(batch.schema().as_ref(), &expected_schema);
    }
}

#[test]
fn test_read_structs_by_name() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/nested_structs.rust.parquet");
    let file = File::open(&path).unwrap();
    let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

    for batch in record_batch_reader {
        batch.unwrap();
    }

    let file = File::open(&path).unwrap();
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

    let mask = ProjectionMask::columns(
        builder.parquet_schema(),
        ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
    );
    let projected_reader = builder
        .with_projection(mask)
        .with_batch_size(60)
        .build()
        .unwrap();

    let expected_schema = Schema::new(vec![
        Field::new(
            "roll_num",
            ArrowDataType::Struct(Fields::from(vec![Field::new(
                "count",
                ArrowDataType::UInt64,
                false,
            )])),
            false,
        ),
        Field::new(
            "PC_CUR",
            ArrowDataType::Struct(Fields::from(vec![
                Field::new("mean", ArrowDataType::Int64, false),
                Field::new("sum", ArrowDataType::Int64, false),
            ])),
            false,
        ),
    ]);

    assert_eq!(&expected_schema, projected_reader.schema().as_ref());

    for batch in projected_reader {
        let batch = batch.unwrap();
        assert_eq!(batch.schema().as_ref(), &expected_schema);
    }
}

#[test]
fn test_read_maps() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/nested_maps.snappy.parquet");
    let file = File::open(path).unwrap();
    let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

    for batch in record_batch_reader {
        batch.unwrap();
    }
}

#[test]
fn test_nested_nullability() {
    let message_type = "message nested {
        OPTIONAL Group group {
            REQUIRED INT32 leaf;
        }
    }";

    let file = tempfile::tempfile().unwrap();
    let schema = Arc::new(parse_message_type(message_type).unwrap());

    {
        let mut writer =
            SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
                .unwrap();

        {
            let mut row_group_writer = writer.next_row_group().unwrap();
            let mut column_writer = row_group_writer.next_column().unwrap().unwrap();

            column_writer
                .typed::<Int32Type>()
                .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
                .unwrap();

            column_writer.close().unwrap();
            row_group_writer.close().unwrap();
        }

        writer.close().unwrap();
    }

    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);

    let reader = builder.with_projection(mask).build().unwrap();

    let expected_schema = Schema::new(vec![Field::new(
        "group",
        ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
        true,
    )]);

    let batch = reader.into_iter().next().unwrap().unwrap();
    assert_eq!(batch.schema().as_ref(), &expected_schema);
    assert_eq!(batch.num_rows(), 4);
    assert_eq!(batch.column(0).null_count(), 2);
}

#[test]
fn test_invalid_utf8() {
    // A Parquet file with a single string column whose value ("he\xFFlo")
    // is not valid UTF-8
    let data = vec![
        80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
        18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
        3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
        2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
        111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
        21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
        38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
        38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
        0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
        118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
        116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
        82, 49,
    ];

    let file = Bytes::from(data);
    let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();

    let error = record_batch_reader.next().unwrap().unwrap_err();

    assert!(
        error.to_string().contains("invalid utf-8 sequence"),
        "{}",
        error
    );
}

#[test]
fn test_invalid_utf8_string_array() {
    test_invalid_utf8_string_array_inner::<i32>();
}

#[test]
fn test_invalid_utf8_large_string_array() {
    test_invalid_utf8_string_array_inner::<i64>();
}

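/// Round trips binary data containing invalid UTF-8 through Parquet as a
/// `GenericStringArray<O>` and asserts that reading it back fails with a
/// UTF-8 validation error for every string encoding.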
fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
    let cases = [
        invalid_utf8_first_char::<O>(),
        invalid_utf8_first_char_long_strings::<O>(),
        invalid_utf8_later_char::<O>(),
        invalid_utf8_later_char_long_strings::<O>(),
        invalid_utf8_later_char_really_long_strings::<O>(),
        invalid_utf8_later_char_really_long_strings2::<O>(),
    ];
    for array in &cases {
        for encoding in STRING_ENCODINGS {
            // The data is not valid UTF-8, so deliberately construct an
            // invalid StringArray via the unchecked API
            let array = unsafe {
                GenericStringArray::<O>::new_unchecked(
                    array.offsets().clone(),
                    array.values().clone(),
                    array.nulls().cloned(),
                )
            };
            let data_type = array.data_type().clone();
            let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
            let err = read_from_parquet(data).unwrap_err();
            let expected_err =
                "Parquet argument error: Parquet error: encountered non UTF-8 data";
            assert!(
                err.to_string().contains(expected_err),
                "data type: {data_type}, expected: {expected_err}, got: {err}"
            );
        }
    }
}

#[test]
fn test_invalid_utf8_string_view_array() {
    let cases = [
        invalid_utf8_first_char::<i32>(),
        invalid_utf8_first_char_long_strings::<i32>(),
        invalid_utf8_later_char::<i32>(),
        invalid_utf8_later_char_long_strings::<i32>(),
        invalid_utf8_later_char_really_long_strings::<i32>(),
        invalid_utf8_later_char_really_long_strings2::<i32>(),
    ];

    for encoding in STRING_ENCODINGS {
        for array in &cases {
            let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
            let array = array.as_binary_view();

            // The data is not valid UTF-8, so deliberately construct an
            // invalid StringViewArray via the unchecked API
            let array = unsafe {
                StringViewArray::new_unchecked(
                    array.views().clone(),
                    array.data_buffers().to_vec(),
                    array.nulls().cloned(),
                )
            };

            let data_type = array.data_type().clone();
            let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
            let err = read_from_parquet(data).unwrap_err();
            let expected_err =
                "Parquet argument error: Parquet error: encountered non UTF-8 data";
            assert!(
                err.to_string().contains(expected_err),
                "data type: {data_type}, expected: {expected_err}, got: {err}"
            );
        }
    }
}

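/// The encodings exercised by the invalid UTF-8 tests; `None` uses the
/// default writer properties (dictionary encoding enabled).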
const STRING_ENCODINGS: &[Option<Encoding>] = &[
    None,
    Some(Encoding::PLAIN),
    Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
    Some(Encoding::DELTA_BYTE_ARRAY),
];

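/// Invalid UTF-8 at the beginning of the byte sequence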
const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];

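/// Invalid UTF-8 part way through the byte sequence, after valid ASCII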
const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];

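/// Returns a binary array with invalid UTF-8 in the first character of a value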
fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
    let valid: &[u8] = b" ";
    let invalid = INVALID_UTF8_FIRST_CHAR;
    GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
}

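/// Returns a binary array with invalid UTF-8 at the end of a value longer
/// than 12 bytes (too long to be inlined in a view array)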
fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
    let valid: &[u8] = b" ";
    let mut invalid = vec![];
    invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
    invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
    GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
}

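/// Returns a binary array with invalid UTF-8 after the first few valid bytes
/// of a value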
fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
    let valid: &[u8] = b" ";
    let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
    GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
}

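/// Returns a binary array with invalid UTF-8 part way through a value longer
/// than 12 bytes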
fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
    let valid: &[u8] = b" ";
    let mut invalid = vec![];
    invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
    invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
    GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
}

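/// Returns a binary array with invalid UTF-8 at the end of a value several
/// hundred bytes long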
fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
    let valid: &[u8] = b" ";
    let mut invalid = vec![];
    for _ in 0..10 {
        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
    }
    invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
    GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
}

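/// Returns a binary array mixing an invalid value with a valid value several
/// hundred bytes long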
fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
    let valid: &[u8] = b" ";
    let mut valid_long = vec![];
    for _ in 0..10 {
        valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
    }
    let invalid = INVALID_UTF8_LATER_CHAR;
    GenericBinaryArray::<O>::from_iter(vec![
        None,
        Some(valid),
        Some(invalid),
        None,
        Some(&valid_long),
        Some(valid),
    ])
}

3933
3934 fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
3939 let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
3940 let mut data = vec![];
3941 let schema = batch.schema();
3942 let props = encoding.map(|encoding| {
3943 WriterProperties::builder()
3944 .set_dictionary_enabled(false)
3946 .set_encoding(encoding)
3947 .build()
3948 });
3949
3950 {
3951 let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
3952 writer.write(&batch).unwrap();
3953 writer.flush().unwrap();
3954 writer.close().unwrap();
3955 };
3956 data
3957 }
3958
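/// Reads all record batches back from an in-memory Parquet buffer.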
fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
    let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
        .unwrap()
        .build()
        .unwrap();

    reader.collect()
}

#[test]
fn test_dictionary_preservation() {
    let fields = vec![Arc::new(
        Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
            .with_repetition(Repetition::OPTIONAL)
            .with_converted_type(ConvertedType::UTF8)
            .build()
            .unwrap(),
    )];

    let schema = Arc::new(
        Type::group_type_builder("test_schema")
            .with_fields(fields)
            .build()
            .unwrap(),
    );

    let dict_type = ArrowDataType::Dictionary(
        Box::new(ArrowDataType::Int32),
        Box::new(ArrowDataType::Utf8),
    );

    let arrow_field = Field::new("leaf", dict_type, true);

    let mut file = tempfile::tempfile().unwrap();

    let values = vec![
        vec![
            ByteArray::from("hello"),
            ByteArray::from("a"),
            ByteArray::from("b"),
            ByteArray::from("d"),
        ],
        vec![
            ByteArray::from("c"),
            ByteArray::from("a"),
            ByteArray::from("b"),
        ],
    ];

    let def_levels = vec![
        vec![1, 0, 0, 1, 0, 0, 1, 1],
        vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
    ];

    let opts = TestOptions {
        encoding: Encoding::RLE_DICTIONARY,
        ..Default::default()
    };

    generate_single_column_file_with_data::<ByteArrayType>(
        &values,
        Some(&def_levels),
        file.try_clone().unwrap(),
        schema,
        Some(arrow_field),
        &opts,
    )
    .unwrap();

    file.rewind().unwrap();

    let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();

    let batches = record_reader
        .collect::<Result<Vec<RecordBatch>, _>>()
        .unwrap();

    assert_eq!(batches.len(), 6);
    assert!(batches.iter().all(|x| x.num_columns() == 1));

    let row_counts = batches
        .iter()
        .map(|x| (x.num_rows(), x.column(0).null_count()))
        .collect::<Vec<_>>();

    assert_eq!(
        row_counts,
        vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
    );

    let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();

    // Batches 0 and 1 come entirely from the first row group, so they share a dictionary
    assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
    // Batch 2 spans the row group boundary, so its dictionary differs from both sides
    assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
    assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
    // Batches 3, 4 and 5 all come from the second row group
    assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
    assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
}

#[test]
fn test_read_null_list() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/null_list.parquet");
    let file = File::open(path).unwrap();
    let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

    let batch = record_batch_reader.next().unwrap().unwrap();
    assert_eq!(batch.num_rows(), 1);
    assert_eq!(batch.num_columns(), 1);
    assert_eq!(batch.column(0).len(), 1);

    let list = batch
        .column(0)
        .as_any()
        .downcast_ref::<ListArray>()
        .unwrap();
    assert_eq!(list.len(), 1);
    assert!(list.is_valid(0));

    let val = list.value(0);
    assert_eq!(val.len(), 0);
}

#[test]
fn test_null_schema_inference() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/null_list.parquet");
    let file = File::open(path).unwrap();

    let arrow_field = Field::new(
        "emptylist",
        ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
        true,
    );

    let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
    let schema = builder.schema();
    assert_eq!(schema.fields().len(), 1);
    assert_eq!(schema.field(0), &arrow_field);
}

#[test]
fn test_skip_metadata() {
    let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
    let field = Field::new("col", col.data_type().clone(), true);

    let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));

    let metadata = [("key".to_string(), "value".to_string())]
        .into_iter()
        .collect();

    let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));

    assert_ne!(schema_with_metadata, schema_without_metadata);

    let batch =
        RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();

    let file = |version: WriterVersion| {
        let props = WriterProperties::builder()
            .set_writer_version(version)
            .build();

        let file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
                .unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        file
    };

    let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);

    let v1_reader = file(WriterVersion::PARQUET_1_0);
    let v2_reader = file(WriterVersion::PARQUET_2_0);

    let arrow_reader =
        ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
    assert_eq!(arrow_reader.schema(), schema_with_metadata);

    let reader =
        ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
            .unwrap()
            .build()
            .unwrap();
    assert_eq!(reader.schema(), schema_without_metadata);

    let arrow_reader =
        ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
    assert_eq!(arrow_reader.schema(), schema_with_metadata);

    let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
        .unwrap()
        .build()
        .unwrap();
    assert_eq!(reader.schema(), schema_without_metadata);
}

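/// Writes the supplied (name, array) pairs as a single RecordBatch to a
/// temporary Parquet file and returns the file handle.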
fn write_parquet_from_iter<I, F>(value: I) -> File
where
    I: IntoIterator<Item = (F, ArrayRef)>,
    F: AsRef<str>,
{
    let batch = RecordBatch::try_from_iter(value).unwrap();
    let file = tempfile().unwrap();
    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();
    file
}

fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
where
    I: IntoIterator<Item = (F, ArrayRef)>,
    F: AsRef<str>,
{
    let file = write_parquet_from_iter(value);
    let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
        file.try_clone().unwrap(),
        options_with_schema,
    );
    assert_eq!(builder.err().unwrap().to_string(), expected_error);
}

#[test]
fn test_schema_too_few_columns() {
    run_schema_test_with_error(
        vec![
            ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
            ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
        ],
        Arc::new(Schema::new(vec![Field::new(
            "int64",
            ArrowDataType::Int64,
            false,
        )])),
        "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
    );
}

#[test]
fn test_schema_too_many_columns() {
    run_schema_test_with_error(
        vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
        Arc::new(Schema::new(vec![
            Field::new("int64", ArrowDataType::Int64, false),
            Field::new("int32", ArrowDataType::Int32, false),
        ])),
        "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
    );
}

#[test]
fn test_schema_mismatched_column_names() {
    run_schema_test_with_error(
        vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
        Arc::new(Schema::new(vec![Field::new(
            "other",
            ArrowDataType::Int64,
            false,
        )])),
        "Arrow: incompatible arrow schema, expected field named int64 got other",
    );
}

#[test]
fn test_schema_incompatible_columns() {
    run_schema_test_with_error(
        vec![
            (
                "col1_invalid",
                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
            ),
            (
                "col2_valid",
                Arc::new(Int32Array::from(vec![0])) as ArrayRef,
            ),
            (
                "col3_invalid",
                Arc::new(Date64Array::from(vec![0])) as ArrayRef,
            ),
        ],
        Arc::new(Schema::new(vec![
            Field::new("col1_invalid", ArrowDataType::Int32, false),
            Field::new("col2_valid", ArrowDataType::Int32, false),
            Field::new("col3_invalid", ArrowDataType::Int32, false),
        ])),
        "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
    );
}

#[test]
fn test_one_incompatible_nested_column() {
    let nested_fields = Fields::from(vec![
        Field::new("nested1_valid", ArrowDataType::Utf8, false),
        Field::new("nested1_invalid", ArrowDataType::Int64, false),
    ]);
    let nested = StructArray::try_new(
        nested_fields,
        vec![
            Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
            Arc::new(Int64Array::from(vec![0])) as ArrayRef,
        ],
        None,
    )
    .expect("struct array");
    let supplied_nested_fields = Fields::from(vec![
        Field::new("nested1_valid", ArrowDataType::Utf8, false),
        Field::new("nested1_invalid", ArrowDataType::Int32, false),
    ]);
    run_schema_test_with_error(
        vec![
            ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
            ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
            ("nested", Arc::new(nested) as ArrayRef),
        ],
        Arc::new(Schema::new(vec![
            Field::new("col1", ArrowDataType::Int64, false),
            Field::new("col2", ArrowDataType::Int32, false),
            Field::new(
                "nested",
                ArrowDataType::Struct(supplied_nested_fields),
                false,
            ),
        ])),
        "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
        requested Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int32) \
        but found Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int64)",
    );
}

/// Returns an in-memory Parquet buffer containing a single three-row Utf8
/// column, used by the schema-mismatch tests below
fn utf8_parquet() -> Bytes {
    let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
    let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
    let props = None;
    let mut parquet_data = vec![];
    let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();
    Bytes::from(parquet_data)
}

#[test]
fn test_schema_error_bad_types() {
    let parquet_data = utf8_parquet();

    // Request an incompatible type: Int32 where the file contains Utf8
    let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
        "column1",
        arrow::datatypes::DataType::Int32,
        false,
    )]));

    let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
    let err =
        ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
            .unwrap_err();
    assert_eq!(
        err.to_string(),
        "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8"
    )
}

#[test]
fn test_schema_error_bad_nullability() {
    let parquet_data = utf8_parquet();

    // Request an incompatible nullability for the column
    let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
        "column1",
        arrow::datatypes::DataType::Utf8,
        true,
    )]));

    let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
    let err =
        ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
            .unwrap_err();
    assert_eq!(
        err.to_string(),
        "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false"
    )
}

#[test]
fn test_read_binary_as_utf8() {
    let file = write_parquet_from_iter(vec![
        (
            "binary_to_utf8",
            Arc::new(BinaryArray::from(vec![
                b"one".as_ref(),
                b"two".as_ref(),
                b"three".as_ref(),
            ])) as ArrayRef,
        ),
        (
            "large_binary_to_large_utf8",
            Arc::new(LargeBinaryArray::from(vec![
                b"one".as_ref(),
                b"two".as_ref(),
                b"three".as_ref(),
            ])) as ArrayRef,
        ),
        (
            "binary_view_to_utf8_view",
            Arc::new(BinaryViewArray::from(vec![
                b"one".as_ref(),
                b"two".as_ref(),
                b"three".as_ref(),
            ])) as ArrayRef,
        ),
    ]);
    let supplied_fields = Fields::from(vec![
        Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
        Field::new(
            "large_binary_to_large_utf8",
            ArrowDataType::LargeUtf8,
            false,
        ),
        Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
    ]);

    let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
    let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
        file.try_clone().unwrap(),
        options,
    )
    .expect("reader builder with schema")
    .build()
    .expect("reader with schema");

    let batch = arrow_reader.next().unwrap().unwrap();
    assert_eq!(batch.num_columns(), 3);
    assert_eq!(batch.num_rows(), 3);
    assert_eq!(
        batch
            .column(0)
            .as_string::<i32>()
            .iter()
            .collect::<Vec<_>>(),
        vec![Some("one"), Some("two"), Some("three")]
    );

    assert_eq!(
        batch
            .column(1)
            .as_string::<i64>()
            .iter()
            .collect::<Vec<_>>(),
        vec![Some("one"), Some("two"), Some("three")]
    );

    assert_eq!(
        batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
        vec![Some("one"), Some("two"), Some("three")]
    );
}

#[test]
#[should_panic(expected = "Invalid UTF8 sequence at")]
fn test_read_non_utf8_binary_as_utf8() {
    let file = write_parquet_from_iter(vec![(
        "non_utf8_binary",
        Arc::new(BinaryArray::from(vec![
            b"\xDE\x00\xFF".as_ref(),
            b"\xDE\x01\xAA".as_ref(),
            b"\xDE\x02\xFF".as_ref(),
        ])) as ArrayRef,
    )]);
    let supplied_fields = Fields::from(vec![Field::new(
        "non_utf8_binary",
        ArrowDataType::Utf8,
        false,
    )]);

    let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
    let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
        file.try_clone().unwrap(),
        options,
    )
    .expect("reader builder with schema")
    .build()
    .expect("reader with schema");
    arrow_reader.next().unwrap().unwrap_err();
}

#[test]
fn test_with_schema() {
    let nested_fields = Fields::from(vec![
        Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
        Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
    ]);

    let nested_arrays: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
        Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
    ];

    let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();

    let file = write_parquet_from_iter(vec![
        (
            "int32_to_ts_second",
            Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
        ),
        (
            "date32_to_date64",
            Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
        ),
        ("nested", Arc::new(nested) as ArrayRef),
    ]);

    let supplied_nested_fields = Fields::from(vec![
        Field::new(
            "utf8_to_dict",
            ArrowDataType::Dictionary(
                Box::new(ArrowDataType::Int32),
                Box::new(ArrowDataType::Utf8),
            ),
            false,
        ),
        Field::new(
            "int64_to_ts_nano",
            ArrowDataType::Timestamp(
                arrow::datatypes::TimeUnit::Nanosecond,
                Some("+10:00".into()),
            ),
            false,
        ),
    ]);

    let supplied_schema = Arc::new(Schema::new(vec![
        Field::new(
            "int32_to_ts_second",
            ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
            false,
        ),
        Field::new("date32_to_date64", ArrowDataType::Date64, false),
        Field::new(
            "nested",
            ArrowDataType::Struct(supplied_nested_fields),
            false,
        ),
    ]));

    let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
    let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
        file.try_clone().unwrap(),
        options,
    )
    .expect("reader builder with schema")
    .build()
    .expect("reader with schema");

    assert_eq!(arrow_reader.schema(), supplied_schema);
    let batch = arrow_reader.next().unwrap().unwrap();
    assert_eq!(batch.num_columns(), 3);
    assert_eq!(batch.num_rows(), 4);
    assert_eq!(
        batch
            .column(0)
            .as_any()
            .downcast_ref::<TimestampSecondArray>()
            .expect("downcast to timestamp second")
            .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
            .map(|v| v.to_string())
            .expect("value as datetime"),
        "1970-01-01 01:00:00 +01:00"
    );
    assert_eq!(
        batch
            .column(1)
            .as_any()
            .downcast_ref::<Date64Array>()
            .expect("downcast to date64")
            .value_as_date(0)
            .map(|v| v.to_string())
            .expect("value as date"),
        "1970-01-01"
    );

    let nested = batch
        .column(2)
        .as_any()
        .downcast_ref::<StructArray>()
        .expect("downcast to struct");

    let nested_dict = nested
        .column(0)
        .as_any()
        .downcast_ref::<Int32DictionaryArray>()
        .expect("downcast to dictionary");

    assert_eq!(
        nested_dict
            .values()
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("downcast to string")
            .iter()
            .collect::<Vec<_>>(),
        vec![Some("a"), Some("b")]
    );

    assert_eq!(
        nested_dict.keys().iter().collect::<Vec<_>>(),
        vec![Some(0), Some(0), Some(0), Some(1)]
    );

    assert_eq!(
        nested
            .column(1)
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .expect("downcast to timestamp nanosecond")
            .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
            .map(|v| v.to_string())
            .expect("value as datetime"),
        "1970-01-01 10:00:00.000000001 +10:00"
    );
}

#[test]
fn test_empty_projection() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/alltypes_plain.parquet");
    let file = File::open(path).unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let file_metadata = builder.metadata().file_metadata();
    let expected_rows = file_metadata.num_rows() as usize;

    let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
    let batch_reader = builder
        .with_projection(mask)
        .with_batch_size(2)
        .build()
        .unwrap();

    let mut total_rows = 0;
    for maybe_batch in batch_reader {
        let batch = maybe_batch.unwrap();
        total_rows += batch.num_rows();
        assert_eq!(batch.num_columns(), 0);
        assert!(batch.num_rows() <= 2);
    }

    assert_eq!(total_rows, expected_rows);
}

    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "list",
            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
            true,
        )]));

        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(
            &mut buf,
            schema.clone(),
            Some(
                WriterProperties::builder()
                    .set_max_row_group_size(row_group_size)
                    .build(),
            ),
        )
        .unwrap();
        for _ in 0..2 {
            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
            for _ in 0..batch_size {
                list_builder.append(true);
            }
            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
                .unwrap();
            writer.write(&batch).unwrap();
        }
        writer.close().unwrap();

        let mut record_reader =
            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
        assert_eq!(
            batch_size,
            record_reader.next().unwrap().unwrap().num_rows()
        );
        assert_eq!(
            batch_size,
            record_reader.next().unwrap().unwrap().num_rows()
        );
    }

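    // Exercise row-group/batch-size combinations on either side of
    // REPETITION_LEVELS_BATCH_SIZE (assumed here to be the chunk size used when
    // decoding repetition levels), where off-by-one bugs are most likely.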
    #[test]
    fn test_row_group_exact_multiple() {
        const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
        test_row_group_batch(8, 8);
        test_row_group_batch(10, 8);
        test_row_group_batch(8, 10);
        test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
        test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
        test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
        test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
        test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
    }

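    /// Given a RecordBatch containing all the column data, return the expected
    /// batches for a given `batch_size` and `selection`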
    fn get_expected_batches(
        column: &RecordBatch,
        selection: &RowSelection,
        batch_size: usize,
    ) -> Vec<RecordBatch> {
        let mut expected_batches = vec![];

        let mut selection: VecDeque<_> = selection.clone().into();
        let mut row_offset = 0;
        let mut last_start = None;
        while row_offset < column.num_rows() && !selection.is_empty() {
            let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
            while batch_remaining > 0 && !selection.is_empty() {
                let (to_read, skip) = match selection.front_mut() {
                    Some(selection) if selection.row_count > batch_remaining => {
                        selection.row_count -= batch_remaining;
                        (batch_remaining, selection.skip)
                    }
                    Some(_) => {
                        let select = selection.pop_front().unwrap();
                        (select.row_count, select.skip)
                    }
                    None => break,
                };

                batch_remaining -= to_read;

                match skip {
                    true => {
                        if let Some(last_start) = last_start.take() {
                            expected_batches.push(column.slice(last_start, row_offset - last_start))
                        }
                        row_offset += to_read
                    }
                    false => {
                        last_start.get_or_insert(row_offset);
                        row_offset += to_read
                    }
                }
            }
        }

        if let Some(last_start) = last_start.take() {
            expected_batches.push(column.slice(last_start, row_offset - last_start))
        }

        // Sanity check: all batches except the final one should contain exactly
        // `batch_size` rows
        for batch in &expected_batches[..expected_batches.len() - 1] {
            assert_eq!(batch.num_rows(), batch_size);
        }

        expected_batches
    }

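    // Builds an alternating select/skip `RowSelection` in `step_len` chunks over
    // `total_len` rows, returning the selection and the number of selected rows.
    // For example, `create_test_selection(3, 8, false)` yields
    // `[select(3), skip(3), select(2)]` with a selected count of 5.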
    fn create_test_selection(
        step_len: usize,
        total_len: usize,
        skip_first: bool,
    ) -> (RowSelection, usize) {
        let mut remaining = total_len;
        let mut skip = skip_first;
        let mut vec = vec![];
        let mut selected_count = 0;
        while remaining != 0 {
            let step = if remaining > step_len {
                step_len
            } else {
                remaining
            };
            vec.push(RowSelector {
                row_count: step,
                skip,
            });
            remaining -= step;
            if !skip {
                selected_count += step;
            }
            skip = !skip;
        }
        (vec.into(), selected_count)
    }

    #[test]
    fn test_scan_row_with_selection() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let test_file = File::open(&path).unwrap();

        let mut serial_reader =
            ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
        let data = serial_reader.next().unwrap().unwrap();

        let do_test = |batch_size: usize, selection_len: usize| {
            for skip_first in [false, true] {
                let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;

                let expected = get_expected_batches(&data, &selections, batch_size);
                let skip_reader = create_skip_reader(&test_file, batch_size, selections);
                assert_eq!(
                    skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
                    expected,
                    "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
                );
            }
        };

        // Exercise a few batch size / selection length combinations, each with
        // and without skipping the first selector
        do_test(1000, 1000);

        do_test(20, 20);

        do_test(20, 5);

        do_test(20, 5);

        fn create_skip_reader(
            test_file: &File,
            batch_size: usize,
            selections: RowSelection,
        ) -> ParquetRecordBatchReader {
            let options =
                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
            let file = test_file.try_clone().unwrap();
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                .unwrap()
                .with_batch_size(batch_size)
                .with_row_selection(selections)
                .build()
                .unwrap()
        }
    }

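    // The reader should cap its batch size at the total row count rather than
    // allocating buffers for the full requested batch size.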
    #[test]
    fn test_batch_size_overallocate() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let test_file = File::open(path).unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
        let num_rows = builder.metadata.file_metadata().num_rows();
        let reader = builder
            .with_batch_size(1024)
            .with_projection(ProjectionMask::all())
            .build()
            .unwrap();
        assert_ne!(1024, num_rows);
        assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
    }

    #[test]
    fn test_read_with_page_index_enabled() {
        let testdata = arrow::util::test_util::parquet_test_data();

        {
            // `alltypes_tiny_pages.parquet` has a page index
            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
            let test_file = File::open(path).unwrap();
            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
                test_file,
                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
            )
            .unwrap();
            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
            let reader = builder.build().unwrap();
            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(batches.len(), 8);
        }

        {
            // `alltypes_plain.parquet` has no page index, so nothing is loaded
            // even though the policy requests one
            let path = format!("{testdata}/alltypes_plain.parquet");
            let test_file = File::open(path).unwrap();
            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
                test_file,
                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
            )
            .unwrap();
            assert!(builder.metadata().offset_index().is_none());
            let reader = builder.build().unwrap();
            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(batches.len(), 1);
        }
    }

    #[test]
    fn test_raw_repetition() {
        const MESSAGE_TYPE: &str = "
            message Log {
              OPTIONAL INT32 eventType;
              REPEATED INT32 category;
              REPEATED group filter {
                OPTIONAL INT32 error;
              }
            }
        ";
        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
        let props = Default::default();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        // eventType
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1], Some(&[1]), None)
            .unwrap();
        col_writer.close().unwrap();
        // category
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
            .unwrap();
        col_writer.close().unwrap();
        // filter.error
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1], Some(&[1]), Some(&[0]))
            .unwrap();
        col_writer.close().unwrap();

        let rg_md = row_group_writer.close().unwrap();
        assert_eq!(rg_md.num_rows(), 1);
        writer.close().unwrap();

        let bytes = Bytes::from(buf);

        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
        let full = no_mask.next().unwrap().unwrap();

        assert_eq!(full.num_columns(), 3);

        // projecting each leaf column individually must match the full read
        for idx in 0..3 {
            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
            let mut reader = b.with_projection(mask).build().unwrap();
            let projected = reader.next().unwrap().unwrap();

            assert_eq!(projected.num_columns(), 1);
            assert_eq!(full.column(idx), projected.column(0));
        }
    }

    #[test]
    fn test_read_lz4_raw() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/lz4_raw_compressed.parquet");
        let file = File::open(path).unwrap();

        let batches = ParquetRecordBatchReader::try_new(file, 1024)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];

        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 4);

        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
        assert_eq!(
            a.values(),
            &[1593604800, 1593604800, 1593604801, 1593604801]
        );

        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
        let a: Vec<_> = a.iter().flatten().collect();
        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);

        let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
        assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
    }

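    // Both files declare the legacy LZ4 CompressionCodec but use different
    // framings (Hadoop LZ4 vs raw LZ4); decoding is expected to fall back
    // between the two, so both files must read identically.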
    #[test]
    fn test_read_lz4_hadoop_fallback() {
        for file in [
            "hadoop_lz4_compressed.parquet",
            "non_hadoop_lz4_compressed.parquet",
        ] {
            let testdata = arrow::util::test_util::parquet_test_data();
            let path = format!("{testdata}/{file}");
            let file = File::open(path).unwrap();
            let expected_rows = 4;

            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert_eq!(batches.len(), 1);
            let batch = &batches[0];

            assert_eq!(batch.num_columns(), 3);
            assert_eq!(batch.num_rows(), expected_rows);

            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
            assert_eq!(
                a.values(),
                &[1593604800, 1593604800, 1593604801, 1593604801]
            );

            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
            let b: Vec<_> = b.iter().flatten().collect();
            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);

            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
        }
    }

    #[test]
    fn test_read_lz4_hadoop_large() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
        let file = File::open(path).unwrap();
        let expected_rows = 10000;

        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];

        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.num_rows(), expected_rows);

        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
        let a: Vec<_> = a.iter().flatten().collect();
        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
    }

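    // A row selection over nested list data must match slicing the batch
    // produced by a full scan.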
    #[test]
    #[cfg(feature = "snap")]
    fn test_read_nested_lists() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_lists.snappy.parquet");
        let file = File::open(path).unwrap();

        let f = file.try_clone().unwrap();
        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
        let expected = reader.next().unwrap().unwrap();
        assert_eq!(expected.num_rows(), 3);

        let selection = RowSelection::from(vec![
            RowSelector::skip(1),
            RowSelector::select(1),
            RowSelector::skip(1),
        ]);
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .with_row_selection(selection)
            .build()
            .unwrap();

        let actual = reader.next().unwrap().unwrap();
        assert_eq!(actual.num_rows(), 1);
        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
    }

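    // Decimal128 columns at several precisions should round-trip unchanged.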
    #[test]
    fn test_arbitrary_decimal() {
        let values = [1, 2, 3, 4, 5, 6, 7, 8];
        let decimals_19_0 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(19, 0)
            .unwrap();
        let decimals_12_0 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(12, 0)
            .unwrap();
        let decimals_17_10 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(17, 10)
            .unwrap();

        let written = RecordBatch::try_from_iter([
            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 8), &read[0]);
    }

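    // Skipping list rows across page boundaries: the tiny data pages configured
    // below force the skip to span more than one page.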
    #[test]
    fn test_list_skip() {
        let mut list = ListBuilder::new(Int32Builder::new());
        list.append_value([Some(1), Some(2)]);
        list.append_value([Some(3)]);
        list.append_value([Some(4)]);
        let list = list.finish();
        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();

        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(1)
            .set_write_batch_size(2)
            .build();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
            .unwrap()
            .with_row_selection(selection.into())
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out, batch.slice(2, 1));
    }

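    // Skips interleaved with selects inside a single batch should yield one
    // batch containing exactly the selected rows.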
    #[test]
    fn test_row_selection_interleaved_skip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "v",
            ArrowDataType::Int32,
            false,
        )]));

        let values = Int32Array::from(vec![0, 1, 2, 3, 4]);
        let batch = RecordBatch::try_from_iter([("v", Arc::new(values) as ArrayRef)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None).unwrap();
        writer.write(&batch)?;
        writer.close()?;

        let selection = RowSelection::from(vec![
            RowSelector::select(1),
            RowSelector::skip(2),
            RowSelector::select(2),
        ]);

        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))?
            .with_batch_size(4)
            .with_row_selection(selection)
            .build()?;

        let out = reader.next().unwrap()?;
        assert_eq!(out.num_rows(), 3);
        let values = out
            .column(0)
            .as_primitive::<arrow_array::types::Int32Type>()
            .values();
        assert_eq!(values, &[0, 3, 4]);
        assert!(reader.next().is_none());
        Ok(())
    }

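    // An every-other-row selection: the selector list is large relative to the
    // row count (see the assert below), the case where a boolean mask is the
    // cheaper representation of the selection.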
    #[test]
    fn test_row_selection_mask_sparse_rows() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "v",
            ArrowDataType::Int32,
            false,
        )]));

        let values = Int32Array::from((0..30).collect::<Vec<i32>>());
        let batch = RecordBatch::try_from_iter([("v", Arc::new(values) as ArrayRef)])?;

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None)?;
        writer.write(&batch)?;
        writer.close()?;

        let total_rows = batch.num_rows();
        let ranges = (1..total_rows)
            .step_by(2)
            .map(|i| i..i + 1)
            .collect::<Vec<_>>();
        let selection = RowSelection::from_consecutive_ranges(ranges.into_iter(), total_rows);

        let selectors: Vec<RowSelector> = selection.clone().into();
        assert!(total_rows < selectors.len() * 8);

        let bytes = Bytes::from(buffer);

        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes.clone())?
            .with_batch_size(7)
            .with_row_selection(selection)
            .build()?;

        let mut collected = Vec::new();
        for batch in reader {
            let batch = batch?;
            collected.extend_from_slice(
                batch
                    .column(0)
                    .as_primitive::<arrow_array::types::Int32Type>()
                    .values(),
            );
        }

        let expected: Vec<i32> = (1..total_rows).step_by(2).map(|i| i as i32).collect();
        assert_eq!(collected, expected);
        Ok(())
    }

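    // The decimal round-trips below check the Parquet physical type chosen per
    // precision: <= 9 digits -> INT32, <= 18 -> INT64, larger ->
    // FIXED_LEN_BYTE_ARRAY.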
    fn test_decimal32_roundtrip() {
        let d = |values: Vec<i32>, p: u8| {
            let iter = values.into_iter();
            PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    fn test_decimal64_roundtrip() {
        // precision <= 9 -> INT32, precision <= 18 -> INT64
        let d = |values: Vec<i64>, p: u8| {
            let iter = values.into_iter();
            PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    fn test_decimal_roundtrip<T: DecimalType>() {
        // Precision <= 9  -> INT32
        // Precision <= 18 -> INT64
        // Precision > 18  -> FIXED_LEN_BYTE_ARRAY
        let d = |values: Vec<usize>, p: u8| {
            let iter = values.into_iter().map(T::Native::usize_as);
            PrimitiveArray::<T>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
            ("d4", Arc::new(d4) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);
        let t4 = builder.parquet_schema().columns()[3].physical_type();
        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    #[test]
    fn test_decimal() {
        test_decimal32_roundtrip();
        test_decimal64_roundtrip();
        test_decimal_roundtrip::<Decimal128Type>();
        test_decimal_roundtrip::<Decimal256Type>();
    }

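    // Selecting ranges of single-element list rows: offsets in the result must
    // stay consistent and the values must come from the selected rows only.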
    #[test]
    fn test_list_selection() {
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list_field(ArrowDataType::Utf8, true),
            false,
        )]));
        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        for i in 0..2 {
            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
            for j in 0..1024 {
                list_a_builder.values().append_value(format!("{i} {j}"));
                list_a_builder.append(true);
            }
            let batch =
                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
                    .unwrap();
            writer.write(&batch).unwrap();
        }
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);
        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
            .unwrap()
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ]))
            .build()
            .unwrap();

        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        let batch = concat_batches(&schema, &batches).unwrap();

        assert_eq!(batch.num_rows(), 924 * 2);
        let list = batch.column(0).as_list::<i32>();

        // every list has exactly one element, so offsets increase by one
        for w in list.value_offsets().windows(2) {
            assert_eq!(w[0] + 1, w[1])
        }
        let mut values = list.values().as_string::<i32>().iter();

        for i in 0..2 {
            for j in 100..1024 {
                let expected = format!("{i} {j}");
                assert_eq!(values.next().unwrap().unwrap(), &expected);
            }
        }
    }

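    // Randomized nested-list data read back under several skip/select patterns
    // and batch sizes; every selected slice must equal the corresponding slice
    // of the written batch.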
    #[test]
    fn test_list_selection_fuzz() {
        let mut rng = rng();
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list(
                Field::LIST_FIELD_DEFAULT_NAME,
                Field::new_list_field(ArrowDataType::Int32, true),
                true,
            ),
            true,
        )]));
        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));

        for _ in 0..2048 {
            if rng.random_bool(0.2) {
                list_a_builder.append(false);
                continue;
            }

            let list_a_len = rng.random_range(0..10);
            let list_b_builder = list_a_builder.values();

            for _ in 0..list_a_len {
                if rng.random_bool(0.2) {
                    list_b_builder.append(false);
                    continue;
                }

                let list_b_len = rng.random_range(0..10);
                let int_builder = list_b_builder.values();
                for _ in 0..list_b_len {
                    match rng.random_bool(0.2) {
                        true => int_builder.append_null(),
                        false => int_builder.append_value(rng.random()),
                    }
                }
                list_b_builder.append(true)
            }
            list_a_builder.append(true);
        }

        let array = Arc::new(list_a_builder.finish());
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        writer.write(&batch).unwrap();
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);

        let cases = [
            vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ],
            vec![
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
            ],
            vec![
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
            ],
            vec![
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
            ],
        ];

        for batch_size in [100, 1024, 2048] {
            for selection in &cases {
                let selection = RowSelection::from(selection.clone());
                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
                    .unwrap()
                    .with_row_selection(selection.clone())
                    .with_batch_size(batch_size)
                    .build()
                    .unwrap();

                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
                assert_eq!(actual.num_rows(), selection.row_count());

                let mut batch_offset = 0;
                let mut actual_offset = 0;
                for selector in selection.iter() {
                    if selector.skip {
                        batch_offset += selector.row_count;
                        continue;
                    }

                    assert_eq!(
                        batch.slice(batch_offset, selector.row_count),
                        actual.slice(actual_offset, selector.row_count)
                    );

                    batch_offset += selector.row_count;
                    actual_offset += selector.row_count;
                }
            }
        }
    }

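    // `old_list_structure.parquet` uses the legacy two-level list encoding that
    // predates the current three-level structure; the single row is expected to
    // decode to the nested list [[1, 2], [3, 4]].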
    #[test]
    fn test_read_old_nested_list() {
        use arrow::datatypes::DataType;
        use arrow::datatypes::ToByteSlice;

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/old_list_structure.parquet");
        let test_file = File::open(path).unwrap();

        // build the expected inner list: [[1, 2], [3, 4]]
        let a_values = Int32Array::from(vec![1, 2, 3, 4]);
        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());
        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
            "array",
            DataType::Int32,
            false,
        ))))
        .len(2)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
        let mut reader = builder.build().unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out.num_columns(), 1);
        // take the single row of the outer list and compare with the expected array
        let c0 = out.column(0);
        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
        let r0 = c0arr.value(0);
        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
        assert_eq!(r0arr, &a);
    }

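    // `map_no_value.parquet` contains maps written without a value field; the
    // two list-shaped columns read below must decode to identical arrays.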
    #[test]
    fn test_map_no_value() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/map_no_value.parquet");
        let file = File::open(path).unwrap();

        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 3);
        assert_eq!(out.num_columns(), 3);
        let c0 = out.column(1).as_list::<i32>();
        let c1 = out.column(2).as_list::<i32>();
        assert_eq!(c0.len(), c1.len());
        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
    }

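    // A row filter matching only the first and last rows forces the data pages
    // in between to be skipped in their entirety; this must not break the read.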
    #[test]
    fn test_row_filter_full_page_skip_is_handled() {
        let first_value: i64 = 1111;
        let last_value: i64 = 9999;
        let num_rows: usize = 12;

        let schema = Arc::new(Schema::new(vec![
            Field::new("key", arrow_schema::DataType::Int64, false),
            Field::new("value", arrow_schema::DataType::Int64, false),
        ]));

        // only the first and last rows will match the predicate below
        let mut int_values: Vec<i64> = (0..num_rows as i64).collect();
        int_values[0] = first_value;
        int_values[num_rows - 1] = last_value;
        let keys = Int64Array::from(int_values.clone());
        let values = Int64Array::from(int_values.clone());
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
        )
        .unwrap();

        // tiny pages, so whole pages fall between the two matching rows
        let props = WriterProperties::builder()
            .set_write_batch_size(2)
            .set_data_page_row_count_limit(2)
            .build();

        let mut buffer = Vec::new();
        let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let data = Bytes::from(buffer);

        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
        let builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options).unwrap();
        let schema = builder.parquet_schema().clone();
        let filter_mask = ProjectionMask::leaves(&schema, [0]);

        let make_predicate = |mask: ProjectionMask| {
            ArrowPredicateFn::new(mask, move |batch: RecordBatch| {
                let column = batch.column(0);
                let match_first = eq(column, &Int64Array::new_scalar(first_value))?;
                let match_second = eq(column, &Int64Array::new_scalar(last_value))?;
                or(&match_first, &match_second)
            })
        };

        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
        let predicate = make_predicate(filter_mask.clone());

        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options)
            .unwrap()
            .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
            .with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 32 })
            .with_batch_size(12)
            .build()
            .unwrap();

        let schema = reader.schema().clone();
        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        let result = concat_batches(&schema, &batches).unwrap();
        assert_eq!(result.num_rows(), 2);
    }

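    // Rewrites the test file through ArrowWriter with bloom filters enabled, so
    // that the new file records `bloom_filter_length` in its column metadata.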
    #[test]
    fn test_get_row_group_column_bloom_filter_with_length() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
        let file = File::open(path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let schema = builder.schema().clone();
        let reader = builder.build().unwrap();

        let mut parquet_data = Vec::new();
        let props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build();
        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
        for batch in reader {
            let batch = batch.unwrap();
            writer.write(&batch).unwrap();
        }
        writer.close().unwrap();

        test_get_row_group_column_bloom_filter(parquet_data.into(), true);
    }

    #[test]
    fn test_get_row_group_column_bloom_filter_without_length() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());
        test_get_row_group_column_bloom_filter(data, false);
    }

    fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
        let builder = ParquetRecordBatchReaderBuilder::try_new(data.clone()).unwrap();

        let metadata = builder.metadata();
        assert_eq!(metadata.num_row_groups(), 1);
        let row_group = metadata.row_group(0);
        let column = row_group.column(0);
        assert_eq!(column.bloom_filter_length().is_some(), with_length);

        let sbbf = builder
            .get_row_group_column_bloom_filter(0, 0)
            .unwrap()
            .unwrap();
        assert!(sbbf.check(&"Hello"));
        assert!(!sbbf.check(&"Hello_Not_Exists"));
    }

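    // A file carrying a logical type annotation this crate does not recognize
    // should still load, surfacing the annotation as `LogicalType::_Unknown`.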
    #[test]
    fn test_read_unknown_logical_type() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/unknown-logical-type.parquet");
        let test_file = File::open(path).unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file)
            .expect("Error creating reader builder");

        let schema = builder.metadata().file_metadata().schema_descr();
        assert_eq!(
            schema.column(0).logical_type_ref(),
            Some(&LogicalType::String)
        );
        assert_eq!(
            schema.column(1).logical_type_ref(),
            Some(&LogicalType::_Unknown { field_id: 2555 })
        );
        assert_eq!(schema.column(1).physical_type(), PhysicalType::BYTE_ARRAY);

        let mut reader = builder.build().unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 3);
        assert_eq!(out.num_columns(), 2);
    }

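    // The `row_number` virtual column materializes each row's position within
    // the file and is appended after the projected data columns.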
    #[test]
    fn test_read_row_numbers() {
        let file = write_parquet_from_iter(vec![(
            "value",
            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
        )]);
        let supplied_fields = Fields::from(vec![Field::new("value", ArrowDataType::Int64, false)]);

        let row_number_field = Arc::new(
            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
        );

        let options = ArrowReaderOptions::new()
            .with_schema(Arc::new(Schema::new(supplied_fields)))
            .with_virtual_columns(vec![row_number_field.clone()])
            .unwrap();
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");

        let batch = arrow_reader.next().unwrap().unwrap();
        let schema = Arc::new(Schema::new(vec![
            Field::new("value", ArrowDataType::Int64, false),
            (*row_number_field).clone(),
        ]));

        assert_eq!(batch.schema(), schema);
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(1), Some(2), Some(3)]
        );
        assert_eq!(
            batch
                .column(1)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(0), Some(1), Some(2)]
        );
    }

    #[test]
    fn test_read_only_row_numbers() {
        let file = write_parquet_from_iter(vec![(
            "value",
            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
        )]);
        let row_number_field = Arc::new(
            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
        );
        let options = ArrowReaderOptions::new()
            .with_virtual_columns(vec![row_number_field.clone()])
            .unwrap();
        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
        let num_columns = metadata
            .metadata
            .file_metadata()
            .schema_descr()
            .num_columns();

        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
            .with_projection(ProjectionMask::none(num_columns))
            .build()
            .expect("reader with schema");

        let batch = arrow_reader.next().unwrap().unwrap();
        let schema = Arc::new(Schema::new(vec![row_number_field]));

        assert_eq!(batch.schema(), schema);
        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(0), Some(1), Some(2)]
        );
    }

    #[test]
    fn test_read_row_numbers_row_group_order() -> Result<()> {
        // 100 rows split across two row groups of 50 rows each
        let array = Int64Array::from_iter_values(5000..5100);
        let batch = RecordBatch::try_from_iter([("col", Arc::new(array) as ArrayRef)])?;
        let mut buffer = Vec::new();
        let options = WriterProperties::builder()
            .set_max_row_group_size(50)
            .build();
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema().clone(), Some(options))?;
        // write in ten 10-row chunks
        for batch_chunk in (0..10).map(|i| batch.slice(i * 10, 10)) {
            writer.write(&batch_chunk)?;
        }
        writer.close()?;

        let row_number_field = Arc::new(
            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
        );

        let buffer = Bytes::from(buffer);

        let options =
            ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field.clone()])?;

        // reading in file order yields row numbers 0..100
        let arrow_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(buffer.clone(), options.clone())?
                .build()?;

        assert_eq!(
            ValuesAndRowNumbers {
                values: (5000..5100).collect(),
                row_numbers: (0..100).collect()
            },
            ValuesAndRowNumbers::new_from_reader(arrow_reader)
        );

        // reading the row groups out of order must keep row numbers aligned
        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(buffer, options)?
            .with_row_groups(vec![1, 0])
            .build()?;

        assert_eq!(
            ValuesAndRowNumbers {
                values: (5050..5100).chain(5000..5050).collect(),
                row_numbers: (50..100).chain(0..50).collect(),
            },
            ValuesAndRowNumbers::new_from_reader(arrow_reader)
        );

        Ok(())
    }

    #[derive(Debug, PartialEq)]
    struct ValuesAndRowNumbers {
        values: Vec<i64>,
        row_numbers: Vec<i64>,
    }

    impl ValuesAndRowNumbers {
        fn new_from_reader(reader: ParquetRecordBatchReader) -> Self {
            let mut values = vec![];
            let mut row_numbers = vec![];
            for batch in reader {
                let batch = batch.expect("Could not read batch");
                values.extend(
                    batch
                        .column_by_name("col")
                        .expect("Could not get col column")
                        .as_primitive::<arrow::datatypes::Int64Type>()
                        .iter()
                        .map(|v| v.expect("Could not get value")),
                );

                row_numbers.extend(
                    batch
                        .column_by_name("row_number")
                        .expect("Could not get row_number column")
                        .as_primitive::<arrow::datatypes::Int64Type>()
                        .iter()
                        .map(|v| v.expect("Could not get row number")),
                );
            }
            Self {
                values,
                row_numbers,
            }
        }
    }

    #[test]
    fn test_with_virtual_columns_rejects_non_virtual_fields() {
        let regular_field = Arc::new(Field::new("regular_column", ArrowDataType::Int64, false));
        assert_eq!(
            ArrowReaderOptions::new()
                .with_virtual_columns(vec![regular_field])
                .unwrap_err()
                .to_string(),
            "Parquet error: Field 'regular_column' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'"
        );
    }

    #[test]
    fn test_row_numbers_with_multiple_row_groups() {
        test_row_numbers_with_multiple_row_groups_helper(
            false,
            |path, selection, _row_filter, batch_size| {
                let file = File::open(path).unwrap();
                let row_number_field = Arc::new(
                    Field::new("row_number", ArrowDataType::Int64, false)
                        .with_extension_type(RowNumber),
                );
                let options = ArrowReaderOptions::new()
                    .with_virtual_columns(vec![row_number_field])
                    .unwrap();
                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                    .unwrap()
                    .with_row_selection(selection)
                    .with_batch_size(batch_size)
                    .build()
                    .expect("Could not create reader");
                reader
                    .collect::<Result<Vec<_>, _>>()
                    .expect("Could not read")
            },
        );
    }

    #[test]
    fn test_row_numbers_with_multiple_row_groups_and_filter() {
        test_row_numbers_with_multiple_row_groups_helper(
            true,
            |path, selection, row_filter, batch_size| {
                let file = File::open(path).unwrap();
                let row_number_field = Arc::new(
                    Field::new("row_number", ArrowDataType::Int64, false)
                        .with_extension_type(RowNumber),
                );
                let options = ArrowReaderOptions::new()
                    .with_virtual_columns(vec![row_number_field])
                    .unwrap();
                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                    .unwrap()
                    .with_row_selection(selection)
                    .with_batch_size(batch_size)
                    .with_row_filter(row_filter.expect("No filter"))
                    .build()
                    .expect("Could not create reader");
                reader
                    .collect::<Result<Vec<_>, _>>()
                    .expect("Could not read")
            },
        );
    }

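    // The `row_group_index` virtual column reports, for every row, the index of
    // the row group it was read from.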
    #[test]
    fn test_read_row_group_indices() {
        let array1 = Int64Array::from(vec![1, 2]);
        let array2 = Int64Array::from(vec![3, 4]);
        let array3 = Int64Array::from(vec![5, 6]);

        let batch1 =
            RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
        let batch2 =
            RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
        let batch3 =
            RecordBatch::try_from_iter(vec![("value", Arc::new(array3) as ArrayRef)]).unwrap();

        let mut buffer = Vec::new();
        let options = WriterProperties::builder()
            .set_max_row_group_size(2)
            .build();
        let mut writer =
            ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
        writer.write(&batch1).unwrap();
        writer.write(&batch2).unwrap();
        writer.write(&batch3).unwrap();
        writer.close().unwrap();

        let file = Bytes::from(buffer);
        let row_group_index_field = Arc::new(
            Field::new("row_group_index", ArrowDataType::Int64, false)
                .with_extension_type(RowGroupIndex),
        );

        let options = ArrowReaderOptions::new()
            .with_virtual_columns(vec![row_group_index_field.clone()])
            .unwrap();
        let mut arrow_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options)
                .expect("reader builder with virtual columns")
                .build()
                .expect("reader with virtual columns");

        let batch = arrow_reader.next().unwrap().unwrap();

        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 6);

        assert_eq!(
            batch
                .column(0)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(1), Some(2), Some(3), Some(4), Some(5), Some(6)]
        );

        assert_eq!(
            batch
                .column(1)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(0), Some(0), Some(1), Some(1), Some(2), Some(2)]
        );
    }

    #[test]
    fn test_read_only_row_group_indices() {
        let array1 = Int64Array::from(vec![1, 2, 3]);
        let array2 = Int64Array::from(vec![4, 5]);

        let batch1 =
            RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
        let batch2 =
            RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();

        let mut buffer = Vec::new();
        let options = WriterProperties::builder()
            .set_max_row_group_size(3)
            .build();
        let mut writer =
            ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
        writer.write(&batch1).unwrap();
        writer.write(&batch2).unwrap();
        writer.close().unwrap();

        let file = Bytes::from(buffer);
        let row_group_index_field = Arc::new(
            Field::new("row_group_index", ArrowDataType::Int64, false)
                .with_extension_type(RowGroupIndex),
        );

        let options = ArrowReaderOptions::new()
            .with_virtual_columns(vec![row_group_index_field.clone()])
            .unwrap();
        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
        let num_columns = metadata
            .metadata
            .file_metadata()
            .schema_descr()
            .num_columns();

        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
            .with_projection(ProjectionMask::none(num_columns))
            .build()
            .expect("reader with virtual columns only");

        let batch = arrow_reader.next().unwrap().unwrap();
        let schema = Arc::new(Schema::new(vec![(*row_group_index_field).clone()]));

        assert_eq!(batch.schema(), schema);
        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.num_rows(), 5);

        assert_eq!(
            batch
                .column(0)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(0), Some(0), Some(0), Some(1), Some(1)]
        );
    }

    #[test]
    fn test_read_row_group_indices_with_selection() -> Result<()> {
        let mut buffer = Vec::new();
        let options = WriterProperties::builder()
            .set_max_row_group_size(10)
            .build();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            ArrowDataType::Int64,
            false,
        )]));

        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(options))?;

        // three row groups of 10 rows: values 0..10, 10..20, 20..30
        for i in 0..3 {
            let start = i * 10;
            let array = Int64Array::from_iter_values(start..start + 10);
            let batch = RecordBatch::try_from_iter(vec![("value", Arc::new(array) as ArrayRef)])?;
            writer.write(&batch)?;
        }
        writer.close()?;

        let file = Bytes::from(buffer);
        let row_group_index_field = Arc::new(
            Field::new("rg_idx", ArrowDataType::Int64, false).with_extension_type(RowGroupIndex),
        );

        let options =
            ArrowReaderOptions::new().with_virtual_columns(vec![row_group_index_field])?;

        // read the row groups in reverse order
        let arrow_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options.clone())?
                .with_row_groups(vec![2, 1, 0])
                .build()?;

        let batches: Vec<_> = arrow_reader.collect::<Result<Vec<_>, _>>()?;
        let combined = concat_batches(&batches[0].schema(), &batches)?;

        let values = combined.column(0).as_primitive::<types::Int64Type>();
        let first_val = values.value(0);
        let last_val = values.value(combined.num_rows() - 1);
        // row group 2 is read first...
        assert_eq!(first_val, 20);
        // ...and row group 0 last
        assert_eq!(last_val, 9);

        let rg_indices = combined.column(1).as_primitive::<types::Int64Type>();
        assert_eq!(rg_indices.value(0), 2);
        assert_eq!(rg_indices.value(10), 1);
        assert_eq!(rg_indices.value(20), 0);

        Ok(())
    }

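    // Randomized end-to-end check shared by the row-number tests above: the
    // file is generated so that each value equals its row number + 1, a random
    // selection (and optionally a random ~99%-pass filter) is applied, and the
    // output must preserve that invariant.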
    pub(crate) fn test_row_numbers_with_multiple_row_groups_helper<F>(
        use_filter: bool,
        test_case: F,
    ) where
        F: FnOnce(PathBuf, RowSelection, Option<RowFilter>, usize) -> Vec<RecordBatch>,
    {
        let seed: u64 = random();
        println!("test_row_numbers_with_multiple_row_groups seed: {seed}");
        let mut rng = StdRng::seed_from_u64(seed);

        use tempfile::TempDir;
        let tempdir = TempDir::new().expect("Could not create temp dir");

        let (bytes, metadata) = generate_file_with_row_numbers(&mut rng);

        let path = tempdir.path().join("test.parquet");
        std::fs::write(&path, bytes).expect("Could not write file");

        let mut case = vec![];
        let mut remaining = metadata.file_metadata().num_rows();
        while remaining > 0 {
            let row_count = rng.random_range(1..=remaining);
            remaining -= row_count;
            case.push(RowSelector {
                row_count: row_count as usize,
                skip: rng.random_bool(0.5),
            });
        }

        let filter = use_filter.then(|| {
            let filter = (0..metadata.file_metadata().num_rows())
                .map(|_| rng.random_bool(0.99))
                .collect::<Vec<_>>();
            let mut filter_offset = 0;
            RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
                ProjectionMask::all(),
                move |b| {
                    let array = BooleanArray::from_iter(
                        filter
                            .iter()
                            .skip(filter_offset)
                            .take(b.num_rows())
                            .map(|x| Some(*x)),
                    );
                    filter_offset += b.num_rows();
                    Ok(array)
                },
            ))])
        });

        let selection = RowSelection::from(case);
        let batches = test_case(path, selection.clone(), filter, rng.random_range(1..4096));

        if selection.skipped_row_count() == metadata.file_metadata().num_rows() as usize {
            assert!(batches.into_iter().all(|batch| batch.num_rows() == 0));
            return;
        }
        let actual = concat_batches(batches.first().expect("No batches").schema_ref(), &batches)
            .expect("Failed to concatenate");
        // column 0 holds the values, column 1 the row numbers; values were
        // written as row number + 1
        let values = actual
            .column(0)
            .as_primitive::<types::Int64Type>()
            .iter()
            .collect::<Vec<_>>();
        let row_numbers = actual
            .column(1)
            .as_primitive::<types::Int64Type>()
            .iter()
            .collect::<Vec<_>>();
        assert_eq!(
            row_numbers
                .into_iter()
                .map(|number| number.map(|number| number + 1))
                .collect::<Vec<_>>(),
            values
        );
    }

    fn generate_file_with_row_numbers(rng: &mut impl Rng) -> (Bytes, ParquetMetaData) {
        let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
            "value",
            ArrowDataType::Int64,
            false,
        )])));

        let mut buf = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new(&mut buf, schema.clone(), None).expect("Could not create writer");

        let mut values = 1..=rng.random_range(1..4096);
        while !values.is_empty() {
            let batch_values = values
                .by_ref()
                .take(rng.random_range(1..4096))
                .collect::<Vec<_>>();
            let array = Arc::new(Int64Array::from(batch_values)) as ArrayRef;
            let batch =
                RecordBatch::try_from_iter([("value", array)]).expect("Could not create batch");
            writer.write(&batch).expect("Could not write batch");
            writer.flush().expect("Could not flush");
        }
        let metadata = writer.close().expect("Could not close writer");

        (Bytes::from(buf), metadata)
    }
}