use arrow_array::cast::AsArray;
use arrow_array::{Array, RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, FieldRef, Schema, SchemaRef};
use arrow_select::filter::filter_record_batch;
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

pub use crate::arrow::array_reader::RowGroups;
use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
use crate::arrow::schema::{
    ParquetField, parquet_to_arrow_schema_and_fields, virtual_type::is_virtual_column,
};
use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels_with_virtual};
use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
use crate::bloom_filter::{
    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
};
use crate::column::page::{PageIterator, PageReader};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{
    PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
    ParquetStatisticsPolicy, RowGroupMetaData,
};
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;

use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
pub use read_plan::{ReadPlan, ReadPlanBuilder};

mod filter;
pub mod metrics;
mod read_plan;
pub(crate) mod selection;
pub mod statistics;

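/// Builder for constructing Arrow readers over Parquet files.
///
/// This type is generic over its input and is normally used through the
/// [`ParquetRecordBatchReaderBuilder`] alias defined below. A minimal usage
/// sketch (illustrative only; assumes `file` implements [`ChunkReader`], e.g.
/// a `std::fs::File` or `bytes::Bytes`):
///
/// ```ignore
/// let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
///     .with_batch_size(8192)
///     .build()?;
/// for batch in reader {
///     // each item is a Result<RecordBatch, ArrowError>
///     let batch = batch?;
/// }
/// ```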
pub struct ArrowReaderBuilder<T> {
    pub(crate) input: T,

    pub(crate) metadata: Arc<ParquetMetaData>,

    pub(crate) schema: SchemaRef,

    pub(crate) fields: Option<Arc<ParquetField>>,

    pub(crate) batch_size: usize,

    pub(crate) row_groups: Option<Vec<usize>>,

    pub(crate) projection: ProjectionMask,

    pub(crate) filter: Option<RowFilter>,

    pub(crate) selection: Option<RowSelection>,

    pub(crate) row_selection_policy: RowSelectionPolicy,

    pub(crate) limit: Option<usize>,

    pub(crate) offset: Option<usize>,

    pub(crate) metrics: ArrowReaderMetrics,

    pub(crate) max_predicate_cache_size: usize,
}

impl<T: Debug> Debug for ArrowReaderBuilder<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowReaderBuilder<T>")
            .field("input", &self.input)
            .field("metadata", &self.metadata)
            .field("schema", &self.schema)
            .field("fields", &self.fields)
            .field("batch_size", &self.batch_size)
            .field("row_groups", &self.row_groups)
            .field("projection", &self.projection)
            .field("filter", &self.filter)
            .field("selection", &self.selection)
            .field("row_selection_policy", &self.row_selection_policy)
            .field("limit", &self.limit)
            .field("offset", &self.offset)
            .field("metrics", &self.metrics)
            .finish()
    }
}

impl<T> ArrowReaderBuilder<T> {
    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self {
            input,
            metadata: metadata.metadata,
            schema: metadata.schema,
            fields: metadata.fields,
            batch_size: 1024,
            row_groups: None,
            projection: ProjectionMask::all(),
            filter: None,
            selection: None,
            row_selection_policy: RowSelectionPolicy::default(),
            limit: None,
            offset: None,
            metrics: ArrowReaderMetrics::Disabled,
            max_predicate_cache_size: 100 * 1024 * 1024,
        }
    }

    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    pub fn with_batch_size(self, batch_size: usize) -> Self {
        // The batch size can never be larger than the number of rows in the file
        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
        Self { batch_size, ..self }
    }

    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
        Self {
            row_groups: Some(row_groups),
            ..self
        }
    }

    pub fn with_projection(self, mask: ProjectionMask) -> Self {
        Self {
            projection: mask,
            ..self
        }
    }

    pub fn with_row_selection_policy(self, policy: RowSelectionPolicy) -> Self {
        Self {
            row_selection_policy: policy,
            ..self
        }
    }

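    /// Scan only the rows specified by the provided [`RowSelection`], which
    /// describes which rows to read and which to skip across the selected row
    /// groups. A minimal sketch (illustrative; a `RowSelection` can be built
    /// from a sequence of [`RowSelector`]s):
    ///
    /// ```ignore
    /// // read rows 0..100, skip rows 100..200, read rows 200..250
    /// let selection = RowSelection::from(vec![
    ///     RowSelector::select(100),
    ///     RowSelector::skip(100),
    ///     RowSelector::select(50),
    /// ]);
    /// let builder = builder.with_row_selection(selection);
    /// ```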
    pub fn with_row_selection(self, selection: RowSelection) -> Self {
        Self {
            selection: Some(selection),
            ..self
        }
    }

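    /// Apply a [`RowFilter`] whose predicates are evaluated while scanning, so
    /// that only rows passing every predicate are decoded for the final
    /// projection. A minimal sketch (illustrative; assumes leaf column 0 is an
    /// Int32 column and uses the `arrow_ord` comparison kernels):
    ///
    /// ```ignore
    /// use arrow_array::{Int32Array, cast::AsArray, types::Int32Type};
    ///
    /// let predicate_mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    /// let predicate = ArrowPredicateFn::new(predicate_mask, |batch| {
    ///     // keep only rows where the column is greater than zero
    ///     arrow_ord::cmp::gt(
    ///         batch.column(0).as_primitive::<Int32Type>(),
    ///         &Int32Array::new_scalar(0),
    ///     )
    /// });
    /// let builder = builder.with_row_filter(RowFilter::new(vec![Box::new(predicate)]));
    /// ```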
    pub fn with_row_filter(self, filter: RowFilter) -> Self {
        Self {
            filter: Some(filter),
            ..self
        }
    }

    pub fn with_limit(self, limit: usize) -> Self {
        Self {
            limit: Some(limit),
            ..self
        }
    }

    pub fn with_offset(self, offset: usize) -> Self {
        Self {
            offset: Some(offset),
            ..self
        }
    }

    pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
        Self { metrics, ..self }
    }

    pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
        Self {
            max_predicate_cache_size,
            ..self
        }
    }
}

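/// Options that control how Parquet metadata is read and how the Arrow schema
/// is derived when building a reader. A minimal sketch (illustrative):
///
/// ```ignore
/// let options = ArrowReaderOptions::new()
///     // read the page indexes, if present, to enable finer-grained pushdown
///     .with_page_index_policy(PageIndexPolicy::Optional);
/// let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?;
/// ```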
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    skip_arrow_metadata: bool,
    supplied_schema: Option<SchemaRef>,

    pub(crate) column_index: PageIndexPolicy,
    pub(crate) offset_index: PageIndexPolicy,

    metadata_options: ParquetMetaDataOptions,
    #[cfg(feature = "encryption")]
    pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,

    virtual_columns: Vec<FieldRef>,
}

impl ArrowReaderOptions {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

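    /// Provide the Arrow schema to use when decoding, overriding the schema
    /// that would otherwise be inferred from the file (this also sets
    /// `skip_arrow_metadata`). The supplied schema must be compatible with the
    /// file's Parquet schema, otherwise building the reader will fail. A
    /// minimal sketch (illustrative; assumes the file's column "a" can be
    /// decoded as a nullable Int64):
    ///
    /// ```ignore
    /// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
    /// let options = ArrowReaderOptions::new().with_schema(schema);
    /// ```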
    pub fn with_schema(self, schema: SchemaRef) -> Self {
        Self {
            supplied_schema: Some(schema),
            skip_arrow_metadata: true,
            ..self
        }
    }

    #[deprecated(since = "57.2.0", note = "Use `with_page_index_policy` instead")]
    pub fn with_page_index(self, page_index: bool) -> Self {
        self.with_page_index_policy(PageIndexPolicy::from(page_index))
    }

    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
        self.with_column_index_policy(policy)
            .with_offset_index_policy(policy)
    }

    pub fn with_column_index_policy(mut self, policy: PageIndexPolicy) -> Self {
        self.column_index = policy;
        self
    }

    pub fn with_offset_index_policy(mut self, policy: PageIndexPolicy) -> Self {
        self.offset_index = policy;
        self
    }

    pub fn with_parquet_schema(mut self, schema: Arc<SchemaDescriptor>) -> Self {
        self.metadata_options.set_schema(schema);
        self
    }

    pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
        self.metadata_options.set_encoding_stats_as_mask(val);
        self
    }

    pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
        self.metadata_options.set_encoding_stats_policy(policy);
        self
    }

    pub fn with_column_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
        self.metadata_options.set_column_stats_policy(policy);
        self
    }

    pub fn with_size_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
        self.metadata_options.set_size_stats_policy(policy);
        self
    }

    #[cfg(feature = "encryption")]
    pub fn with_file_decryption_properties(
        self,
        file_decryption_properties: Arc<FileDecryptionProperties>,
    ) -> Self {
        Self {
            file_decryption_properties: Some(file_decryption_properties),
            ..self
        }
    }

    pub fn with_virtual_columns(self, virtual_columns: Vec<FieldRef>) -> Result<Self> {
        for field in &virtual_columns {
            if !is_virtual_column(field) {
                return Err(ParquetError::General(format!(
                    "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
                    field.name()
                )));
            }
        }
        Ok(Self {
            virtual_columns,
            ..self
        })
    }

    #[deprecated(
        since = "57.2.0",
        note = "Use `column_index_policy` or `offset_index_policy` instead"
    )]
    pub fn page_index(&self) -> bool {
        self.offset_index != PageIndexPolicy::Skip && self.column_index != PageIndexPolicy::Skip
    }

    pub fn offset_index_policy(&self) -> PageIndexPolicy {
        self.offset_index
    }

    pub fn column_index_policy(&self) -> PageIndexPolicy {
        self.column_index
    }

    pub fn metadata_options(&self) -> &ParquetMetaDataOptions {
        &self.metadata_options
    }

    #[cfg(feature = "encryption")]
    pub fn file_decryption_properties(&self) -> Option<&Arc<FileDecryptionProperties>> {
        self.file_decryption_properties.as_ref()
    }
}

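/// The metadata necessary to decode a Parquet file into Arrow, decoupled from
/// any particular input source. Loading it once allows the (potentially
/// expensive) metadata parse to be shared across several readers. A minimal
/// sketch (illustrative; assumes `file` implements [`ChunkReader`] and is
/// cheap to clone):
///
/// ```ignore
/// let metadata = ArrowReaderMetadata::load(&file, ArrowReaderOptions::new())?;
/// let reader =
///     ParquetRecordBatchReaderBuilder::new_with_metadata(file.clone(), metadata.clone())
///         .build()?;
/// ```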
#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    pub(crate) metadata: Arc<ParquetMetaData>,
    pub(crate) schema: SchemaRef,
    pub(crate) fields: Option<Arc<ParquetField>>,
}

impl ArrowReaderMetadata {
    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ParquetMetaDataReader::new()
            .with_column_index_policy(options.column_index)
            .with_offset_index_policy(options.offset_index)
            .with_metadata_options(Some(options.metadata_options.clone()));
        #[cfg(feature = "encryption")]
        let metadata = metadata.with_decryption_properties(
            options.file_decryption_properties.as_ref().map(Arc::clone),
        );
        let metadata = metadata.parse_and_finish(reader)?;
        Self::try_new(Arc::new(metadata), options)
    }

    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
        match options.supplied_schema {
            Some(supplied_schema) => {
                Self::with_supplied_schema(metadata, supplied_schema, &options.virtual_columns)
            }
            None => {
                let kv_metadata = match options.skip_arrow_metadata {
                    true => None,
                    false => metadata.file_metadata().key_value_metadata(),
                };

                let (schema, fields) = parquet_to_arrow_schema_and_fields(
                    metadata.file_metadata().schema_descr(),
                    ProjectionMask::all(),
                    kv_metadata,
                    &options.virtual_columns,
                )?;

                Ok(Self {
                    metadata,
                    schema: Arc::new(schema),
                    fields: fields.map(Arc::new),
                })
            }
        }
    }

    fn with_supplied_schema(
        metadata: Arc<ParquetMetaData>,
        supplied_schema: SchemaRef,
        virtual_columns: &[FieldRef],
    ) -> Result<Self> {
        let parquet_schema = metadata.file_metadata().schema_descr();
        let field_levels = parquet_to_arrow_field_levels_with_virtual(
            parquet_schema,
            ProjectionMask::all(),
            Some(supplied_schema.fields()),
            virtual_columns,
        )?;
        let fields = field_levels.fields;
        let inferred_len = fields.len();
        let supplied_len = supplied_schema.fields().len() + virtual_columns.len();
        if inferred_len != supplied_len {
            return Err(arrow_err!(format!(
                "Incompatible supplied Arrow schema: expected {} columns received {}",
                inferred_len, supplied_len
            )));
        }

        let mut errors = Vec::new();

        let field_iter = supplied_schema.fields().iter().zip(fields.iter());

        for (field1, field2) in field_iter {
            if field1.data_type() != field2.data_type() {
                errors.push(format!(
                    "data type mismatch for field {}: requested {} but found {}",
                    field1.name(),
                    field1.data_type(),
                    field2.data_type()
                ));
            }
            if field1.is_nullable() != field2.is_nullable() {
                errors.push(format!(
                    "nullability mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.is_nullable(),
                    field2.is_nullable()
                ));
            }
            if field1.metadata() != field2.metadata() {
                errors.push(format!(
                    "metadata mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.metadata(),
                    field2.metadata()
                ));
            }
        }

        if !errors.is_empty() {
            let message = errors.join(", ");
            return Err(ParquetError::ArrowError(format!(
                "Incompatible supplied Arrow schema: {message}",
            )));
        }

        Ok(Self {
            metadata,
            schema: supplied_schema,
            fields: field_levels.levels.map(Arc::new),
        })
    }

    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}

#[doc(hidden)]
pub struct SyncReader<T: ChunkReader>(T);

impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("SyncReader").field(&self.0).finish()
    }
}

pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;

impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
    pub fn try_new(reader: T) -> Result<Self> {
        Self::try_new_with_options(reader, Default::default())
    }

    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load(&reader, options)?;
        Ok(Self::new_with_metadata(reader, metadata))
    }

    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(SyncReader(input), metadata)
    }

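    /// Read the split block bloom filter ([`Sbbf`]) for the given column chunk,
    /// returning `Ok(None)` if the column chunk has no bloom filter. A minimal
    /// sketch (illustrative; assumes row group 0, column 0 carries a filter):
    ///
    /// ```ignore
    /// if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0)? {
    ///     // bloom filters can only prove absence: `true` may be a false positive
    ///     let maybe_present = sbbf.check(&"some_value");
    /// }
    /// ```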
    pub fn get_row_group_column_bloom_filter(
        &self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> Result<Option<Sbbf>> {
        let metadata = self.metadata.row_group(row_group_idx);
        let column_metadata = metadata.column(column_idx);

        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => self.input.0.get_bytes(offset, length as usize),
            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
        }?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        match header.algorithm {
            BloomFilterAlgorithm::BLOCK => {
                // split block algorithm is the only one defined by the format
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED => {
                // no compression is defined for bloom filters
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH => {
                // xxHash is the only hash defined by the format
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            Some(_) => buffer.slice(
                (TryInto::<usize>::try_into(bitset_offset).unwrap()
                    - TryInto::<usize>::try_into(offset).unwrap())..,
            ),
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                self.input.0.get_bytes(bitset_offset, bitset_length)?
            }
        };
        Ok(Some(Sbbf::new(&bitset)))
    }

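    /// Consume the builder and construct a [`ParquetRecordBatchReader`].
    ///
    /// Any row filter predicates are evaluated here, before the reader is
    /// returned, to produce the final row selection; the configured offset and
    /// limit are then applied to the rows that survive that selection.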
    pub fn build(self) -> Result<ParquetRecordBatchReader> {
        let Self {
            input,
            metadata,
            schema: _,
            fields,
            batch_size,
            row_groups,
            projection,
            mut filter,
            selection,
            row_selection_policy,
            limit,
            offset,
            metrics,
            // predicate caching is not used by the sync reader
            max_predicate_cache_size: _,
        } = self;

        // Clamp the batch size to the row count to avoid over-allocating buffers
        let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize);

        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());

        let reader = ReaderRowGroups {
            reader: Arc::new(input.0),
            metadata,
            row_groups,
        };

        let mut plan_builder = ReadPlanBuilder::new(batch_size)
            .with_selection(selection)
            .with_row_selection_policy(row_selection_policy);

        // Refine the row selection by evaluating each predicate in turn
        if let Some(filter) = filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
                // stop early once all rows have been ruled out
                if !plan_builder.selects_any() {
                    break;
                }

                let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
                    .with_parquet_metadata(&reader.metadata)
                    .build_array_reader(fields.as_deref(), predicate.projection())?;

                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
            }
        }

        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
            .with_parquet_metadata(&reader.metadata)
            .build_array_reader(fields.as_deref(), &projection)?;

        let read_plan = plan_builder
            .limited(reader.num_rows())
            .with_offset(offset)
            .with_limit(limit)
            .build_limited()
            .build();

        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
    }
}

struct ReaderRowGroups<T: ChunkReader> {
    reader: Arc<T>,

    metadata: Arc<ParquetMetaData>,
    row_groups: Vec<usize>,
}

impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
    fn num_rows(&self) -> usize {
        let meta = self.metadata.row_groups();
        self.row_groups
            .iter()
            .map(|x| meta[*x].num_rows() as usize)
            .sum()
    }

    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        Ok(Box::new(ReaderPageIterator {
            column_idx: i,
            reader: self.reader.clone(),
            metadata: self.metadata.clone(),
            row_groups: self.row_groups.clone().into_iter(),
        }))
    }

    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
        Box::new(
            self.row_groups
                .iter()
                .map(move |i| self.metadata.row_group(*i)),
        )
    }

    fn metadata(&self) -> &ParquetMetaData {
        self.metadata.as_ref()
    }
}

struct ReaderPageIterator<T: ChunkReader> {
    reader: Arc<T>,
    column_idx: usize,
    row_groups: std::vec::IntoIter<usize>,
    metadata: Arc<ParquetMetaData>,
}

impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
    /// Return a page reader for the column chunk in the given row group
    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
        let rg = self.metadata.row_group(rg_idx);
        let column_chunk_metadata = rg.column(self.column_idx);
        let offset_index = self.metadata.offset_index();
        // ignore empty offset indexes, which some writers emit when no index is present
        let page_locations = offset_index
            .filter(|i| !i[rg_idx].is_empty())
            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
        let total_rows = rg.num_rows() as usize;
        let reader = self.reader.clone();

        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
            .add_crypto_context(
                rg_idx,
                self.column_idx,
                self.metadata.as_ref(),
                column_chunk_metadata,
            )
    }
}

impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        let rg_idx = self.row_groups.next()?;
        let page_reader = self
            .next_page_reader(rg_idx)
            .map(|page_reader| Box::new(page_reader) as _);
        Some(page_reader)
    }
}

impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}

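/// An `Iterator<Item = Result<RecordBatch, ArrowError>>` that yields
/// [`RecordBatch`]es decoded from a Parquet file, driven by a [`ReadPlan`]
/// describing which rows to read, skip, or filter.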
pub struct ParquetRecordBatchReader {
    array_reader: Box<dyn ArrayReader>,
    schema: SchemaRef,
    read_plan: ReadPlan,
}

impl Debug for ParquetRecordBatchReader {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ParquetRecordBatchReader")
            .field("array_reader", &"...")
            .field("schema", &self.schema)
            .field("read_plan", &self.read_plan)
            .finish()
    }
}

impl Iterator for ParquetRecordBatchReader {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_inner()
            .map_err(|arrow_err| arrow_err.into())
            .transpose()
    }
}

impl ParquetRecordBatchReader {
    /// Reads the next batch of rows, returning `Ok(None)` once all rows are exhausted
    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
        let mut read_records = 0;
        let batch_size = self.batch_size();
        if batch_size == 0 {
            return Ok(None);
        }
        match self.read_plan.row_selection_cursor_mut() {
            // Selection encoded as a boolean mask: decode contiguous chunks and
            // then apply the mask as a filter
            RowSelectionCursor::Mask(mask_cursor) => {
                while !mask_cursor.is_empty() {
                    let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else {
                        return Ok(None);
                    };

                    // Skip any leading rows that are filtered out entirely
                    if mask_chunk.initial_skip > 0 {
                        let skipped = self.array_reader.skip_records(mask_chunk.initial_skip)?;
                        if skipped != mask_chunk.initial_skip {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                mask_chunk.initial_skip,
                                skipped
                            ));
                        }
                    }

                    if mask_chunk.chunk_rows == 0 {
                        if mask_cursor.is_empty() && mask_chunk.selected_rows == 0 {
                            return Ok(None);
                        }
                        continue;
                    }

                    let mask = mask_cursor.mask_values_for(&mask_chunk)?;

                    let read = self.array_reader.read_records(mask_chunk.chunk_rows)?;
                    if read == 0 {
                        return Err(general_err!(
                            "reached end of column while expecting {} rows",
                            mask_chunk.chunk_rows
                        ));
                    }
                    if read != mask_chunk.chunk_rows {
                        return Err(general_err!(
                            "insufficient rows read from array reader - expected {}, got {}",
                            mask_chunk.chunk_rows,
                            read
                        ));
                    }

                    let array = self.array_reader.consume_batch()?;
                    let struct_array = array.as_struct_opt().ok_or_else(|| {
                        ArrowError::ParquetError(
                            "Struct array reader should return struct array".to_string(),
                        )
                    })?;

                    let filtered_batch =
                        filter_record_batch(&RecordBatch::from(struct_array), &mask)?;

                    if filtered_batch.num_rows() != mask_chunk.selected_rows {
                        return Err(general_err!(
                            "filtered rows mismatch selection - expected {}, got {}",
                            mask_chunk.selected_rows,
                            filtered_batch.num_rows()
                        ));
                    }

                    if filtered_batch.num_rows() == 0 {
                        continue;
                    }

                    return Ok(Some(filtered_batch));
                }
            }
            // Selection encoded as alternating skip/select runs: skip or read
            // records directly until the batch is full
            RowSelectionCursor::Selectors(selectors_cursor) => {
                while read_records < batch_size && !selectors_cursor.is_empty() {
                    let front = selectors_cursor.next_selector();
                    if front.skip {
                        let skipped = self.array_reader.skip_records(front.row_count)?;

                        if skipped != front.row_count {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                front.row_count,
                                skipped
                            ));
                        }
                        continue;
                    }

                    if front.row_count == 0 {
                        continue;
                    }

                    let need_read = batch_size - read_records;
                    let to_read = match front.row_count.checked_sub(need_read) {
                        Some(remaining) if remaining != 0 => {
                            // The selector extends beyond this batch: read only what is
                            // needed and return the remainder to the cursor
                            selectors_cursor.return_selector(RowSelector::select(remaining));
                            need_read
                        }
                        _ => front.row_count,
                    };
                    match self.array_reader.read_records(to_read)? {
                        0 => break,
                        rec => read_records += rec,
                    };
                }
            }
            // No selection: read the next `batch_size` rows directly
            RowSelectionCursor::All => {
                self.array_reader.read_records(batch_size)?;
            }
        };

        let array = self.array_reader.consume_batch()?;
        let struct_array = array.as_struct_opt().ok_or_else(|| {
            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
        })?;

        Ok(if struct_array.len() > 0 {
            Some(RecordBatch::from(struct_array))
        } else {
            None
        })
    }
}

impl RecordBatchReader for ParquetRecordBatchReader {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl ParquetRecordBatchReader {
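    /// Create a new [`ParquetRecordBatchReader`] from the provided input with
    /// the default options and the given batch size. A minimal sketch
    /// (illustrative; assumes `file` implements [`ChunkReader`]):
    ///
    /// ```ignore
    /// let mut reader = ParquetRecordBatchReader::try_new(file, 1024)?;
    /// let first_batch = reader.next().transpose()?;
    /// ```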
    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
        ParquetRecordBatchReaderBuilder::try_new(reader)?
            .with_batch_size(batch_size)
            .build()
    }

    pub fn try_new_with_row_groups(
        levels: &FieldLevels,
        row_groups: &dyn RowGroups,
        batch_size: usize,
        selection: Option<RowSelection>,
    ) -> Result<Self> {
        let metrics = ArrowReaderMetrics::disabled();
        let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
            .with_parquet_metadata(row_groups.metadata())
            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;

        let read_plan = ReadPlanBuilder::new(batch_size)
            .with_selection(selection)
            .build();

        Ok(Self {
            array_reader,
            schema: Arc::new(Schema::new(levels.fields.clone())),
            read_plan,
        })
    }

    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
        let schema = match array_reader.get_data_type() {
            ArrowType::Struct(fields) => Schema::new(fields.clone()),
            _ => unreachable!("Struct array reader's data type is not struct!"),
        };

        Self {
            array_reader,
            schema: Arc::new(schema),
            read_plan,
        }
    }

    #[inline(always)]
    pub(crate) fn batch_size(&self) -> usize {
        self.read_plan.batch_size()
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use std::cmp::min;
    use std::collections::{HashMap, VecDeque};
    use std::fmt::Formatter;
    use std::fs::File;
    use std::io::Seek;
    use std::path::PathBuf;
    use std::sync::Arc;

    use rand::rngs::StdRng;
    use rand::{Rng, RngCore, SeedableRng, random, rng};
    use tempfile::tempfile;

    use crate::arrow::arrow_reader::{
        ArrowPredicateFn, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
    };
    use crate::arrow::schema::{
        add_encoded_arrow_schema_to_metadata,
        virtual_type::{RowGroupIndex, RowNumber},
    };
    use crate::arrow::{ArrowWriter, ProjectionMask};
    use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType};
    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
    use crate::data_type::{
        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
        FloatType, Int32Type, Int64Type, Int96, Int96Type,
    };
    use crate::errors::Result;
    use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetStatisticsPolicy};
    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
    use crate::file::writer::SerializedFileWriter;
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::{Type, TypePtr};
    use crate::util::test_common::rand_gen::RandGen;
    use arrow_array::builder::*;
    use arrow_array::cast::AsArray;
    use arrow_array::types::{
        Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
        DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
        Time64MicrosecondType,
    };
    use arrow_array::*;
    use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256};
    use arrow_data::{ArrayData, ArrayDataBuilder};
    use arrow_schema::{DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit};
    use arrow_select::concat::concat_batches;
    use bytes::Bytes;
    use half::f16;
    use num_traits::PrimInt;

    #[test]
    fn test_arrow_reader_all_columns() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());
        let reader = builder.build().unwrap();

        assert_eq!(original_schema.fields(), reader.schema().fields());
    }

    #[test]
    fn test_reuse_schema() {
        let file = get_test_file("parquet/alltypes-java.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
        let expected = builder.metadata;
        let schema = expected.file_metadata().schema_descr_ptr();

        let arrow_options = ArrowReaderOptions::new().with_parquet_schema(schema.clone());
        let builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();

        assert_eq!(expected.as_ref(), builder.metadata.as_ref());
    }

    #[test]
    fn test_page_encoding_stats_mask() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let file = File::open(path).unwrap();

        let arrow_options = ArrowReaderOptions::new().with_encoding_stats_as_mask(true);
        let builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();

        let row_group_metadata = builder.metadata.row_group(0);

        let page_encoding_stats = row_group_metadata
            .column(0)
            .page_encoding_stats_mask()
            .unwrap();
        assert!(page_encoding_stats.is_only(Encoding::PLAIN));
        let page_encoding_stats = row_group_metadata
            .column(2)
            .page_encoding_stats_mask()
            .unwrap();
        assert!(page_encoding_stats.is_only(Encoding::PLAIN_DICTIONARY));
    }

    #[test]
    fn test_stats_stats_skipped() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let file = File::open(path).unwrap();

        let arrow_options = ArrowReaderOptions::new()
            .with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll)
            .with_column_stats_policy(ParquetStatisticsPolicy::SkipAll);
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for column in row_group_metadata.columns() {
            assert!(column.page_encoding_stats().is_none());
            assert!(column.page_encoding_stats_mask().is_none());
            assert!(column.statistics().is_none());
        }

        let arrow_options = ArrowReaderOptions::new()
            .with_encoding_stats_as_mask(true)
            .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
            .with_column_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]));
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
            assert!(column.page_encoding_stats().is_none());
            assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
            assert_eq!(column.statistics().is_some(), idx == 0);
        }
    }

    #[test]
    fn test_size_stats_stats_skipped() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/repeated_primitive_no_list.parquet");
        let file = File::open(path).unwrap();

        let arrow_options =
            ArrowReaderOptions::new().with_size_stats_policy(ParquetStatisticsPolicy::SkipAll);
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for column in row_group_metadata.columns() {
            assert!(column.repetition_level_histogram().is_none());
            assert!(column.definition_level_histogram().is_none());
            assert!(column.unencoded_byte_array_data_bytes().is_none());
        }

        let arrow_options = ArrowReaderOptions::new()
            .with_encoding_stats_as_mask(true)
            .with_size_stats_policy(ParquetStatisticsPolicy::skip_except(&[1]));
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
            assert_eq!(column.repetition_level_histogram().is_some(), idx == 1);
            assert_eq!(column.definition_level_histogram().is_some(), idx == 1);
            assert_eq!(column.unencoded_byte_array_data_bytes().is_some(), idx == 1);
        }
    }

    #[test]
    fn test_arrow_reader_single_column() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
        let reader = builder.with_projection(mask).build().unwrap();

        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_arrow_reader_single_column_by_name() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
        let reader = builder.with_projection(mask).build().unwrap();

        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_null_column_reader_test() {
        let mut file = tempfile::tempfile().unwrap();

        let schema = "
            message message {
                OPTIONAL INT32 int32;
            }
        ";
        let schema = Arc::new(parse_message_type(schema).unwrap());

        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
        generate_single_column_file_with_data::<Int32Type>(
            &[vec![], vec![]],
            Some(&def_levels),
            file.try_clone().unwrap(),
            schema,
            Some(Field::new("int32", ArrowDataType::Null, true)),
            &Default::default(),
        )
        .unwrap();

        file.rewind().unwrap();

        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();

        assert_eq!(batches.len(), 4);
        for batch in &batches[0..3] {
            assert_eq!(batch.num_rows(), 2);
            assert_eq!(batch.num_columns(), 1);
            assert_eq!(batch.column(0).null_count(), 2);
        }

        assert_eq!(batches[3].num_rows(), 1);
        assert_eq!(batches[3].num_columns(), 1);
        assert_eq!(batches[3].column(0).null_count(), 1);
    }

    #[test]
    fn test_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<BoolType, _, BoolType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
        );
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<FloatType, _, FloatType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
        );
    }

    #[test]
    fn test_unsigned_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::UINT_32,
            Some(ArrowDataType::UInt32),
            |vals| {
                Arc::new(UInt32Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u32)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::UINT_64,
            Some(ArrowDataType::UInt64),
            |vals| {
                Arc::new(UInt64Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u64)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
    }

    #[test]
    fn test_unsigned_roundtrip() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("uint32", ArrowDataType::UInt32, true),
            Field::new("uint64", ArrowDataType::UInt64, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt32Array::from_iter_values([
                    0,
                    i32::MAX as u32,
                    u32::MAX,
                ])),
                Arc::new(UInt64Array::from_iter_values([
                    0,
                    i64::MAX as u64,
                    u64::MAX,
                ])),
            ],
        )
        .unwrap();

        writer.write(&original).unwrap();
        writer.close().unwrap();

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
        let ret = reader.next().unwrap().unwrap();
        assert_eq!(ret, original);

        ret.column(0)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();

        ret.column(1)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
    }

    #[test]
    fn test_float16_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("float16", ArrowDataType::Float16, false),
            Field::new("float16-nullable", ArrowDataType::Float16, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Float16Array::from_iter_values([
                    f16::EPSILON,
                    f16::MIN,
                    f16::MAX,
                    f16::NAN,
                    f16::INFINITY,
                    f16::NEG_INFINITY,
                    f16::ONE,
                    f16::NEG_ONE,
                    f16::ZERO,
                    f16::NEG_ZERO,
                    f16::E,
                    f16::PI,
                    f16::FRAC_1_PI,
                ])),
                Arc::new(Float16Array::from(vec![
                    None,
                    None,
                    None,
                    Some(f16::NAN),
                    Some(f16::INFINITY),
                    Some(f16::NEG_INFINITY),
                    None,
                    None,
                    None,
                    None,
                    None,
                    None,
                    Some(f16::FRAC_1_PI),
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Float16Type>();
        ret.column(1).as_primitive::<Float16Type>();

        Ok(())
    }

    #[test]
    fn test_time_utc_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "time_millis",
                ArrowDataType::Time32(TimeUnit::Millisecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
            Field::new(
                "time_micros",
                ArrowDataType::Time64(TimeUnit::Microsecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Time32MillisecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399_000),
                    Some(86_400_000),
                    Some(86_401_000),
                    None,
                ])),
                Arc::new(Time64MicrosecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399 * 1_000_000),
                    Some(86_400 * 1_000_000),
                    Some(86_401 * 1_000_000),
                    None,
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Time32MillisecondType>();
        ret.column(1).as_primitive::<Time64MicrosecondType>();

        Ok(())
    }

    #[test]
    fn test_date32_roundtrip() -> Result<()> {
        use arrow_array::Date32Array;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "date32",
            ArrowDataType::Date32,
            false,
        )]));

        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![Arc::new(Date32Array::from(vec![
                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
            ]))],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Date32Type>();

        Ok(())
    }

    #[test]
    fn test_date64_roundtrip() -> Result<()> {
        use arrow_array::Date64Array;

        let schema = Arc::new(Schema::new(vec![
            Field::new("small-date64", ArrowDataType::Date64, false),
            Field::new("big-date64", ArrowDataType::Date64, false),
            Field::new("invalid-date64", ArrowDataType::Date64, false),
        ]));

        let mut default_buf = Vec::with_capacity(1024);
        let mut coerce_buf = Vec::with_capacity(1024);

        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();

        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
        let mut coerce_writer =
            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;

        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;

        let original = RecordBatch::try_new(
            schema,
            vec![
                // whole-day values that round-trip losslessly
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000 * NUM_MILLISECONDS_IN_DAY,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                // day counts too large to survive coercion
                Arc::new(Date64Array::from(vec![
                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                // values that are not a whole number of days
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1,
                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                ])),
            ],
        )?;

        default_writer.write(&original)?;
        coerce_writer.write(&original)?;

        default_writer.close()?;
        coerce_writer.close()?;

        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;

        let default_ret = default_reader.next().unwrap()?;
        let coerce_ret = coerce_reader.next().unwrap()?;

        // without coercion, all values round-trip losslessly
        assert_eq!(default_ret, original);

        // with coercion, only the whole-day, in-range values survive
        assert_eq!(coerce_ret.column(0), original.column(0));
        assert_ne!(coerce_ret.column(1), original.column(1));
        assert_ne!(coerce_ret.column(2), original.column(2));

        default_ret.column(0).as_primitive::<Date64Type>();
        coerce_ret.column(0).as_primitive::<Date64Type>();

        Ok(())
    }

    struct RandFixedLenGen {}

    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
        fn r#gen(len: i32) -> FixedLenByteArray {
            let mut v = vec![0u8; len as usize];
            rng().fill_bytes(&mut v);
            ByteArray::from(v).into()
        }
    }

    #[test]
    fn test_fixed_length_binary_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            20,
            ConvertedType::NONE,
            None,
            |vals| {
                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
                for val in vals {
                    match val {
                        Some(b) => builder.append_value(b).unwrap(),
                        None => builder.append_null(),
                    }
                }
                Arc::new(builder.finish())
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_interval_day_time_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            12,
            ConvertedType::INTERVAL,
            None,
            |vals| {
                Arc::new(
                    vals.iter()
                        .map(|x| {
                            x.as_ref().map(|b| IntervalDayTime {
                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
                                milliseconds: i32::from_le_bytes(
                                    b.as_ref()[8..12].try_into().unwrap(),
                                ),
                            })
                        })
                        .collect::<IntervalDayTimeArray>(),
                )
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_int96_single_column_reader_test() {
        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];

        type TypeHintAndConversionFunction =
            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);

        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
            (None, |vals: &[Option<Int96>]| {
                Arc::new(TimestampNanosecondArray::from_iter(
                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
                )) as ArrayRef
            }),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampSecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMillisecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_millis())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMicrosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_micros())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampNanosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(
                    TimeUnit::Second,
                    Some(Arc::from("-05:00")),
                )),
                |vals: &[Option<Int96>]| {
                    Arc::new(
                        TimestampSecondArray::from_iter(
                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
                        )
                        .with_timezone("-05:00"),
                    ) as ArrayRef
                },
            ),
        ];

        resolutions.iter().for_each(|(arrow_type, converter)| {
            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
                2,
                ConvertedType::NONE,
                arrow_type.clone(),
                converter,
                encodings,
            );
        })
    }

    #[test]
    fn test_int96_from_spark_file_with_provided_schema() {
        use arrow_schema::DataType::Timestamp;
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            Timestamp(TimeUnit::Microsecond, None),
            true,
        )]));
        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());

        let mut record_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                .unwrap()
                .build()
                .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));

        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456),
            Some(1704070800000000),
            Some(253402225200000000),
            Some(1735599600000000),
            None,
            Some(9089380393200000000),
        ]));

        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }

    #[test]
    fn test_int96_from_spark_file_without_provided_schema() {
        use arrow_schema::DataType::Timestamp;
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));

        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456000),
            Some(1704070800000000000),
            // this value and the one two below exceed the i64 nanosecond range and wrap
            Some(-4852191831933722624),
            Some(1735599600000000000),
            None,
            Some(-4864435138808946688),
        ]));

        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }

    struct RandUtf8Gen {}

    impl RandGen<ByteArrayType> for RandUtf8Gen {
        fn r#gen(len: i32) -> ByteArray {
            Int32Type::r#gen(len).to_string().as_str().into()
        }
    }

    #[test]
    fn test_utf8_single_column_reader_test() {
        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
            })))
        }

        let encodings = &[
            Encoding::PLAIN,
            Encoding::RLE_DICTIONARY,
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            Encoding::DELTA_BYTE_ARRAY,
        ];

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::NONE,
            None,
            |vals| {
                Arc::new(BinaryArray::from_iter(
                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
                ))
            },
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            None,
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::Utf8),
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::LargeUtf8),
            string_converter::<i64>,
            encodings,
        );

        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
        for key in &small_key_types {
            for encoding in encodings {
                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
                opts.encoding = *encoding;

                let data_type =
                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
                    opts,
                    2,
                    ConvertedType::UTF8,
                    Some(data_type.clone()),
                    move |vals| {
                        let vals = string_converter::<i32>(vals);
                        arrow::compute::cast(&vals, &data_type).unwrap()
                    },
                );
            }
        }

        let key_types = [
            ArrowDataType::Int16,
            ArrowDataType::UInt16,
            ArrowDataType::Int32,
            ArrowDataType::UInt32,
            ArrowDataType::Int64,
            ArrowDataType::UInt64,
        ];

        for key in &key_types {
            let data_type =
                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let vals = string_converter::<i32>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );

            let data_type = ArrowDataType::Dictionary(
                Box::new(key.clone()),
                Box::new(ArrowDataType::LargeUtf8),
            );

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let vals = string_converter::<i64>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );
        }
    }

    #[test]
    fn test_decimal_nullable_struct() {
        let decimals = Decimal256Array::from_iter_values(
            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
        );

        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "decimals",
            decimals.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_int32_nullable_struct() {
        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "int32",
            int32.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![int32.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_decimal_list() {
        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);

        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
            decimals.data_type().clone(),
            false,
        ))))
        .len(7)
        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 1), &read[2]);
    }

    #[test]
    fn test_read_decimal_file() {
        use arrow_array::Decimal128Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let file_variants = vec![
            ("byte_array", 4),
            ("fixed_length", 25),
            ("int32", 4),
            ("int64", 10),
        ];
        for (prefix, target_precision) in file_variants {
            let path = format!("{testdata}/{prefix}_decimal.parquet");
            let file = File::open(path).unwrap();
            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

            let batch = record_reader.next().unwrap().unwrap();
            assert_eq!(batch.num_rows(), 24);
            let col = batch
                .column(0)
                .as_any()
                .downcast_ref::<Decimal128Array>()
                .unwrap();

            let expected = 1..25;

            assert_eq!(col.precision(), target_precision);
            assert_eq!(col.scale(), 2);

            for (i, v) in expected.enumerate() {
                assert_eq!(col.value(i), v * 100_i128);
            }
        }
    }

    #[test]
    fn test_read_float16_nonzeros_file() {
        use arrow_array::Float16Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 8);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float16Array>()
            .unwrap();

        let f16_two = f16::ONE + f16::ONE;

        assert_eq!(col.null_count(), 1);
        assert!(col.is_null(0));
        assert_eq!(col.value(1), f16::ONE);
        assert_eq!(col.value(2), -f16_two);
        assert!(col.value(3).is_nan());
        assert_eq!(col.value(4), f16::ZERO);
        assert!(col.value(4).is_sign_positive());
        assert_eq!(col.value(5), f16::NEG_ONE);
        assert_eq!(col.value(6), f16::NEG_ZERO);
        assert!(col.value(6).is_sign_negative());
        assert_eq!(col.value(7), f16_two);
    }

    #[test]
    fn test_read_float16_zeros_file() {
        use arrow_array::Float16Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 3);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float16Array>()
            .unwrap();

        assert_eq!(col.null_count(), 1);
        assert!(col.is_null(0));
        assert_eq!(col.value(1), f16::ZERO);
        assert!(col.value(1).is_sign_positive());
        assert!(col.value(2).is_nan());
    }

    #[test]
    fn test_read_float32_float64_byte_stream_split() {
        let path = format!(
            "{}/byte_stream_split.zstd.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();
        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

        let mut row_count = 0;
        for batch in record_reader {
            let batch = batch.unwrap();
            row_count += batch.num_rows();
            let f32_col = batch.column(0).as_primitive::<Float32Type>();
            let f64_col = batch.column(1).as_primitive::<Float64Type>();

            for &x in f32_col.values() {
                assert!(x > -10.0);
                assert!(x < 10.0);
            }
            for &x in f64_col.values() {
                assert!(x > -10.0);
                assert!(x < 10.0);
            }
        }
        assert_eq!(row_count, 300);
    }

2755 #[test]
2756 fn test_read_extended_byte_stream_split() {
2757 let path = format!(
2758 "{}/byte_stream_split_extended.gzip.parquet",
2759 arrow::util::test_util::parquet_test_data(),
2760 );
2761 let file = File::open(path).unwrap();
2762 let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
2763
2764 let mut row_count = 0;
2765 for batch in record_reader {
2766 let batch = batch.unwrap();
2767 row_count += batch.num_rows();
2768
2769 let f16_col = batch.column(0).as_primitive::<Float16Type>();
2771 let f16_bss = batch.column(1).as_primitive::<Float16Type>();
2772 assert_eq!(f16_col.len(), f16_bss.len());
2773 f16_col
2774 .iter()
2775 .zip(f16_bss.iter())
2776 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2777
2778 let f32_col = batch.column(2).as_primitive::<Float32Type>();
2780 let f32_bss = batch.column(3).as_primitive::<Float32Type>();
2781 assert_eq!(f32_col.len(), f32_bss.len());
2782 f32_col
2783 .iter()
2784 .zip(f32_bss.iter())
2785 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2786
2787 let f64_col = batch.column(4).as_primitive::<Float64Type>();
2789 let f64_bss = batch.column(5).as_primitive::<Float64Type>();
2790 assert_eq!(f64_col.len(), f64_bss.len());
2791 f64_col
2792 .iter()
2793 .zip(f64_bss.iter())
2794 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2795
2796 let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
2798 let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
2799 assert_eq!(i32_col.len(), i32_bss.len());
2800 i32_col
2801 .iter()
2802 .zip(i32_bss.iter())
2803 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2804
2805 let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
2807 let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
2808 assert_eq!(i64_col.len(), i64_bss.len());
2809 i64_col
2810 .iter()
2811 .zip(i64_bss.iter())
2812 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2813
2814 let flba_col = batch.column(10).as_fixed_size_binary();
2816 let flba_bss = batch.column(11).as_fixed_size_binary();
2817 assert_eq!(flba_col.len(), flba_bss.len());
2818 flba_col
2819 .iter()
2820 .zip(flba_bss.iter())
2821 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2822
2823 let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
2825 let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
2826 assert_eq!(dec_col.len(), dec_bss.len());
2827 dec_col
2828 .iter()
2829 .zip(dec_bss.iter())
2830 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
2831 }
2832 assert_eq!(row_count, 200);
2833 }
2834
    #[test]
    fn test_read_incorrect_map_schema_file() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/incorrect_map_schema.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 1);

        let expected_schema = Schema::new(vec![Field::new(
            "my_map",
            ArrowDataType::Map(
                Arc::new(Field::new(
                    "key_value",
                    ArrowDataType::Struct(Fields::from(vec![
                        Field::new("key", ArrowDataType::Utf8, false),
                        Field::new("value", ArrowDataType::Utf8, true),
                    ])),
                    false,
                )),
                false,
            ),
            true,
        )]);
        assert_eq!(batch.schema().as_ref(), &expected_schema);

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.column(0).null_count(), 0);
        assert_eq!(
            batch.column(0).as_map().keys().as_ref(),
            &StringArray::from(vec!["parent", "name"])
        );
        assert_eq!(
            batch.column(0).as_map().values().as_ref(),
            &StringArray::from(vec!["another", "report"])
        );
    }

    #[test]
    fn test_read_dict_fixed_size_binary() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            ArrowDataType::Dictionary(
                Box::new(ArrowDataType::UInt8),
                Box::new(ArrowDataType::FixedSizeBinary(8)),
            ),
            true,
        )]));
        let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
        let values = FixedSizeBinaryArray::try_from_iter(
            vec![
                (0u8..8u8).collect::<Vec<u8>>(),
                (24u8..32u8).collect::<Vec<u8>>(),
            ]
            .into_iter(),
        )
        .unwrap();
        let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
        let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(read.len(), 1);
        assert_eq!(&batch, &read[0]);
    }

    #[test]
    fn test_read_nullable_structs_with_binary_dict_as_first_child_column() {
        let struct_fields = Fields::from(vec![
            Field::new(
                "city",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt8),
                    Box::new(ArrowDataType::Utf8),
                ),
                true,
            ),
            Field::new("name", ArrowDataType::Utf8, true),
        ]);
        let schema = Arc::new(Schema::new(vec![Field::new(
            "items",
            ArrowDataType::Struct(struct_fields.clone()),
            true,
        )]));

        let items_arr = StructArray::new(
            struct_fields,
            vec![
                Arc::new(DictionaryArray::new(
                    UInt8Array::from_iter_values(vec![0, 1, 1, 0, 2]),
                    Arc::new(StringArray::from_iter_values(vec![
                        "quebec",
                        "fredericton",
                        "halifax",
                    ])),
                )),
                Arc::new(StringArray::from_iter_values(vec![
                    "albert", "terry", "lance", "", "tim",
                ])),
            ],
            Some(NullBuffer::from_iter(vec![true, true, true, false, true])),
        );

        let batch = RecordBatch::try_new(schema, vec![Arc::new(items_arr)]).unwrap();
        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(read.len(), 1);
        assert_eq!(&batch, &read[0]);
    }

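    // The tests below drive a configurable round-trip harness: `TestOptions`
    // describes how a file is written (row groups, page sizes, nulls, encoding)
    // and how it is read back (batch size, selections, filters, limit/offset).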
    #[derive(Clone)]
    struct TestOptions {
        /// Number of row groups to write
        num_row_groups: usize,
        /// Number of rows in each row group
        num_rows: usize,
        /// Batch size to use when reading back
        record_batch_size: usize,
        /// Percentage of null values, if any
        null_percent: Option<usize>,
        /// Batch size used when writing
        write_batch_size: usize,
        /// Maximum size of a data page in bytes
        max_data_page_size: usize,
        /// Maximum size of a dictionary page in bytes
        max_dict_page_size: usize,
        /// Parquet writer version to use
        writer_version: WriterVersion,
        /// Which statistics to write
        enabled_statistics: EnabledStatistics,
        /// Encoding to use
        encoding: Encoding,
        /// Row selection to apply, together with the expected selected row count
        row_selections: Option<(RowSelection, usize)>,
        /// Row filter mask to apply
        row_filter: Option<Vec<bool>>,
        /// Limit to apply
        limit: Option<usize>,
        /// Offset to apply
        offset: Option<usize>,
    }

    impl std::fmt::Debug for TestOptions {
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("TestOptions")
                .field("num_row_groups", &self.num_row_groups)
                .field("num_rows", &self.num_rows)
                .field("record_batch_size", &self.record_batch_size)
                .field("null_percent", &self.null_percent)
                .field("write_batch_size", &self.write_batch_size)
                .field("max_data_page_size", &self.max_data_page_size)
                .field("max_dict_page_size", &self.max_dict_page_size)
                .field("writer_version", &self.writer_version)
                .field("enabled_statistics", &self.enabled_statistics)
                .field("encoding", &self.encoding)
                .field("row_selections", &self.row_selections.is_some())
                .field("row_filter", &self.row_filter.is_some())
                .field("limit", &self.limit)
                .field("offset", &self.offset)
                .finish()
        }
    }

    impl Default for TestOptions {
        fn default() -> Self {
            Self {
                num_row_groups: 2,
                num_rows: 100,
                record_batch_size: 15,
                null_percent: None,
                write_batch_size: 64,
                max_data_page_size: 1024 * 1024,
                max_dict_page_size: 1024 * 1024,
                writer_version: WriterVersion::PARQUET_1_0,
                enabled_statistics: EnabledStatistics::Page,
                encoding: Encoding::PLAIN,
                row_selections: None,
                row_filter: None,
                limit: None,
                offset: None,
            }
        }
    }

    impl TestOptions {
        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
            Self {
                num_row_groups,
                num_rows,
                record_batch_size,
                ..Default::default()
            }
        }

        fn with_null_percent(self, null_percent: usize) -> Self {
            Self {
                null_percent: Some(null_percent),
                ..self
            }
        }

        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
            Self {
                max_data_page_size,
                ..self
            }
        }

        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
            Self {
                max_dict_page_size,
                ..self
            }
        }

        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
            Self {
                enabled_statistics,
                ..self
            }
        }

        fn with_row_selections(self) -> Self {
            assert!(self.row_filter.is_none(), "Must set row selection first");

            let mut rng = rng();
            let step = rng.random_range(self.record_batch_size..self.num_rows);
            let row_selections = create_test_selection(
                step,
                self.num_row_groups * self.num_rows,
                rng.random::<bool>(),
            );
            Self {
                row_selections: Some(row_selections),
                ..self
            }
        }

        fn with_row_filter(self) -> Self {
            let row_count = match &self.row_selections {
                Some((_, count)) => *count,
                None => self.num_row_groups * self.num_rows,
            };

            let mut rng = rng();
            Self {
                row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
                ..self
            }
        }

        fn with_limit(self, limit: usize) -> Self {
            Self {
                limit: Some(limit),
                ..self
            }
        }

        fn with_offset(self, offset: usize) -> Self {
            Self {
                offset: Some(offset),
                ..self
            }
        }

        fn writer_props(&self) -> WriterProperties {
            let builder = WriterProperties::builder()
                .set_data_page_size_limit(self.max_data_page_size)
                .set_write_batch_size(self.write_batch_size)
                .set_writer_version(self.writer_version)
                .set_statistics_enabled(self.enabled_statistics);

            let builder = match self.encoding {
                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
                    .set_dictionary_enabled(true)
                    .set_dictionary_page_size_limit(self.max_dict_page_size),
                _ => builder
                    .set_dictionary_enabled(false)
                    .set_encoding(self.encoding),
            };

            builder.build()
        }
    }

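    /// Exercises a single leaf column round-trip over every `TestOptions`
    /// combination below (page sizes, null densities, selections, filters,
    /// limits and offsets), crossed with both writer versions and each of the
    /// supplied `encodings`. `converter` maps the generated native values to
    /// the expected Arrow array; `G` supplies the random value generator.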
    fn run_single_column_reader_tests<T, F, G>(
        rand_max: i32,
        converted_type: ConvertedType,
        arrow_type: Option<ArrowDataType>,
        converter: F,
        encodings: &[Encoding],
    ) where
        T: DataType,
        G: RandGen<T>,
        F: Fn(&[Option<T::T>]) -> ArrayRef,
    {
        let all_options = vec![
            TestOptions::new(2, 100, 15),
            TestOptions::new(3, 25, 5),
            TestOptions::new(4, 100, 25),
            TestOptions::new(3, 256, 73).with_max_data_page_size(128),
            TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
            TestOptions::new(2, 256, 127).with_null_percent(0),
            TestOptions::new(2, 256, 93).with_null_percent(25),
            TestOptions::new(4, 100, 25).with_limit(0),
            TestOptions::new(4, 100, 25).with_limit(50),
            TestOptions::new(4, 100, 25).with_limit(10),
            TestOptions::new(4, 100, 25).with_limit(101),
            TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
            TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
            TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
            TestOptions::new(2, 256, 91)
                .with_null_percent(25)
                .with_enabled_statistics(EnabledStatistics::Chunk),
            TestOptions::new(2, 256, 91)
                .with_null_percent(25)
                .with_enabled_statistics(EnabledStatistics::None),
            TestOptions::new(2, 128, 91)
                .with_null_percent(100)
                .with_enabled_statistics(EnabledStatistics::None),
            TestOptions::new(2, 100, 15).with_row_selections(),
            TestOptions::new(3, 25, 5).with_row_selections(),
            TestOptions::new(4, 100, 25).with_row_selections(),
            TestOptions::new(3, 256, 73)
                .with_max_data_page_size(128)
                .with_row_selections(),
            TestOptions::new(3, 256, 57)
                .with_max_dict_page_size(128)
                .with_row_selections(),
            TestOptions::new(2, 256, 127)
                .with_null_percent(0)
                .with_row_selections(),
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_row_selections(),
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_row_selections()
                .with_limit(10),
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_row_selections()
                .with_offset(20)
                .with_limit(10),
            TestOptions::new(4, 100, 25).with_row_filter(),
            TestOptions::new(4, 100, 25)
                .with_row_selections()
                .with_row_filter(),
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_max_data_page_size(10)
                .with_row_filter(),
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_max_data_page_size(10)
                .with_row_selections()
                .with_row_filter(),
            TestOptions::new(2, 256, 93)
                .with_enabled_statistics(EnabledStatistics::None)
                .with_max_data_page_size(10)
                .with_row_selections(),
        ];

        all_options.into_iter().for_each(|opts| {
            for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
                for encoding in encodings {
                    let opts = TestOptions {
                        writer_version,
                        encoding: *encoding,
                        ..opts.clone()
                    };

                    single_column_reader_test::<T, _, G>(
                        opts,
                        rand_max,
                        converted_type,
                        arrow_type.clone(),
                        &converter,
                    )
                }
            }
        });
    }

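    /// Writes a single-column file as described by `opts`, reads it back with
    /// `ParquetRecordBatchReader`, and asserts the decoded batches match the
    /// expected values after applying any selection, filter, offset and limit.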
    fn single_column_reader_test<T, F, G>(
        opts: TestOptions,
        rand_max: i32,
        converted_type: ConvertedType,
        arrow_type: Option<ArrowDataType>,
        converter: F,
    ) where
        T: DataType,
        G: RandGen<T>,
        F: Fn(&[Option<T::T>]) -> ArrayRef,
    {
        println!(
            "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
            T::get_physical_type(),
            converted_type,
            arrow_type,
            opts
        );

        let (repetition, def_levels) = match opts.null_percent.as_ref() {
            Some(null_percent) => {
                let mut rng = rng();

                let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
                    .map(|_| {
                        std::iter::from_fn(|| {
                            Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
                        })
                        .take(opts.num_rows)
                        .collect()
                    })
                    .collect();
                (Repetition::OPTIONAL, Some(def_levels))
            }
            None => (Repetition::REQUIRED, None),
        };

        let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
            .map(|idx| {
                let null_count = match def_levels.as_ref() {
                    Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
                    None => 0,
                };
                G::gen_vec(rand_max, opts.num_rows - null_count)
            })
            .collect();

        let len = match T::get_physical_type() {
            crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
            crate::basic::Type::INT96 => 12,
            _ => -1,
        };

        let fields = vec![Arc::new(
            Type::primitive_type_builder("leaf", T::get_physical_type())
                .with_repetition(repetition)
                .with_converted_type(converted_type)
                .with_length(len)
                .build()
                .unwrap(),
        )];

        let schema = Arc::new(
            Type::group_type_builder("test_schema")
                .with_fields(fields)
                .build()
                .unwrap(),
        );

        let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));

        let mut file = tempfile::tempfile().unwrap();

        generate_single_column_file_with_data::<T>(
            &values,
            def_levels.as_ref(),
            file.try_clone().unwrap(),
            schema,
            arrow_field,
            &opts,
        )
        .unwrap();

        file.rewind().unwrap();

        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::from(
            opts.enabled_statistics == EnabledStatistics::Page,
        ));

        let mut builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();

        let expected_data = match opts.row_selections {
            Some((selections, row_count)) => {
                let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);

                let mut skip_data: Vec<Option<T::T>> = vec![];
                let dequeue: VecDeque<RowSelector> = selections.clone().into();
                for select in dequeue {
                    if select.skip {
                        without_skip_data.drain(0..select.row_count);
                    } else {
                        skip_data.extend(without_skip_data.drain(0..select.row_count));
                    }
                }
                builder = builder.with_row_selection(selections);

                assert_eq!(skip_data.len(), row_count);
                skip_data
            }
            None => {
                let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
                assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
                expected_data
            }
        };

        let mut expected_data = match opts.row_filter {
            Some(filter) => {
                let expected_data = expected_data
                    .into_iter()
                    .zip(filter.iter())
                    .filter_map(|(d, f)| f.then_some(d))
                    .collect();

                let mut filter_offset = 0;
                let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
                    ProjectionMask::all(),
                    move |b| {
                        let array = BooleanArray::from_iter(
                            filter
                                .iter()
                                .skip(filter_offset)
                                .take(b.num_rows())
                                .map(|x| Some(*x)),
                        );
                        filter_offset += b.num_rows();
                        Ok(array)
                    },
                ))]);

                builder = builder.with_row_filter(filter);
                expected_data
            }
            None => expected_data,
        };

        if let Some(offset) = opts.offset {
            builder = builder.with_offset(offset);
            expected_data = expected_data.into_iter().skip(offset).collect();
        }

        if let Some(limit) = opts.limit {
            builder = builder.with_limit(limit);
            expected_data = expected_data.into_iter().take(limit).collect();
        }

        let mut record_reader = builder
            .with_batch_size(opts.record_batch_size)
            .build()
            .unwrap();

        let mut total_read = 0;
        loop {
            let maybe_batch = record_reader.next();
            if total_read < expected_data.len() {
                let end = min(total_read + opts.record_batch_size, expected_data.len());
                let batch = maybe_batch.unwrap().unwrap();
                assert_eq!(end - total_read, batch.num_rows());

                let a = converter(&expected_data[total_read..end]);
                let b = batch.column(0);

                assert_eq!(a.data_type(), b.data_type());
                assert_eq!(a.to_data(), b.to_data());
                assert_eq!(
                    a.as_any().type_id(),
                    b.as_any().type_id(),
                    "incorrect type ids"
                );

                total_read = end;
            } else {
                assert!(maybe_batch.is_none());
                break;
            }
        }
    }

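    /// Interleaves the non-null `values` with nulls according to `def_levels`
    /// (definition level 1 = value, 0 = null), flattening all row groups into
    /// a single expected vector.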
    fn gen_expected_data<T: DataType>(
        def_levels: Option<&Vec<Vec<i16>>>,
        values: &[Vec<T::T>],
    ) -> Vec<Option<T::T>> {
        match def_levels {
            Some(levels) => {
                let mut values_iter = values.iter().flatten();
                levels
                    .iter()
                    .flatten()
                    .map(|d| match d {
                        1 => Some(values_iter.next().cloned().unwrap()),
                        0 => None,
                        _ => unreachable!(),
                    })
                    .collect()
            }
            None => values.iter().flatten().cloned().map(Some).collect(),
        }
    }

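    /// Writes `values` (one inner vec per row group) to `file` as a
    /// single-column parquet file using the writer settings from `opts`,
    /// optionally embedding the Arrow schema for `field` in the file metadata.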
    fn generate_single_column_file_with_data<T: DataType>(
        values: &[Vec<T::T>],
        def_levels: Option<&Vec<Vec<i16>>>,
        file: File,
        schema: TypePtr,
        field: Option<Field>,
        opts: &TestOptions,
    ) -> Result<ParquetMetaData> {
        let mut writer_props = opts.writer_props();
        if let Some(field) = field {
            let arrow_schema = Schema::new(vec![field]);
            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
        }

        let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;

        for (idx, v) in values.iter().enumerate() {
            let def_levels = def_levels.map(|d| d[idx].as_slice());
            let mut row_group_writer = writer.next_row_group()?;
            {
                let mut column_writer = row_group_writer
                    .next_column()?
                    .expect("Column writer is none!");

                column_writer
                    .typed::<T>()
                    .write_batch(v, def_levels, None)?;

                column_writer.close()?;
            }
            row_group_writer.close()?;
        }

        writer.close()
    }

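    /// Opens a file from the arrow test data directory, panicking if missing.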
    fn get_test_file(file_name: &str) -> File {
        let mut path = PathBuf::new();
        path.push(arrow::util::test_util::arrow_test_data());
        path.push(file_name);

        File::open(path.as_path()).expect("File not found!")
    }

    #[test]
    fn test_read_structs() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_structs.rust.parquet");
        let file = File::open(&path).unwrap();
        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

        for batch in record_batch_reader {
            batch.unwrap();
        }

        let file = File::open(&path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

        let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
        let projected_reader = builder
            .with_projection(mask)
            .with_batch_size(60)
            .build()
            .unwrap();

        let expected_schema = Schema::new(vec![
            Field::new(
                "roll_num",
                ArrowDataType::Struct(Fields::from(vec![Field::new(
                    "count",
                    ArrowDataType::UInt64,
                    false,
                )])),
                false,
            ),
            Field::new(
                "PC_CUR",
                ArrowDataType::Struct(Fields::from(vec![
                    Field::new("mean", ArrowDataType::Int64, false),
                    Field::new("sum", ArrowDataType::Int64, false),
                ])),
                false,
            ),
        ]);

        assert_eq!(&expected_schema, projected_reader.schema().as_ref());

        for batch in projected_reader {
            let batch = batch.unwrap();
            assert_eq!(batch.schema().as_ref(), &expected_schema);
        }
    }

    #[test]
    fn test_read_structs_by_name() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_structs.rust.parquet");
        let file = File::open(&path).unwrap();
        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

        for batch in record_batch_reader {
            batch.unwrap();
        }

        let file = File::open(&path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

        let mask = ProjectionMask::columns(
            builder.parquet_schema(),
            ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
        );
        let projected_reader = builder
            .with_projection(mask)
            .with_batch_size(60)
            .build()
            .unwrap();

        let expected_schema = Schema::new(vec![
            Field::new(
                "roll_num",
                ArrowDataType::Struct(Fields::from(vec![Field::new(
                    "count",
                    ArrowDataType::UInt64,
                    false,
                )])),
                false,
            ),
            Field::new(
                "PC_CUR",
                ArrowDataType::Struct(Fields::from(vec![
                    Field::new("mean", ArrowDataType::Int64, false),
                    Field::new("sum", ArrowDataType::Int64, false),
                ])),
                false,
            ),
        ]);

        assert_eq!(&expected_schema, projected_reader.schema().as_ref());

        for batch in projected_reader {
            let batch = batch.unwrap();
            assert_eq!(batch.schema().as_ref(), &expected_schema);
        }
    }

    #[test]
    fn test_read_maps() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_maps.snappy.parquet");
        let file = File::open(path).unwrap();
        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

        for batch in record_batch_reader {
            batch.unwrap();
        }
    }

    #[test]
    fn test_nested_nullability() {
        let message_type = "message nested {
            OPTIONAL Group group {
                REQUIRED INT32 leaf;
            }
        }";

        let file = tempfile::tempfile().unwrap();
        let schema = Arc::new(parse_message_type(message_type).unwrap());

        {
            let mut writer =
                SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
                    .unwrap();

            {
                let mut row_group_writer = writer.next_row_group().unwrap();
                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();

                column_writer
                    .typed::<Int32Type>()
                    .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
                    .unwrap();

                column_writer.close().unwrap();
                row_group_writer.close().unwrap();
            }

            writer.close().unwrap();
        }

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);

        let reader = builder.with_projection(mask).build().unwrap();

        let expected_schema = Schema::new(vec![Field::new(
            "group",
            ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
            true,
        )]);

        let batch = reader.into_iter().next().unwrap().unwrap();
        assert_eq!(batch.schema().as_ref(), &expected_schema);
        assert_eq!(batch.num_rows(), 4);
        assert_eq!(batch.column(0).null_count(), 2);
    }

    #[test]
    fn test_dictionary_preservation() {
        let fields = vec![Arc::new(
            Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
                .with_repetition(Repetition::OPTIONAL)
                .with_converted_type(ConvertedType::UTF8)
                .build()
                .unwrap(),
        )];

        let schema = Arc::new(
            Type::group_type_builder("test_schema")
                .with_fields(fields)
                .build()
                .unwrap(),
        );

        let dict_type = ArrowDataType::Dictionary(
            Box::new(ArrowDataType::Int32),
            Box::new(ArrowDataType::Utf8),
        );

        let arrow_field = Field::new("leaf", dict_type, true);

        let mut file = tempfile::tempfile().unwrap();

        let values = vec![
            vec![
                ByteArray::from("hello"),
                ByteArray::from("a"),
                ByteArray::from("b"),
                ByteArray::from("d"),
            ],
            vec![
                ByteArray::from("c"),
                ByteArray::from("a"),
                ByteArray::from("b"),
            ],
        ];

        let def_levels = vec![
            vec![1, 0, 0, 1, 0, 0, 1, 1],
            vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
        ];

        let opts = TestOptions {
            encoding: Encoding::RLE_DICTIONARY,
            ..Default::default()
        };

        generate_single_column_file_with_data::<ByteArrayType>(
            &values,
            Some(&def_levels),
            file.try_clone().unwrap(),
            schema,
            Some(arrow_field),
            &opts,
        )
        .unwrap();

        file.rewind().unwrap();

        let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();

        let batches = record_reader
            .collect::<Result<Vec<RecordBatch>, _>>()
            .unwrap();

        assert_eq!(batches.len(), 6);
        assert!(batches.iter().all(|x| x.num_columns() == 1));

        let row_counts = batches
            .iter()
            .map(|x| (x.num_rows(), x.column(0).null_count()))
            .collect::<Vec<_>>();

        assert_eq!(
            row_counts,
            vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
        );

        let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();

        assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
        assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
        assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
        assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
        assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
    }

    #[test]
    fn test_read_null_list() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/null_list.parquet");
        let file = File::open(path).unwrap();
        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

        let batch = record_batch_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.column(0).len(), 1);

        let list = batch
            .column(0)
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        assert_eq!(list.len(), 1);
        assert!(list.is_valid(0));

        let val = list.value(0);
        assert_eq!(val.len(), 0);
    }

    #[test]
    fn test_null_schema_inference() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/null_list.parquet");
        let file = File::open(path).unwrap();

        let arrow_field = Field::new(
            "emptylist",
            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
            true,
        );

        let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
        let schema = builder.schema();
        assert_eq!(schema.fields().len(), 1);
        assert_eq!(schema.field(0), &arrow_field);
    }

    #[test]
    fn test_skip_metadata() {
        let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
        let field = Field::new("col", col.data_type().clone(), true);

        let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));

        let metadata = [("key".to_string(), "value".to_string())]
            .into_iter()
            .collect();

        let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));

        assert_ne!(schema_with_metadata, schema_without_metadata);

        let batch =
            RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();

        let file = |version: WriterVersion| {
            let props = WriterProperties::builder()
                .set_writer_version(version)
                .build();

            let file = tempfile().unwrap();
            let mut writer =
                ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
                    .unwrap();
            writer.write(&batch).unwrap();
            writer.close().unwrap();
            file
        };

        let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);

        let v1_reader = file(WriterVersion::PARQUET_1_0);
        let v2_reader = file(WriterVersion::PARQUET_2_0);

        let arrow_reader =
            ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
        assert_eq!(arrow_reader.schema(), schema_with_metadata);

        let reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
                .unwrap()
                .build()
                .unwrap();
        assert_eq!(reader.schema(), schema_without_metadata);

        let arrow_reader =
            ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
        assert_eq!(arrow_reader.schema(), schema_with_metadata);

        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
            .unwrap()
            .build()
            .unwrap();
        assert_eq!(reader.schema(), schema_without_metadata);
    }

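    /// Writes the given (name, array) pairs as a single record batch to a
    /// temporary parquet file and returns a handle to it.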
    fn write_parquet_from_iter<I, F>(value: I) -> File
    where
        I: IntoIterator<Item = (F, ArrayRef)>,
        F: AsRef<str>,
    {
        let batch = RecordBatch::try_from_iter(value).unwrap();
        let file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        file
    }

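    /// Writes `value` to a parquet file, then asserts that opening a reader
    /// with the supplied `schema` fails with `expected_error`.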
    fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
    where
        I: IntoIterator<Item = (F, ArrayRef)>,
        F: AsRef<str>,
    {
        let file = write_parquet_from_iter(value);
        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options_with_schema,
        );
        assert_eq!(builder.err().unwrap().to_string(), expected_error);
    }

    #[test]
    fn test_schema_too_few_columns() {
        run_schema_test_with_error(
            vec![
                ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
                ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
            ],
            Arc::new(Schema::new(vec![Field::new(
                "int64",
                ArrowDataType::Int64,
                false,
            )])),
            "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
        );
    }

    #[test]
    fn test_schema_too_many_columns() {
        run_schema_test_with_error(
            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
            Arc::new(Schema::new(vec![
                Field::new("int64", ArrowDataType::Int64, false),
                Field::new("int32", ArrowDataType::Int32, false),
            ])),
            "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
        );
    }

    #[test]
    fn test_schema_mismatched_column_names() {
        run_schema_test_with_error(
            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
            Arc::new(Schema::new(vec![Field::new(
                "other",
                ArrowDataType::Int64,
                false,
            )])),
            "Arrow: incompatible arrow schema, expected field named int64 got other",
        );
    }

    #[test]
    fn test_schema_incompatible_columns() {
        run_schema_test_with_error(
            vec![
                (
                    "col1_invalid",
                    Arc::new(Int64Array::from(vec![0])) as ArrayRef,
                ),
                (
                    "col2_valid",
                    Arc::new(Int32Array::from(vec![0])) as ArrayRef,
                ),
                (
                    "col3_invalid",
                    Arc::new(Date64Array::from(vec![0])) as ArrayRef,
                ),
            ],
            Arc::new(Schema::new(vec![
                Field::new("col1_invalid", ArrowDataType::Int32, false),
                Field::new("col2_valid", ArrowDataType::Int32, false),
                Field::new("col3_invalid", ArrowDataType::Int32, false),
            ])),
            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
        );
    }

    #[test]
    fn test_one_incompatible_nested_column() {
        let nested_fields = Fields::from(vec![
            Field::new("nested1_valid", ArrowDataType::Utf8, false),
            Field::new("nested1_invalid", ArrowDataType::Int64, false),
        ]);
        let nested = StructArray::try_new(
            nested_fields,
            vec![
                Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
            ],
            None,
        )
        .expect("struct array");
        let supplied_nested_fields = Fields::from(vec![
            Field::new("nested1_valid", ArrowDataType::Utf8, false),
            Field::new("nested1_invalid", ArrowDataType::Int32, false),
        ]);
        run_schema_test_with_error(
            vec![
                ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
                ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
                ("nested", Arc::new(nested) as ArrayRef),
            ],
            Arc::new(Schema::new(vec![
                Field::new("col1", ArrowDataType::Int64, false),
                Field::new("col2", ArrowDataType::Int32, false),
                Field::new(
                    "nested",
                    ArrowDataType::Struct(supplied_nested_fields),
                    false,
                ),
            ])),
            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
             requested Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int32) \
             but found Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int64)",
        );
    }

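    /// Returns an in-memory parquet file with a single Utf8 column named
    /// "column1" containing three string values.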
    fn utf8_parquet() -> Bytes {
        let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
        let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
        let props = None;
        let mut parquet_data = vec![];
        let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        Bytes::from(parquet_data)
    }

    #[test]
    fn test_schema_error_bad_types() {
        let parquet_data = utf8_parquet();

        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
            "column1",
            arrow::datatypes::DataType::Int32,
            false,
        )]));

        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
        let err =
            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
                .unwrap_err();
        assert_eq!(
            err.to_string(),
            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8"
        );
    }

    #[test]
    fn test_schema_error_bad_nullability() {
        let parquet_data = utf8_parquet();

        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
            "column1",
            arrow::datatypes::DataType::Utf8,
            true,
        )]));

        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
        let err =
            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
                .unwrap_err();
        assert_eq!(
            err.to_string(),
            "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false"
        );
    }

    #[test]
    fn test_read_binary_as_utf8() {
        let file = write_parquet_from_iter(vec![
            (
                "binary_to_utf8",
                Arc::new(BinaryArray::from(vec![
                    b"one".as_ref(),
                    b"two".as_ref(),
                    b"three".as_ref(),
                ])) as ArrayRef,
            ),
            (
                "large_binary_to_large_utf8",
                Arc::new(LargeBinaryArray::from(vec![
                    b"one".as_ref(),
                    b"two".as_ref(),
                    b"three".as_ref(),
                ])) as ArrayRef,
            ),
            (
                "binary_view_to_utf8_view",
                Arc::new(BinaryViewArray::from(vec![
                    b"one".as_ref(),
                    b"two".as_ref(),
                    b"three".as_ref(),
                ])) as ArrayRef,
            ),
        ]);
        let supplied_fields = Fields::from(vec![
            Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
            Field::new(
                "large_binary_to_large_utf8",
                ArrowDataType::LargeUtf8,
                false,
            ),
            Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
        ]);

        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");

        let batch = arrow_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(
            batch
                .column(0)
                .as_string::<i32>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some("one"), Some("two"), Some("three")]
        );

        assert_eq!(
            batch
                .column(1)
                .as_string::<i64>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some("one"), Some("two"), Some("three")]
        );

        assert_eq!(
            batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
            vec![Some("one"), Some("two"), Some("three")]
        );
    }

    #[test]
    #[should_panic(expected = "Invalid UTF8 sequence at")]
    fn test_read_non_utf8_binary_as_utf8() {
        let file = write_parquet_from_iter(vec![(
            "non_utf8_binary",
            Arc::new(BinaryArray::from(vec![
                b"\xDE\x00\xFF".as_ref(),
                b"\xDE\x01\xAA".as_ref(),
                b"\xDE\x02\xFF".as_ref(),
            ])) as ArrayRef,
        )]);
        let supplied_fields = Fields::from(vec![Field::new(
            "non_utf8_binary",
            ArrowDataType::Utf8,
            false,
        )]);

        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");
        arrow_reader.next().unwrap().unwrap_err();
    }

    #[test]
    fn test_with_schema() {
        let nested_fields = Fields::from(vec![
            Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
            Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
        ]);

        let nested_arrays: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
        ];

        let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();

        let file = write_parquet_from_iter(vec![
            (
                "int32_to_ts_second",
                Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
            ),
            (
                "date32_to_date64",
                Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
            ),
            ("nested", Arc::new(nested) as ArrayRef),
        ]);

        let supplied_nested_fields = Fields::from(vec![
            Field::new(
                "utf8_to_dict",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::Int32),
                    Box::new(ArrowDataType::Utf8),
                ),
                false,
            ),
            Field::new(
                "int64_to_ts_nano",
                ArrowDataType::Timestamp(
                    arrow::datatypes::TimeUnit::Nanosecond,
                    Some("+10:00".into()),
                ),
                false,
            ),
        ]);

        let supplied_schema = Arc::new(Schema::new(vec![
            Field::new(
                "int32_to_ts_second",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
                false,
            ),
            Field::new("date32_to_date64", ArrowDataType::Date64, false),
            Field::new(
                "nested",
                ArrowDataType::Struct(supplied_nested_fields),
                false,
            ),
        ]));

        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");

        assert_eq!(arrow_reader.schema(), supplied_schema);
        let batch = arrow_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 4);
        assert_eq!(
            batch
                .column(0)
                .as_any()
                .downcast_ref::<TimestampSecondArray>()
                .expect("downcast to timestamp second")
                .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
                .map(|v| v.to_string())
                .expect("value as datetime"),
            "1970-01-01 01:00:00 +01:00"
        );
        assert_eq!(
            batch
                .column(1)
                .as_any()
                .downcast_ref::<Date64Array>()
                .expect("downcast to date64")
                .value_as_date(0)
                .map(|v| v.to_string())
                .expect("value as date"),
            "1970-01-01"
        );

        let nested = batch
            .column(2)
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("downcast to struct");

        let nested_dict = nested
            .column(0)
            .as_any()
            .downcast_ref::<Int32DictionaryArray>()
            .expect("downcast to dictionary");

        assert_eq!(
            nested_dict
                .values()
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("downcast to string")
                .iter()
                .collect::<Vec<_>>(),
            vec![Some("a"), Some("b")]
        );

        assert_eq!(
            nested_dict.keys().iter().collect::<Vec<_>>(),
            vec![Some(0), Some(0), Some(0), Some(1)]
        );

        assert_eq!(
            nested
                .column(1)
                .as_any()
                .downcast_ref::<TimestampNanosecondArray>()
                .expect("downcast to timestamp nanosecond")
                .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
                .map(|v| v.to_string())
                .expect("value as datetime"),
            "1970-01-01 10:00:00.000000001 +10:00"
        );
    }

    #[test]
    fn test_empty_projection() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let file = File::open(path).unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let file_metadata = builder.metadata().file_metadata();
        let expected_rows = file_metadata.num_rows() as usize;

        let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
        let batch_reader = builder
            .with_projection(mask)
            .with_batch_size(2)
            .build()
            .unwrap();

        let mut total_rows = 0;
        for maybe_batch in batch_reader {
            let batch = maybe_batch.unwrap();
            total_rows += batch.num_rows();
            assert_eq!(batch.num_columns(), 0);
            assert!(batch.num_rows() <= 2);
        }

        assert_eq!(total_rows, expected_rows);
    }

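    /// Writes two batches of `batch_size` list rows using row groups of
    /// `row_group_size` rows, then reads them back with the same batch size
    /// and asserts two full batches are returned.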
    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "list",
            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
            true,
        )]));

        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(
            &mut buf,
            schema.clone(),
            Some(
                WriterProperties::builder()
                    .set_max_row_group_row_count(Some(row_group_size))
                    .build(),
            ),
        )
        .unwrap();
        for _ in 0..2 {
            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
            for _ in 0..(batch_size) {
                list_builder.append(true);
            }
            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
                .unwrap();
            writer.write(&batch).unwrap();
        }
        writer.close().unwrap();

        let mut record_reader =
            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
        assert_eq!(
            batch_size,
            record_reader.next().unwrap().unwrap().num_rows()
        );
        assert_eq!(
            batch_size,
            record_reader.next().unwrap().unwrap().num_rows()
        );
    }

    #[test]
    fn test_row_group_exact_multiple() {
        const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
        test_row_group_batch(8, 8);
        test_row_group_batch(10, 8);
        test_row_group_batch(8, 10);
        test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
        test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
        test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
        test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
        test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
    }

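    /// Computes the batches a reader is expected to produce for `column` when
    /// `selection` is applied and rows are emitted in batches of `batch_size`:
    /// contiguous selected runs are sliced out of `column` at batch boundaries.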
    fn get_expected_batches(
        column: &RecordBatch,
        selection: &RowSelection,
        batch_size: usize,
    ) -> Vec<RecordBatch> {
        let mut expected_batches = vec![];

        let mut selection: VecDeque<_> = selection.clone().into();
        let mut row_offset = 0;
        let mut last_start = None;
        while row_offset < column.num_rows() && !selection.is_empty() {
            let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
            while batch_remaining > 0 && !selection.is_empty() {
                let (to_read, skip) = match selection.front_mut() {
                    Some(selection) if selection.row_count > batch_remaining => {
                        selection.row_count -= batch_remaining;
                        (batch_remaining, selection.skip)
                    }
                    Some(_) => {
                        let select = selection.pop_front().unwrap();
                        (select.row_count, select.skip)
                    }
                    None => break,
                };

                batch_remaining -= to_read;

                match skip {
                    true => {
                        if let Some(last_start) = last_start.take() {
                            expected_batches.push(column.slice(last_start, row_offset - last_start))
                        }
                        row_offset += to_read
                    }
                    false => {
                        last_start.get_or_insert(row_offset);
                        row_offset += to_read
                    }
                }
            }
        }

        if let Some(last_start) = last_start.take() {
            expected_batches.push(column.slice(last_start, row_offset - last_start))
        }

        for batch in &expected_batches[..expected_batches.len() - 1] {
            assert_eq!(batch.num_rows(), batch_size);
        }

        expected_batches
    }

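    /// Builds a `RowSelection` of alternating skip/select runs of `step_len`
    /// rows covering `total_len` rows (the final run is truncated), starting
    /// with a skip when `skip_first` is true. Also returns the number of
    /// selected rows; e.g. `create_test_selection(3, 8, false)` selects
    /// rows 0..3 and 6..8, so the returned count is 5.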
    fn create_test_selection(
        step_len: usize,
        total_len: usize,
        skip_first: bool,
    ) -> (RowSelection, usize) {
        let mut remaining = total_len;
        let mut skip = skip_first;
        let mut vec = vec![];
        let mut selected_count = 0;
        while remaining != 0 {
            let step = if remaining > step_len {
                step_len
            } else {
                remaining
            };
            vec.push(RowSelector {
                row_count: step,
                skip,
            });
            remaining -= step;
            if !skip {
                selected_count += step;
            }
            skip = !skip;
        }
        (vec.into(), selected_count)
    }

    #[test]
    fn test_scan_row_with_selection() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let test_file = File::open(&path).unwrap();

        let mut serial_reader =
            ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
        let data = serial_reader.next().unwrap().unwrap();

        let do_test = |batch_size: usize, selection_len: usize| {
            for skip_first in [false, true] {
                let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;

                let expected = get_expected_batches(&data, &selections, batch_size);
                let skip_reader = create_skip_reader(&test_file, batch_size, selections);
                assert_eq!(
                    skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
                    expected,
                    "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
                );
            }
        };

        do_test(1000, 1000);

        do_test(20, 20);

        do_test(20, 5);

        do_test(20, 5);

        fn create_skip_reader(
            test_file: &File,
            batch_size: usize,
            selections: RowSelection,
        ) -> ParquetRecordBatchReader {
            let options =
                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
            let file = test_file.try_clone().unwrap();
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                .unwrap()
                .with_batch_size(batch_size)
                .with_row_selection(selections)
                .build()
                .unwrap()
        }
    }

    #[test]
    fn test_batch_size_overallocate() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let test_file = File::open(path).unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
        let num_rows = builder.metadata.file_metadata().num_rows();
        let reader = builder
            .with_batch_size(1024)
            .with_projection(ProjectionMask::all())
            .build()
            .unwrap();
        assert_ne!(1024, num_rows);
        assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
    }

    #[test]
    fn test_read_with_page_index_enabled() {
        let testdata = arrow::util::test_util::parquet_test_data();

        {
            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
            let test_file = File::open(path).unwrap();
            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
                test_file,
                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
            )
            .unwrap();
            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
            let reader = builder.build().unwrap();
            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(batches.len(), 8);
        }

        {
            let path = format!("{testdata}/alltypes_plain.parquet");
            let test_file = File::open(path).unwrap();
            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
                test_file,
                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
            )
            .unwrap();
            assert!(builder.metadata().offset_index().is_none());
            let reader = builder.build().unwrap();
            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(batches.len(), 1);
        }
    }

4633
    #[test]
    fn test_raw_repetition() {
        const MESSAGE_TYPE: &str = "
            message Log {
              OPTIONAL INT32 eventType;
              REPEATED INT32 category;
              REPEATED group filter {
                OPTIONAL INT32 error;
              }
            }
        ";
        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
        let props = Default::default();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        // leaf 0: `eventType`
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1], Some(&[1]), None)
            .unwrap();
        col_writer.close().unwrap();
        // leaf 1: `category` - two values in one row (repetition levels [0, 1])
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
            .unwrap();
        col_writer.close().unwrap();
        // leaf 2: `filter.error`
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1], Some(&[1]), Some(&[0]))
            .unwrap();
        col_writer.close().unwrap();

        let rg_md = row_group_writer.close().unwrap();
        assert_eq!(rg_md.num_rows(), 1);
        writer.close().unwrap();

        let bytes = Bytes::from(buf);

        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
        let full = no_mask.next().unwrap().unwrap();

        assert_eq!(full.num_columns(), 3);

        for idx in 0..3 {
            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
            let mut reader = b.with_projection(mask).build().unwrap();
            let projected = reader.next().unwrap().unwrap();

            assert_eq!(projected.num_columns(), 1);
            assert_eq!(full.column(idx), projected.column(0));
        }
    }

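    // `lz4_raw_compressed.parquet` uses the LZ4_RAW codec; decoding it should
    // yield the expected 4 rows across the three columns.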
    #[test]
    fn test_read_lz4_raw() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/lz4_raw_compressed.parquet");
        let file = File::open(path).unwrap();

        let batches = ParquetRecordBatchReader::try_new(file, 1024)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];

        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 4);

        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
        assert_eq!(
            a.values(),
            &[1593604800, 1593604800, 1593604801, 1593604801]
        );

        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
        let a: Vec<_> = a.iter().flatten().collect();
        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);

        let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
        assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
    }

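    // Files declaring the legacy LZ4 codec may be framed either in the Hadoop
    // style or as raw LZ4; the reader is expected to try Hadoop framing first and
    // fall back to raw LZ4, so both files below must decode to identical contents.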
    #[test]
    fn test_read_lz4_hadoop_fallback() {
        for file in [
            "hadoop_lz4_compressed.parquet",
            "non_hadoop_lz4_compressed.parquet",
        ] {
            let testdata = arrow::util::test_util::parquet_test_data();
            let path = format!("{testdata}/{file}");
            let file = File::open(path).unwrap();
            let expected_rows = 4;

            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert_eq!(batches.len(), 1);
            let batch = &batches[0];

            assert_eq!(batch.num_columns(), 3);
            assert_eq!(batch.num_rows(), expected_rows);

            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
            assert_eq!(
                a.values(),
                &[1593604800, 1593604800, 1593604801, 1593604801]
            );

            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
            let b: Vec<_> = b.iter().flatten().collect();
            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);

            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
        }
    }

    #[test]
    fn test_read_lz4_hadoop_large() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
        let file = File::open(path).unwrap();
        let expected_rows = 10000;

        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];

        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.num_rows(), expected_rows);

        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
        let a: Vec<_> = a.iter().flatten().collect();
        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
    }

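    // A row selection applied to deeply nested lists should return exactly the
    // rows a plain read would produce at those positions.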
    #[test]
    #[cfg(feature = "snap")]
    fn test_read_nested_lists() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_lists.snappy.parquet");
        let file = File::open(path).unwrap();

        let f = file.try_clone().unwrap();
        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
        let expected = reader.next().unwrap().unwrap();
        assert_eq!(expected.num_rows(), 3);

        let selection = RowSelection::from(vec![
            RowSelector::skip(1),
            RowSelector::select(1),
            RowSelector::skip(1),
        ]);
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .with_row_selection(selection)
            .build()
            .unwrap();

        let actual = reader.next().unwrap().unwrap();
        assert_eq!(actual.num_rows(), 1);
        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
    }

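    // Round-trips decimals whose precisions map to different physical encodings:
    // precision 19 requires FIXED_LEN_BYTE_ARRAY while 12 and 17 fit in INT64.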
    #[test]
    fn test_arbitrary_decimal() {
        let values = [1, 2, 3, 4, 5, 6, 7, 8];
        let decimals_19_0 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(19, 0)
            .unwrap();
        let decimals_12_0 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(12, 0)
            .unwrap();
        let decimals_17_10 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(17, 10)
            .unwrap();

        let written = RecordBatch::try_from_iter([
            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 8), &read[0]);
    }

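    // Limits each data page to a single row so that skipping the first two rows
    // must cross page boundaries rather than being served from a single page.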
    #[test]
    fn test_list_skip() {
        let mut list = ListBuilder::new(Int32Builder::new());
        list.append_value([Some(1), Some(2)]);
        list.append_value([Some(3)]);
        list.append_value([Some(4)]);
        let list = list.finish();
        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();

        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(1)
            .set_write_batch_size(2)
            .build();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
            .unwrap()
            .with_row_selection(selection.into())
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out, batch.slice(2, 1));
    }

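    // Decimal32 values with precision <= 9 are expected to be stored as INT32.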
    fn test_decimal32_roundtrip() {
        let d = |values: Vec<i32>, p: u8| {
            let iter = values.into_iter();
            PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

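    // Decimal64: precision <= 9 still maps to INT32, while precisions 10 and 18
    // map to INT64.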
    fn test_decimal64_roundtrip() {
        let d = |values: Vec<i64>, p: u8| {
            let iter = values.into_iter();
            PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

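    // Generic decimal round-trip covering the full precision-to-physical-type
    // mapping: <= 9 -> INT32, <= 18 -> INT64, > 18 -> FIXED_LEN_BYTE_ARRAY.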
    fn test_decimal_roundtrip<T: DecimalType>() {
        let d = |values: Vec<usize>, p: u8| {
            let iter = values.into_iter().map(T::Native::usize_as);
            PrimitiveArray::<T>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
            ("d4", Arc::new(d4) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);
        let t4 = builder.parquet_schema().columns()[3].physical_type();
        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    #[test]
    fn test_decimal() {
        test_decimal32_roundtrip();
        test_decimal64_roundtrip();
        test_decimal_roundtrip::<Decimal128Type>();
        test_decimal_roundtrip::<Decimal256Type>();
    }

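    // Each written list row holds exactly one element, so after selecting rows
    // the value offsets must remain dense (consecutive offsets differ by one).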
    #[test]
    fn test_list_selection() {
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list_field(ArrowDataType::Utf8, true),
            false,
        )]));
        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        for i in 0..2 {
            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
            for j in 0..1024 {
                list_a_builder.values().append_value(format!("{i} {j}"));
                list_a_builder.append(true);
            }
            let batch =
                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
                    .unwrap();
            writer.write(&batch).unwrap();
        }
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);
        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
            .unwrap()
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ]))
            .build()
            .unwrap();

        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        let batch = concat_batches(&schema, &batches).unwrap();

        assert_eq!(batch.num_rows(), 924 * 2);
        let list = batch.column(0).as_list::<i32>();

        for w in list.value_offsets().windows(2) {
            assert_eq!(w[0] + 1, w[1])
        }
        let mut values = list.values().as_string::<i32>().iter();

        for i in 0..2 {
            for j in 100..1024 {
                let expected = format!("{i} {j}");
                assert_eq!(values.next().unwrap().unwrap(), &expected);
            }
        }
    }

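    // Fuzzes list reads with random nulls and lengths, checking that every
    // selected slice matches the corresponding slice of the original batch
    // across several batch sizes.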
    #[test]
    fn test_list_selection_fuzz() {
        let mut rng = rng();
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list(
                Field::LIST_FIELD_DEFAULT_NAME,
                Field::new_list_field(ArrowDataType::Int32, true),
                true,
            ),
            true,
        )]));
        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));

        for _ in 0..2048 {
            if rng.random_bool(0.2) {
                list_a_builder.append(false);
                continue;
            }

            let list_a_len = rng.random_range(0..10);
            let list_b_builder = list_a_builder.values();

            for _ in 0..list_a_len {
                if rng.random_bool(0.2) {
                    list_b_builder.append(false);
                    continue;
                }

                let list_b_len = rng.random_range(0..10);
                let int_builder = list_b_builder.values();
                for _ in 0..list_b_len {
                    match rng.random_bool(0.2) {
                        true => int_builder.append_null(),
                        false => int_builder.append_value(rng.random()),
                    }
                }
                list_b_builder.append(true);
            }
            list_a_builder.append(true);
        }

        let array = Arc::new(list_a_builder.finish());
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        writer.write(&batch).unwrap();
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);

        let cases = [
            vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ],
            vec![
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
            ],
            vec![
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
            ],
            vec![
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
            ],
        ];

        for batch_size in [100, 1024, 2048] {
            for selection in &cases {
                let selection = RowSelection::from(selection.clone());
                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
                    .unwrap()
                    .with_row_selection(selection.clone())
                    .with_batch_size(batch_size)
                    .build()
                    .unwrap();

                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
                assert_eq!(actual.num_rows(), selection.row_count());

                let mut batch_offset = 0;
                let mut actual_offset = 0;
                for selector in selection.iter() {
                    if selector.skip {
                        batch_offset += selector.row_count;
                        continue;
                    }

                    assert_eq!(
                        batch.slice(batch_offset, selector.row_count),
                        actual.slice(actual_offset, selector.row_count)
                    );

                    batch_offset += selector.row_count;
                    actual_offset += selector.row_count;
                }
            }
        }
    }

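    // `old_list_structure.parquet` uses the legacy two-level list encoding; the
    // reader should still reconstruct the nested [[1, 2], [3, 4]] value.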
    #[test]
    fn test_read_old_nested_list() {
        use arrow::datatypes::DataType;
        use arrow::datatypes::ToByteSlice;

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/old_list_structure.parquet");
        let test_file = File::open(path).unwrap();

        // Expected nested value: [[1, 2], [3, 4]]
        let a_values = Int32Array::from(vec![1, 2, 3, 4]);
        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());
        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
            "array",
            DataType::Int32,
            false,
        ))))
        .len(2)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
        let mut reader = builder.build().unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out.num_columns(), 1);
        let c0 = out.column(0);
        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
        let r0 = c0arr.value(0);
        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
        assert_eq!(r0arr, &a);
    }

5231
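    // `map_no_value.parquet` contains a MAP column whose key_value group has no
    // value field; it should read back identically to the equivalent list column.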
    #[test]
    fn test_map_no_value() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/map_no_value.parquet");
        let file = File::open(path).unwrap();

        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 3);
        assert_eq!(out.num_columns(), 3);
        let c0 = out.column(1).as_list::<i32>();
        let c1 = out.column(2).as_list::<i32>();
        assert_eq!(c0.len(), c1.len());
        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
    }

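    // A logical type unknown to this crate must not prevent reading; the column
    // falls back to its physical type.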
    #[test]
    fn test_read_unknown_logical_type() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/unknown-logical-type.parquet");
        let test_file = File::open(path).unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file)
            .expect("Error creating reader builder");

        let schema = builder.metadata().file_metadata().schema_descr();
        assert_eq!(
            schema.column(0).logical_type_ref(),
            Some(&LogicalType::String)
        );
        assert_eq!(
            schema.column(1).logical_type_ref(),
            Some(&LogicalType::_Unknown { field_id: 2555 })
        );
        assert_eq!(schema.column(1).physical_type(), PhysicalType::BYTE_ARRAY);

        let mut reader = builder.build().unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 3);
        assert_eq!(out.num_columns(), 2);
    }

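    // Virtual columns: requesting a row-number field alongside the data should
    // append an Int64 column numbering the rows within the file.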
    #[test]
    fn test_read_row_numbers() {
        let file = write_parquet_from_iter(vec![(
            "value",
            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
        )]);
        let supplied_fields = Fields::from(vec![Field::new("value", ArrowDataType::Int64, false)]);

        let row_number_field = Arc::new(
            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
        );

        let options = ArrowReaderOptions::new()
            .with_schema(Arc::new(Schema::new(supplied_fields)))
            .with_virtual_columns(vec![row_number_field.clone()])
            .unwrap();
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");

        let batch = arrow_reader.next().unwrap().unwrap();
        let schema = Arc::new(Schema::new(vec![
            Field::new("value", ArrowDataType::Int64, false),
            (*row_number_field).clone(),
        ]));

        assert_eq!(batch.schema(), schema);
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(1), Some(2), Some(3)]
        );
        assert_eq!(
            batch
                .column(1)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(0), Some(1), Some(2)]
        );
    }

    #[test]
    fn test_read_only_row_numbers() {
        let file = write_parquet_from_iter(vec![(
            "value",
            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
        )]);
        let row_number_field = Arc::new(
            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
        );
        let options = ArrowReaderOptions::new()
            .with_virtual_columns(vec![row_number_field.clone()])
            .unwrap();
        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
        let num_columns = metadata
            .metadata
            .file_metadata()
            .schema_descr()
            .num_columns();

        // Project away every physical column; only the virtual row numbers remain
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
            .with_projection(ProjectionMask::none(num_columns))
            .build()
            .expect("reader with schema");

        let batch = arrow_reader.next().unwrap().unwrap();
        let schema = Arc::new(Schema::new(vec![row_number_field]));

        assert_eq!(batch.schema(), schema);
        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(0), Some(1), Some(2)]
        );
    }

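    // Row numbers must track the rows' positions in the file even when row
    // groups are read out of order.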
    #[test]
    fn test_read_row_numbers_row_group_order() -> Result<()> {
        // 100 rows split into two 50-row row groups
        let array = Int64Array::from_iter_values(5000..5100);
        let batch = RecordBatch::try_from_iter([("col", Arc::new(array) as ArrayRef)])?;
        let mut buffer = Vec::new();
        let options = WriterProperties::builder()
            .set_max_row_group_row_count(Some(50))
            .build();
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema().clone(), Some(options))?;
        for batch_chunk in (0..10).map(|i| batch.slice(i * 10, 10)) {
            writer.write(&batch_chunk)?;
        }
        writer.close()?;

        let row_number_field = Arc::new(
            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
        );

        let buffer = Bytes::from(buffer);

        let options =
            ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field.clone()])?;

        let arrow_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(buffer.clone(), options.clone())?
                .build()?;

        assert_eq!(
            ValuesAndRowNumbers {
                values: (5000..5100).collect(),
                row_numbers: (0..100).collect()
            },
            ValuesAndRowNumbers::new_from_reader(arrow_reader)
        );

        // Reading the row groups in reverse order should reverse both halves
        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(buffer, options)?
            .with_row_groups(vec![1, 0])
            .build()?;

        assert_eq!(
            ValuesAndRowNumbers {
                values: (5050..5100).chain(5000..5050).collect(),
                row_numbers: (50..100).chain(0..50).collect(),
            },
            ValuesAndRowNumbers::new_from_reader(arrow_reader)
        );

        Ok(())
    }

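    /// Collects the `col` values and `row_number` column from every batch a
    /// reader produces, for easy equality assertions.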
    #[derive(Debug, PartialEq)]
    struct ValuesAndRowNumbers {
        values: Vec<i64>,
        row_numbers: Vec<i64>,
    }

    impl ValuesAndRowNumbers {
        fn new_from_reader(reader: ParquetRecordBatchReader) -> Self {
            let mut values = vec![];
            let mut row_numbers = vec![];
            for batch in reader {
                let batch = batch.expect("Could not read batch");
                values.extend(
                    batch
                        .column_by_name("col")
                        .expect("Could not get col column")
                        .as_primitive::<arrow::datatypes::Int64Type>()
                        .iter()
                        .map(|v| v.expect("Could not get value")),
                );

                row_numbers.extend(
                    batch
                        .column_by_name("row_number")
                        .expect("Could not get row_number column")
                        .as_primitive::<arrow::datatypes::Int64Type>()
                        .iter()
                        .map(|v| v.expect("Could not get row number")),
                );
            }
            Self {
                values,
                row_numbers,
            }
        }
    }

    #[test]
    fn test_with_virtual_columns_rejects_non_virtual_fields() {
        let regular_field = Arc::new(Field::new("regular_column", ArrowDataType::Int64, false));
        assert_eq!(
            ArrowReaderOptions::new()
                .with_virtual_columns(vec![regular_field])
                .unwrap_err()
                .to_string(),
            "Parquet error: Field 'regular_column' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'"
        );
    }

    #[test]
    fn test_row_numbers_with_multiple_row_groups() {
        test_row_numbers_with_multiple_row_groups_helper(
            false,
            |path, selection, _row_filter, batch_size| {
                let file = File::open(path).unwrap();
                let row_number_field = Arc::new(
                    Field::new("row_number", ArrowDataType::Int64, false)
                        .with_extension_type(RowNumber),
                );
                let options = ArrowReaderOptions::new()
                    .with_virtual_columns(vec![row_number_field])
                    .unwrap();
                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                    .unwrap()
                    .with_row_selection(selection)
                    .with_batch_size(batch_size)
                    .build()
                    .expect("Could not create reader");
                reader
                    .collect::<Result<Vec<_>, _>>()
                    .expect("Could not read")
            },
        );
    }

    #[test]
    fn test_row_numbers_with_multiple_row_groups_and_filter() {
        test_row_numbers_with_multiple_row_groups_helper(
            true,
            |path, selection, row_filter, batch_size| {
                let file = File::open(path).unwrap();
                let row_number_field = Arc::new(
                    Field::new("row_number", ArrowDataType::Int64, false)
                        .with_extension_type(RowNumber),
                );
                let options = ArrowReaderOptions::new()
                    .with_virtual_columns(vec![row_number_field])
                    .unwrap();
                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                    .unwrap()
                    .with_row_selection(selection)
                    .with_batch_size(batch_size)
                    .with_row_filter(row_filter.expect("No filter"))
                    .build()
                    .expect("Could not create reader");
                reader
                    .collect::<Result<Vec<_>, _>>()
                    .expect("Could not read")
            },
        );
    }

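    // The RowGroupIndex virtual column should label each row with the index of
    // the row group it was read from.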
    #[test]
    fn test_read_row_group_indices() {
        let array1 = Int64Array::from(vec![1, 2]);
        let array2 = Int64Array::from(vec![3, 4]);
        let array3 = Int64Array::from(vec![5, 6]);

        let batch1 =
            RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
        let batch2 =
            RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
        let batch3 =
            RecordBatch::try_from_iter(vec![("value", Arc::new(array3) as ArrayRef)]).unwrap();

        let mut buffer = Vec::new();
        let options = WriterProperties::builder()
            .set_max_row_group_row_count(Some(2))
            .build();
        let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
        writer.write(&batch1).unwrap();
        writer.write(&batch2).unwrap();
        writer.write(&batch3).unwrap();
        writer.close().unwrap();

        let file = Bytes::from(buffer);
        let row_group_index_field = Arc::new(
            Field::new("row_group_index", ArrowDataType::Int64, false)
                .with_extension_type(RowGroupIndex),
        );

        let options = ArrowReaderOptions::new()
            .with_virtual_columns(vec![row_group_index_field.clone()])
            .unwrap();
        let mut arrow_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options)
                .expect("reader builder with virtual columns")
                .build()
                .expect("reader with virtual columns");

        let batch = arrow_reader.next().unwrap().unwrap();

        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 6);

        assert_eq!(
            batch
                .column(0)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(1), Some(2), Some(3), Some(4), Some(5), Some(6)]
        );

        assert_eq!(
            batch
                .column(1)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(0), Some(0), Some(1), Some(1), Some(2), Some(2)]
        );
    }

    #[test]
    fn test_read_only_row_group_indices() {
        let array1 = Int64Array::from(vec![1, 2, 3]);
        let array2 = Int64Array::from(vec![4, 5]);

        let batch1 =
            RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
        let batch2 =
            RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();

        let mut buffer = Vec::new();
        let options = WriterProperties::builder()
            .set_max_row_group_row_count(Some(3))
            .build();
        let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
        writer.write(&batch1).unwrap();
        writer.write(&batch2).unwrap();
        writer.close().unwrap();

        let file = Bytes::from(buffer);
        let row_group_index_field = Arc::new(
            Field::new("row_group_index", ArrowDataType::Int64, false)
                .with_extension_type(RowGroupIndex),
        );

        let options = ArrowReaderOptions::new()
            .with_virtual_columns(vec![row_group_index_field.clone()])
            .unwrap();
        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
        let num_columns = metadata
            .metadata
            .file_metadata()
            .schema_descr()
            .num_columns();

        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
            .with_projection(ProjectionMask::none(num_columns))
            .build()
            .expect("reader with virtual columns only");

        let batch = arrow_reader.next().unwrap().unwrap();
        let schema = Arc::new(Schema::new(vec![(*row_group_index_field).clone()]));

        assert_eq!(batch.schema(), schema);
        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.num_rows(), 5);

        assert_eq!(
            batch
                .column(0)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(0), Some(0), Some(0), Some(1), Some(1)]
        );
    }

    #[test]
    fn test_read_row_group_indices_with_selection() -> Result<()> {
        let mut buffer = Vec::new();
        let options = WriterProperties::builder()
            .set_max_row_group_row_count(Some(10))
            .build();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            ArrowDataType::Int64,
            false,
        )]));

        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(options))?;

        // Three row groups of 10 rows each, holding 0..10, 10..20 and 20..30
        for i in 0..3 {
            let start = i * 10;
            let array = Int64Array::from_iter_values(start..start + 10);
            let batch = RecordBatch::try_from_iter(vec![("value", Arc::new(array) as ArrayRef)])?;
            writer.write(&batch)?;
        }
        writer.close()?;

        let file = Bytes::from(buffer);
        let row_group_index_field = Arc::new(
            Field::new("rg_idx", ArrowDataType::Int64, false).with_extension_type(RowGroupIndex),
        );

        let options =
            ArrowReaderOptions::new().with_virtual_columns(vec![row_group_index_field])?;

        // Read the row groups in reverse order
        let arrow_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options.clone())?
                .with_row_groups(vec![2, 1, 0])
                .build()?;

        let batches: Vec<_> = arrow_reader.collect::<Result<Vec<_>, _>>()?;
        let combined = concat_batches(&batches[0].schema(), &batches)?;

        let values = combined.column(0).as_primitive::<types::Int64Type>();
        let first_val = values.value(0);
        let last_val = values.value(combined.num_rows() - 1);
        assert_eq!(first_val, 20);
        assert_eq!(last_val, 9);

        let rg_indices = combined.column(1).as_primitive::<types::Int64Type>();
        assert_eq!(rg_indices.value(0), 2);
        assert_eq!(rg_indices.value(10), 1);
        assert_eq!(rg_indices.value(20), 0);

        Ok(())
    }

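    /// Drives a randomized row-selection (and optional row-filter) scenario
    /// against a generated file and checks the returned row numbers against the
    /// values, which are written as `row_number + 1`.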
    pub(crate) fn test_row_numbers_with_multiple_row_groups_helper<F>(
        use_filter: bool,
        test_case: F,
    ) where
        F: FnOnce(PathBuf, RowSelection, Option<RowFilter>, usize) -> Vec<RecordBatch>,
    {
        let seed: u64 = random();
        println!("test_row_numbers_with_multiple_row_groups seed: {seed}");
        let mut rng = StdRng::seed_from_u64(seed);

        use tempfile::TempDir;
        let tempdir = TempDir::new().expect("Could not create temp dir");

        let (bytes, metadata) = generate_file_with_row_numbers(&mut rng);

        let path = tempdir.path().join("test.parquet");
        std::fs::write(&path, bytes).expect("Could not write file");

        // Build a random selection covering every row exactly once
        let mut case = vec![];
        let mut remaining = metadata.file_metadata().num_rows();
        while remaining > 0 {
            let row_count = rng.random_range(1..=remaining);
            remaining -= row_count;
            case.push(RowSelector {
                row_count: row_count as usize,
                skip: rng.random_bool(0.5),
            });
        }

        let filter = use_filter.then(|| {
            let filter = (0..metadata.file_metadata().num_rows())
                .map(|_| rng.random_bool(0.99))
                .collect::<Vec<_>>();
            let mut filter_offset = 0;
            RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
                ProjectionMask::all(),
                move |b| {
                    let array = BooleanArray::from_iter(
                        filter
                            .iter()
                            .skip(filter_offset)
                            .take(b.num_rows())
                            .map(|x| Some(*x)),
                    );
                    filter_offset += b.num_rows();
                    Ok(array)
                },
            ))])
        });

        let selection = RowSelection::from(case);
        let batches = test_case(path, selection.clone(), filter, rng.random_range(1..4096));

        if selection.skipped_row_count() == metadata.file_metadata().num_rows() as usize {
            assert!(batches.into_iter().all(|batch| batch.num_rows() == 0));
            return;
        }
        let actual = concat_batches(batches.first().expect("No batches").schema_ref(), &batches)
            .expect("Failed to concatenate");
        let values = actual
            .column(0)
            .as_primitive::<types::Int64Type>()
            .iter()
            .collect::<Vec<_>>();
        let row_numbers = actual
            .column(1)
            .as_primitive::<types::Int64Type>()
            .iter()
            .collect::<Vec<_>>();
        // Values were written as 1..=n, so each value is its row number plus one
        assert_eq!(
            row_numbers
                .into_iter()
                .map(|number| number.map(|number| number + 1))
                .collect::<Vec<_>>(),
            values
        );
    }

    fn generate_file_with_row_numbers(rng: &mut impl Rng) -> (Bytes, ParquetMetaData) {
        let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
            "value",
            ArrowDataType::Int64,
            false,
        )])));

        let mut buf = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new(&mut buf, schema.clone(), None).expect("Could not create writer");

        // Write 1..=n in randomly sized batches, flushing after each so the file
        // contains multiple row groups
        let mut values = 1..=rng.random_range(1..4096);
        while !values.is_empty() {
            let batch_values = values
                .by_ref()
                .take(rng.random_range(1..4096))
                .collect::<Vec<_>>();
            let array = Arc::new(Int64Array::from(batch_values)) as ArrayRef;
            let batch =
                RecordBatch::try_from_iter([("value", array)]).expect("Could not create batch");
            writer.write(&batch).expect("Could not write batch");
            writer.flush().expect("Could not flush");
        }
        let metadata = writer.close().expect("Could not close writer");

        (Bytes::from(buf), metadata)
    }
}